Hls live (#2)

feat: live hls
This commit is contained in:
Evgeny Baranov 2024-04-24 19:05:33 +04:00 committed by GitHub
parent 305f291616
commit 5360a2d734
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 294 additions and 55 deletions

View File

@ -1,3 +1,6 @@
### va
Поддерживаемые операционные системы: Linux
### va-record
Функционал:
@ -63,8 +66,14 @@
Пример запроса HLS с помощью ffplay, время в UTC
```bash
ffplay http://localhost:8888/2024/04/17/05/index.m3u8
ffplay http://localhost:8888/camera-1/2024/04/17/05/index.m3u8
```
Пример запроса HLS с помощью ffplay (live)
```bash
ffplay http://localhost:8888/camera-1/index.m3u8
```
Задержка Live трансляции от 20 - 40 секунд при файлах (5 с)
Задержка Live трансляции от 40 - 60 секунд при файлах (10 с)
### Сборка
@ -72,5 +81,5 @@ ffplay http://localhost:8888/2024/04/17/05/index.m3u8
- boost версия 1.84 и выше
- ffmpeg версия 6.0 и выше
Сборка:
- Версия компилятора gcc-13.1.0 (C++20) и выше
Компилятор:
- Версия компилятора gcc-13.1.0 (C++20) и выше

View File

@ -8,9 +8,15 @@
#include <thread>
#include "../../version.h"
#include "../../watcher/watcher.h"
#include "playlist.h"
#include "server.h"
#include "settings.h"
#include "state/state.h"
#include "utils.h"
#include <map>
#include <queue>
#include <shared_mutex>
namespace opt = boost::program_options;
namespace logging = boost::log;
@ -18,6 +24,9 @@ namespace logging = boost::log;
va::StateApp state;
va::Settings settings;
std::map<std::string, va::PlayList> playlists;
std::shared_mutex mutex_playlists;
volatile static std::sig_atomic_t signal_num = -1;
void siginthandler(int param) {
signal_num = param;
@ -67,14 +76,57 @@ int main(int argc, char *argv[]) {
<< "ver." << VERSION_MAJOR << "." << VERSION_MINOR << "." << VERSION_PATCH;
BOOST_LOG_TRIVIAL(info) << "boost version " << BOOST_VERSION / 100000 << "." << BOOST_VERSION / 100 % 1000 << "."
<< BOOST_VERSION % 100;
std::jthread th_watcher([&](std::stop_token stoken) {
try {
va::Watcher watcher(settings.prefix_archive_path().c_str());
watcher.run(stoken, [&](std::string &path) {
if (path.ends_with(".ts")) {
auto path_without_prefix = path.replace(0, settings.prefix_archive_path().size() + 1, "");
auto parts = va::utils::split(path_without_prefix, '/');
if (!parts.empty()) {
auto stream_id = parts[0];
std::unique_lock lock(mutex_playlists);
auto it = playlists.find(stream_id);
if (it == playlists.end()) {
playlists[stream_id] = {0, {path}};
} else {
auto &queue_ref = playlists[stream_id].queue;
auto it = std::find(queue_ref.begin(), queue_ref.end(), path);
if (it == queue_ref.end()) {
if (queue_ref.size() > 3) {
++playlists[stream_id].index;
queue_ref.pop_front();
}
queue_ref.push_back(std::move(path_without_prefix));
}
}
}
}
});
} catch (std::exception &ex) {
BOOST_LOG_TRIVIAL(fatal) << ex.what();
BOOST_LOG_TRIVIAL(fatal) << "application will be stopped";
signal_num = SIGABRT;
state.stop_app();
}
});
boost::asio::io_context io_context;
std::thread th([&]() {
va::Server server(io_context, settings.port());
io_context.run();
});
state.wait_stop_app();
io_context.stop();
th.join();
if (th.joinable()) {
th.join();
}
th_watcher.request_stop();
if (th_watcher.joinable()) {
th_watcher.join();
}
BOOST_LOG_TRIVIAL(info) << "HLS has been stopped";
BOOST_LOG_TRIVIAL(info) << "application has been stopped";
return EXIT_SUCCESS;

11
va-hls/src/playlist.h Normal file
View File

@ -0,0 +1,11 @@
#pragma once
#include <deque>
#include <string>
namespace va {
struct PlayList {
size_t index = 0;
std::deque<std::string> queue;
};
} // namespace va

View File

@ -1,52 +1,53 @@
#include "utils.h"
#include "pico/picohttpparser.h"
#include "playlist.h"
#include "settings.h"
#include <algorithm>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <map>
#include <set>
#include <shared_mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#include <variant>
namespace fs = std::filesystem;
extern va::Settings settings;
extern std::map<std::string, va::PlayList> playlists;
extern std::shared_mutex mutex_playlists;
const char *HEADER_PLAYLIST =
"#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-MEDIA-SEQUENCE:0\n#EXT-X-ALLOW-CACHE:YES\n#EXT-X-TARGETDURATION:10\n";
const char *HEADER_PLAYLIST = "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-MEDIA-SEQUENCE:0\n#EXT-X-TARGETDURATION:5\n";
const char *FOOTER_PLAYLIST = "#EXT-X-ENDLIST";
namespace {
struct ParamsRequest {
enum class Tag { Archive, Live };
struct ParamsRequestArchive {
std::string stream_id;
std::string year;
std::string month;
std::string day;
std::string hour;
};
struct ParamsRequestLive {
std::string stream_id;
};
struct Stat {
double duration;
bool is_valid = false;
};
std::vector<std::string> split(const std::string &s, char delim) {
std::vector<std::string> elems;
std::stringstream ss(s);
std::string item;
while (std::getline(ss, item, delim)) {
elems.push_back(item);
inline std::pair<Tag, std::variant<ParamsRequestArchive, ParamsRequestLive>> parse_uri(const std::string &uri) {
auto elems = va::utils::split(uri, '/');
if (elems.size() == 7) {
return {Tag::Archive, ParamsRequestArchive{elems[1], elems[2], elems[3], elems[4], elems[5]}};
}
return elems;
}
inline ParamsRequest parse_uri(const std::string &uri) {
auto elems = split(uri, '/');
if (elems.size() != 7) {
throw std::runtime_error("could not parse uri, number of parameters is missing");
if (elems.size() == 3) {
return {Tag::Live, ParamsRequestLive{elems[1]}};
}
return {elems[1], elems[2], elems[3], elems[4], elems[5]};
throw std::runtime_error("could not parse uri, number of parameters is missing");
}
Stat parse_stat(const std::string &path) {
std::ifstream in(path);
@ -70,43 +71,85 @@ namespace {
} // namespace
namespace va {
std::optional<std::string> make_playlist(const std::string &uri) {
auto params = parse_uri(uri);
auto path = std::format("{}/{}/{}/{}/{}/{}", settings.prefix_archive_path(), params.stream_id, params.year,
params.month, params.day, params.hour);
std::set<fs::path> files, files_stat;
for (const auto &entry : fs::directory_iterator(path)) {
auto file = entry.path();
if (file.extension() == ".ts") {
files.emplace(file.stem());
}
if (file.extension() == ".stat") {
files_stat.emplace(file.stem());
namespace utils {
std::vector<std::string> split(const std::string &s, char delim) {
std::vector<std::string> elems;
std::stringstream ss(s);
std::string item;
while (std::getline(ss, item, delim)) {
elems.push_back(item);
}
return elems;
}
std::vector<fs::path> intersection;
/*
* В playlist включаются только файлы для которых известна их продолжительность,
* то есть присутствие файла filename.stat
*/
std::set_intersection(files.begin(), files.end(), files_stat.begin(), files_stat.end(),
std::back_inserter(intersection));
if (intersection.empty()) {
} // namespace utils
std::optional<std::string> make_playlist(const std::string &uri) {
auto [tag, variant] = parse_uri(uri);
if (tag == Tag::Archive) {
std::string playlist(HEADER_PLAYLIST);
auto params = std::get<ParamsRequestArchive>(variant);
auto path = std::format("{}/{}/{}/{}/{}/{}", settings.prefix_archive_path(), params.stream_id, params.year,
params.month, params.day, params.hour);
std::set<fs::path> files, files_stat;
for (const auto &entry : fs::directory_iterator(path)) {
auto file = entry.path();
if (file.extension() == ".ts") {
files.emplace(file.stem());
}
if (file.extension() == ".stat") {
files_stat.emplace(file.stem());
}
}
std::vector<fs::path> intersection;
/*
* В playlist включаются только файлы для которых известна их продолжительность,
* то есть присутствие файла filename.stat
*/
std::set_intersection(files.begin(), files.end(), files_stat.begin(), files_stat.end(),
std::back_inserter(intersection));
if (intersection.empty()) {
return std::nullopt;
}
for (auto filename : intersection) {
auto path_stat =
std::format("{}/{}/{}/{}/{}/{}/{}.stat", settings.prefix_archive_path(), params.stream_id,
params.year, params.month, params.day, params.hour, filename.c_str());
auto stat = parse_stat(path_stat);
if (stat.is_valid) {
auto path = std::format("{}.ts", filename.c_str());
auto item = std::format("#EXTINF:{}\n{}\n", stat.duration, path);
playlist.append(item);
}
}
playlist.append(FOOTER_PLAYLIST);
return playlist;
} else if (tag == Tag::Live) {
auto params = std::get<ParamsRequestLive>(variant);
std::shared_lock lock(mutex_playlists);
std::string playlist(
std::format("#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-MEDIA-SEQUENCE:{}\n#EXT-X-TARGETDURATION:5\n",
playlists[params.stream_id].index));
auto it = playlists.find(params.stream_id);
if (it == playlists.end()) {
return std::nullopt;
}
auto &queue_ref = playlists[params.stream_id].queue;
for (auto path : queue_ref) {
auto path_stat = std::format("{}/{}", settings.prefix_archive_path(), path);
path_stat = path_stat.replace(path_stat.size() - 2, params.stream_id.size(), "stat");
std::cout << path_stat << std::endl;
auto stat = parse_stat(path_stat);
if (stat.is_valid) {
auto path_without_prefix = path.replace(0, params.stream_id.size() + 1, "");
auto item = std::format("#EXTINF:{}\n{}\n", stat.duration, path);
playlist.append(item);
}
}
return playlist;
} else {
return std::nullopt;
}
std::string playlist(HEADER_PLAYLIST);
for (auto filename : intersection) {
auto path_stat = std::format("{}/{}/{}/{}/{}/{}/{}.stat", settings.prefix_archive_path(), params.stream_id,
params.year, params.month, params.day, params.hour, filename.c_str());
auto stat = parse_stat(path_stat);
if (stat.is_valid) {
auto path = std::format("{}.ts", filename.c_str());
auto item = std::format("#EXTINF:{}\n{}\n", stat.duration, path);
playlist.append(item);
}
}
playlist.append(FOOTER_PLAYLIST);
return {playlist};
}
std::string fetch_uri(const std::string &req) {
std::string uri;

View File

@ -2,8 +2,15 @@
#include <optional>
#include <string>
#include <vector>
namespace va {
std::optional<std::string> make_playlist(const std::string &uri);
std::string fetch_uri(const std::string &req);
} // namespace va
namespace va {
namespace utils {
std::vector<std::string> split(const std::string &s, char delim);
}
} // namespace va

117
watcher/watcher.h Normal file
View File

@ -0,0 +1,117 @@
#pragma once
#include <boost/log/trivial.hpp>
#include <filesystem>
#include <format>
#include <map>
#include <stdexcept>
#include <stdlib.h>
#include <sys/fcntl.h>
#include <sys/inotify.h>
#include <sys/select.h>
namespace va {
enum class FileStatus { created, modified, erased };
struct EventWatcher {
std::string filename;
FileStatus kind;
};
class Watcher {
public:
Watcher(const char *path) {
notifyfd_ = inotify_init();
if (notifyfd_ < 0) {
throw std::runtime_error("could not init notify");
}
fcntl(notifyfd_, F_SETFL, O_NONBLOCK);
auto wd = inotify_add_watch(notifyfd_, path, IN_CREATE | IN_CLOSE_WRITE);
if (wd < 0) {
auto err_msg = std::format("could not add watch for \'{}\'", path);
throw std::runtime_error(err_msg);
}
watchfds_.insert(std::make_pair(wd, std::string(path)));
for (auto &file : std::filesystem::recursive_directory_iterator(path)) {
if (file.is_directory()) {
wd = inotify_add_watch(notifyfd_, file.path().c_str(), IN_CREATE | IN_CLOSE_WRITE);
if (wd < 0) {
auto err_msg = std::format("could not add watch for \'{}\'", file.path().c_str());
throw std::runtime_error(err_msg);
}
watchfds_.insert(std::make_pair(wd, file.path().string()));
}
}
BOOST_LOG_TRIVIAL(info) << "watch for path \"" << path << "\"";
}
~Watcher() {
if (notifyfd_ >= 0) {
for (auto [wd, filename] : watchfds_) {
inotify_rm_watch(notifyfd_, wd);
}
close(notifyfd_);
}
BOOST_LOG_TRIVIAL(info) << "cleanup watch";
}
template <typename Func> void run(std::stop_token stoken, const Func &action) {
while (!stoken.stop_requested()) {
struct timeval timeout = {};
timeout.tv_sec = 2;
FD_ZERO(&readfds_);
FD_SET(notifyfd_, &readfds_);
int ret = select(notifyfd_ + 1, &readfds_, &writefds_, NULL, &timeout);
if (-1 == ret) {
throw std::runtime_error("could not invoke select");
} else if (0 == ret) {
continue;
} else {
char buffer[sizeof(struct inotify_event) + NAME_MAX + 1] alignas(struct inotify_event);
ssize_t count = read(notifyfd_, buffer, sizeof(buffer));
if (count < 0) {
break;
}
ssize_t i = 0;
while (i < count) {
const struct inotify_event *event = reinterpret_cast<const struct inotify_event *>(&buffer[i]);
if (event->mask & IN_ISDIR) {
std::string filename(event->name);
auto prefix = watchfds_[event->wd];
auto path = std::format("{}/{}", prefix, filename);
auto wd = inotify_add_watch(notifyfd_, path.c_str(), IN_CREATE | IN_CLOSE_WRITE);
if (wd < 0) {
auto err_msg = std::format("could not add watch for \'{}\'", filename);
throw std::runtime_error(err_msg);
}
watchfds_.insert(std::make_pair(wd, path));
for (auto &file : std::filesystem::recursive_directory_iterator(path)) {
if (file.is_directory()) {
wd = inotify_add_watch(notifyfd_, file.path().c_str(), IN_CREATE | IN_CLOSE_WRITE);
if (wd < 0) {
auto err_msg =
std::format("could not add watch for \'{}\'", file.path().c_str());
throw std::runtime_error(err_msg);
}
watchfds_.insert(std::make_pair(wd, file.path().string()));
}
}
}
if (event->mask & IN_CLOSE_WRITE) {
std::string filename(event->name);
auto prefix = watchfds_[event->wd];
auto path = std::format("{}/{}", prefix, filename);
BOOST_LOG_TRIVIAL(debug) << "new watch file \"" << path << "\"";
action(path);
}
i += static_cast<ssize_t>(sizeof(inotify_event)) + event->len;
}
}
}
}
private:
int notifyfd_ = -1;
std::map<int, std::string> watchfds_;
fd_set readfds_;
fd_set writefds_;
};
} // namespace va