feat: implementation inference (#3)

This commit is contained in:
Evgeny Baranov 2024-05-01 19:41:41 +04:00 committed by GitHub
parent 29489a1b9f
commit 3e7132bec4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 573 additions and 129 deletions

View File

@ -6,6 +6,7 @@ configure_file (version.h.in ${CMAKE_CURRENT_SOURCE_DIR}/version.h @ONLY)
add_subdirectory(va-recorder dist/va-recorder)
add_subdirectory(va-hls dist/va-hls)
add_subdirectory(va-inference dist/va-inference)
if (TEST)
enable_testing()
add_subdirectory(tests tests)

View File

@ -49,6 +49,7 @@
Функционал:
- Реализация HLS сервера для архивного видео. Запрос видео делается за конкретный час архива
- Реализация HLS сервера для live видео
Параметры запуска:
- префикс пути для хранения видео архива, по умолчанию /tmp/va
@ -76,6 +77,35 @@ ffplay http://localhost:8888/camera-1/index.m3u8
Задержка Live трансляции от 40 - 60 секунд при файлах (10 с)
### va-inference
Функционал:
- Отправка данных на удаленный инференс, получение результата и сохранение в файл рядом с видео файлом
Параметры запуска:
- префикс пути для хранения видео архива, по умолчанию /tmp/va
```bash
--prefix-archive-path
```
- адрес инференс сервиса, по умолчанию localhost
```bash
--inference-server-address
```
- порт инференс сервиса, по умолчанию 3030
```bash
--inference-server-port
```
- количество паралелльных потоков инференса, по умолчанию 1
```bash
--num-threads
```
- уровень логирования приложения (debug, info, warning, error), по умолчанию info
```bash
--logging-level
```
В репозитории на данный момент присутствиет пример для инференса на YOLO V5
### Сборка
Зависимости:

View File

@ -15,8 +15,17 @@ namespace va {
StateApp &operator=(const StateApp &) = delete;
StateApp(StateApp &&) = delete;
StateApp &operator=(StateApp &&) = delete;
void stop_app();
void wait_stop_app();
void stop_app() {
std::unique_lock<std::mutex> lock(is_stop_mutex_);
is_stop_.test_and_set(std::memory_order_acquire);
is_stop_condvar_.notify_one();
}
void wait_stop_app() {
std::unique_lock<std::mutex> lock(is_stop_mutex_);
while (!is_stop_.test_and_set()) {
is_stop_condvar_.wait(lock);
}
}
private:
std::condition_variable is_stop_condvar_;

View File

@ -0,0 +1,74 @@
#pragma once
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
namespace va {
template <typename T> class ThreadSafeQueue {
public:
ThreadSafeQueue() {
}
ThreadSafeQueue(const ThreadSafeQueue &other) : std::lock_guard<std::mutex>(other.mutex), queue_(other.queue_) {
}
ThreadSafeQueue &operator=(const ThreadSafeQueue &other) = delete;
void push(T new_value) {
std::lock_guard<std::mutex> lock{mutex_};
queue_.push(new_value);
cond_.notify_one();
}
void wait_and_pop(T &value) {
std::unique_lock<std::mutex> lock{mutex_};
cond_.wait(lock, [this] { return !queue_.empty(); });
value = queue_.front();
queue_.pop();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock{mutex_};
cond_.wait(lock, [this] { return !queue_.empty(); });
std::shared_ptr<T> res{std::make_shared<T>(queue_.front())};
queue_.pop();
return res;
}
bool try_pop(T &value) {
std::lock_guard<std::mutex> lock{mutex_};
if (queue_.empty())
return false;
value = queue_.front();
queue_.pop();
return true;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock{mutex_};
if (queue_.empty())
return std::shared_ptr<T>{};
std::shared_ptr<T> res{std::make_shared<T>(queue_.front())};
queue_.pop();
return res;
}
bool empty() const {
std::lock_guard<std::mutex> lock{mutex_};
return queue_.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock{mutex_};
return queue_.size();
}
private:
mutable std::mutex mutex_;
std::condition_variable cond_;
std::queue<T> queue_;
};
} // namespace va

45
common/utils.h Normal file
View File

@ -0,0 +1,45 @@
#pragma once
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
namespace va {
namespace utils {
struct Metatada {
double duration;
bool is_valid = false;
};
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-function"
static Metatada parse_metadatat_file(const std::string &path) {
std::ifstream in(path);
std::string start_marker, end_marker;
double duration;
in >> start_marker;
if (!in) {
return {0, false};
}
in >> duration;
if (!in) {
return {0, false};
}
in >> end_marker;
if (in && start_marker == "START_DATA" && end_marker == "END_DATA") {
return {duration, true};
}
return {0, false};
}
static std::vector<std::string> split(const std::string &s, char delim) {
std::vector<std::string> elems;
std::stringstream ss(s);
std::string item;
while (std::getline(ss, item, delim)) {
elems.push_back(item);
}
return elems;
}
} // namespace utils
#pragma GCC diagnostic pop
} // namespace va

View File

@ -0,0 +1,69 @@
import torch
import socket
from io import BytesIO
from PIL import Image
import json
import time
import uuid
import cv2
from pathlib import Path
from _thread import *
import threading
def threaded(conn, model):
with conn:
buffer = BytesIO()
buffer_len = -1
while True:
data = conn.recv(4096)
if not data:
break
if buffer_len == -1:
buffer_len = int.from_bytes(data[:8], byteorder='little')
buffer.write(data[8:])
else:
buffer.write(data)
if buffer.getbuffer().nbytes == buffer_len:
file_tmp = Path(str(uuid.uuid4()))
file_tmp.write_bytes(buffer.getbuffer())
cap=cv2.VideoCapture(file_tmp.name);
count = 0
json_data_list = [];
while(cap.isOpened()):
success, frame = cap.read()
if success:
ret,buffer = cv2.imencode('.jpg',frame)
if ret:
count += 1
img = Image.open(BytesIO(buffer.tobytes()))
results = model(img)
pred = results.pandas().xyxy[0]
inferences = list()
for index, row in pred.iterrows():
dict_object = dict(name=row['name'], cls=row['class'], confidence=row['confidence'], xmin=int(row['xmin']), ymin=int(row['ymin']), xmax=int(row['xmax']), ymax=int(row['ymax']))
inferences.append(dict_object)
# print(count, row['name'], row['class'], row['confidence'], int(row['xmin']), int(row['ymin']), int(row['xmax']), int(row['ymax']))
json_data_list.append(inferences)
else:
break
conn.sendall(bytes(json.dumps(json_data_list), 'utf-8'))
conn.sendall(bytes('\r\n\r\n', 'utf-8'))
file_tmp.unlink()
cap.release()
def main():
# Model
model = torch.hub.load('ultralytics/yolov5', 'yolov5s') # or yolov5n - yolov5x6, custom
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
s.bind(('localhost', 3030))
s.listen()
with s:
while True:
conn, addr = s.accept()
start_new_thread(threaded, (conn, model))
if __name__ == '__main__':
main()

View File

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.21)
project(va-hls VERSION 0.1.0)
project(va-hls)
set(SRC_FILES src/pico/picohttpparser.c src/pico/picohttpparser.h)
add_library(pico STATIC ${SRC_FILES})
@ -58,7 +58,6 @@ set(SRC_FILES
src/main.cpp
src/server.cpp
src/session.cpp
src/state/state.cpp
src/utils.cpp
)

View File

@ -7,12 +7,13 @@
#include <iostream>
#include <thread>
#include "../../common/state.h"
#include "../../common/utils.h"
#include "../../version.h"
#include "../../watcher/watcher.h"
#include "playlist.h"
#include "server.h"
#include "settings.h"
#include "state/state.h"
#include "utils.h"
#include <map>
#include <queue>

View File

@ -1,15 +0,0 @@
#include "state.h"
namespace va {
void StateApp::stop_app() {
std::unique_lock<std::mutex> lock(is_stop_mutex_);
is_stop_.test_and_set(std::memory_order_acquire);
is_stop_condvar_.notify_one();
}
void StateApp::wait_stop_app() {
std::unique_lock<std::mutex> lock(is_stop_mutex_);
while (!is_stop_.test_and_set()) {
is_stop_condvar_.wait(lock);
}
}
} // namespace va

View File

@ -1,4 +1,5 @@
#include "utils.h"
#include "../../common/utils.h"
#include "pico/picohttpparser.h"
#include "playlist.h"
#include "settings.h"
@ -35,10 +36,7 @@ namespace {
struct ParamsRequestLive {
std::string stream_id;
};
struct Stat {
double duration;
bool is_valid = false;
};
inline std::pair<Tag, std::variant<ParamsRequestArchive, ParamsRequestLive>> parse_uri(const std::string &uri) {
auto elems = va::utils::split(uri, '/');
if (elems.size() == 7) {
@ -49,39 +47,11 @@ namespace {
}
throw std::runtime_error("could not parse uri, number of parameters is missing");
}
Stat parse_stat(const std::string &path) {
std::ifstream in(path);
std::string start_marker, end_marker;
double duration;
in >> start_marker;
if (!in) {
return {0, false};
}
in >> duration;
if (!in) {
return {0, false};
}
in >> end_marker;
if (in && start_marker == "START_DATA" && end_marker == "END_DATA") {
return {duration, true};
}
return {0, false};
}
} // namespace
namespace va {
namespace utils {
std::vector<std::string> split(const std::string &s, char delim) {
std::vector<std::string> elems;
std::stringstream ss(s);
std::string item;
while (std::getline(ss, item, delim)) {
elems.push_back(item);
}
return elems;
}
} // namespace utils
// namespace utils
std::optional<std::string> make_playlist(const std::string &uri) {
auto [tag, variant] = parse_uri(uri);
if (tag == Tag::Archive) {
@ -90,35 +60,35 @@ namespace va {
auto path = std::format("{}/{}/{}/{}/{}/{}", settings.prefix_archive_path(), params.stream_id, params.year,
params.month, params.day, params.hour);
std::set<fs::path> files, files_stat;
std::set<fs::path> files, files_meta;
for (const auto &entry : fs::directory_iterator(path)) {
auto file = entry.path();
if (file.extension() == ".ts") {
files.emplace(file.stem());
}
if (file.extension() == ".stat") {
files_stat.emplace(file.stem());
if (file.extension() == ".meta") {
files_meta.emplace(file.stem());
}
}
std::vector<fs::path> intersection;
/*
* В playlist включаются только файлы для которых известна их продолжительность,
* то есть присутствие файла filename.stat
* то есть присутствие файла filename.meta
*/
std::set_intersection(files.begin(), files.end(), files_stat.begin(), files_stat.end(),
std::set_intersection(files.begin(), files.end(), files_meta.begin(), files_meta.end(),
std::back_inserter(intersection));
if (intersection.empty()) {
return std::nullopt;
}
for (auto filename : intersection) {
auto path_stat =
std::format("{}/{}/{}/{}/{}/{}/{}.stat", settings.prefix_archive_path(), params.stream_id,
auto path_meta =
std::format("{}/{}/{}/{}/{}/{}/{}.meta", settings.prefix_archive_path(), params.stream_id,
params.year, params.month, params.day, params.hour, filename.c_str());
auto stat = parse_stat(path_stat);
if (stat.is_valid) {
auto meta = va::utils::parse_metadatat_file(path_meta);
if (meta.is_valid) {
auto path = std::format("{}.ts", filename.c_str());
auto item = std::format("#EXTINF:{}\n{}\n", stat.duration, path);
auto item = std::format("#EXTINF:{}\n{}\n", meta.duration, path);
playlist.append(item);
}
}
@ -136,13 +106,12 @@ namespace va {
}
auto &queue_ref = playlists[params.stream_id].queue;
for (auto path : queue_ref) {
auto path_stat = std::format("{}/{}", settings.prefix_archive_path(), path);
path_stat = path_stat.replace(path_stat.size() - 2, params.stream_id.size(), "stat");
std::cout << path_stat << std::endl;
auto stat = parse_stat(path_stat);
if (stat.is_valid) {
auto path_meta = std::format("{}/{}", settings.prefix_archive_path(), path);
path_meta = path_meta.replace(path_meta.size() - 2, params.stream_id.size(), "meta");
auto meta = va::utils::parse_metadatat_file(path_meta);
if (meta.is_valid) {
auto path_without_prefix = path.replace(0, params.stream_id.size() + 1, "");
auto item = std::format("#EXTINF:{}\n{}\n", stat.duration, path);
auto item = std::format("#EXTINF:{}\n{}\n", meta.duration, path);
playlist.append(item);
}
}

View File

@ -7,10 +7,4 @@
namespace va {
std::optional<std::string> make_playlist(const std::string &uri);
std::string fetch_uri(const std::string &req);
} // namespace va
namespace va {
namespace utils {
std::vector<std::string> split(const std::string &s, char delim);
}
} // namespace va

View File

@ -0,0 +1,61 @@
cmake_minimum_required(VERSION 3.21)
project(va-inference)
set(CMAKE_CXX_STANDARD 20)
if (SANITIZER)
set(CMAKE_CXX_FLAGS "-g -O0")
add_compile_options(-fno-omit-frame-pointer)
add_compile_options(
-fsanitize=address
-fsanitize=leak
-fsanitize=undefined
-fsanitize=null
-fsanitize=shift
-fsanitize=unreachable
-fsanitize=signed-integer-overflow
)
add_link_options(
-fsanitize=address
-fsanitize=leak
-fsanitize=undefined
-fsanitize=null
-fsanitize=shift
-fsanitize=unreachable
-fsanitize=signed-integer-overflow
)
else()
set(CMAKE_CXX_FLAGS "-O2")
add_compile_options(
-Werror
-Wall
-Wextra
-Wpedantic
-Wcast-align
-Wcast-qual
-Wconversion
-Wctor-dtor-privacy
-Wenum-compare
-Wfloat-equal
-Wnon-virtual-dtor
-Wold-style-cast
-Woverloaded-virtual
-Wredundant-decls
-Wsign-conversion
-Wsign-promo
-Wunused
-Wunused-result
)
endif ()
find_package( Boost COMPONENTS log program_options system thread REQUIRED)
include_directories( ${Boost_INCLUDE_DIR} )
set(SRC_FILES
src/main.cpp
src/executor.cpp
)
add_executable(${PROJECT_NAME} ${SRC_FILES})
target_link_libraries(${PROJECT_NAME} ${Boost_LIBRARIES})

View File

@ -0,0 +1,87 @@
#include "executor.h"
#include "../../common/thread-safe-queue.h"
#include <boost/asio.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/log/trivial.hpp>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>
namespace fs = std::filesystem;
extern va::ThreadSafeQueue<std::string> queue;
namespace va {
Executor::Executor(const Settings &settings, std::stop_token stoken)
: inference_uri_(settings.inference_server_uri()), inference_port_(settings.inference_server_port()) {
auto num_threads = settings.num_threads();
while (num_threads > 0) {
threads_.emplace_back(&Executor::inference, this, stoken);
--num_threads;
}
}
void Executor::send(std::vector<char> &buffer, std::string &&path) {
using boost::asio::ip::tcp;
boost::asio::io_context io_context;
tcp::socket socket(io_context);
tcp::resolver resolver(io_context);
boost::asio::connect(socket, resolver.resolve(inference_uri_, std::to_string(inference_port_)));
auto buffer_len = static_cast<int64_t>(buffer.size());
char bytes[8] = {0};
std::copy(static_cast<const char *>(static_cast<const void *>(&buffer_len)),
static_cast<const char *>(static_cast<const void *>(&buffer_len)) + sizeof(buffer_len), bytes);
boost::asio::write(socket, boost::asio::buffer(bytes));
boost::asio::write(socket, boost::asio::buffer(buffer));
boost::asio::streambuf buf;
boost::asio::read_until(socket, buf, "\r\n\r\n");
std::string s((std::istreambuf_iterator<char>(&buf)), std::istreambuf_iterator<char>());
size_t start{path.find(".ts")};
fs::path path_inference{path.replace(start, 3, ".infe")};
std::ofstream ofs(path);
ofs << s;
boost::system::error_code ec;
socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket.close();
if (!ofs) {
auto err_msg = std::format("could not write to file {}", path);
throw std::runtime_error(err_msg);
}
}
void Executor::inference(std::stop_token stoken) {
while (!stoken.stop_requested()) {
auto path = queue.try_pop();
if (!path) {
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Start inference file (" << *path << ")";
std::ifstream input(*path, std::ios::binary);
if (input) {
// copies all data into buffer
std::vector<char> buffer(std::istreambuf_iterator<char>(input), {});
try {
send(buffer, std::string(path->begin(), path->end()));
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "Finish inference file (" << *path << "), elapsed time = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count() << " ms";
} catch (std::exception &ex) {
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
BOOST_LOG_TRIVIAL(error)
<< "Finish inference file (" << *path << "), elapsed time = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count()
<< " ms, with error " << ex.what();
}
} else {
BOOST_LOG_TRIVIAL(error) << "Error inference file (" << *path << "), because could not open file";
}
}
}
} // namespace va

View File

@ -0,0 +1,25 @@
#pragma once
#include "settings.h"
#include <thread>
#include <vector>
namespace va {
class Executor {
public:
Executor(const Settings &settings, std::stop_token stoken);
void inference(std::stop_token stoken);
void send(std::vector<char> &buffer, std::string &&path);
~Executor() {
for (auto &thread : threads_) {
if (thread.joinable()) {
thread.join();
}
}
}
private:
std::vector<std::thread> threads_;
const std::string inference_uri_;
const int16_t inference_port_;
};
} // namespace va

103
va-inference/src/main.cpp Normal file
View File

@ -0,0 +1,103 @@
#include <boost/log/core.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/trivial.hpp>
#include <boost/program_options.hpp>
#include <boost/version.hpp>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <thread>
#include "../../common/state.h"
#include "../../common/thread-safe-queue.h"
#include "../../common/utils.h"
#include "../../version.h"
#include "../../watcher/watcher.h"
#include "executor.h"
#include "settings.h"
namespace opt = boost::program_options;
namespace logging = boost::log;
va::StateApp state;
va::Settings settings;
va::ThreadSafeQueue<std::string> queue;
volatile static std::sig_atomic_t signal_num = -1;
void siginthandler(int param) {
signal_num = param;
state.stop_app();
BOOST_LOG_TRIVIAL(info) << "stop signal has been received (" << param << ")";
}
void init_logging(const std::string &level) {
if (level == "debug") {
logging::core::get()->set_filter(logging::trivial::severity >= logging::trivial::debug);
} else if (level == "info") {
logging::core::get()->set_filter(logging::trivial::severity >= logging::trivial::info);
} else if (level == "warning") {
logging::core::get()->set_filter(logging::trivial::severity >= logging::trivial::warning);
} else if (level == "error") {
logging::core::get()->set_filter(logging::trivial::severity >= logging::trivial::error);
} else {
throw std::runtime_error("unknow logging level");
}
}
int main(int argc, char *argv[]) {
try {
opt::options_description desc("all options");
desc.add_options()("prefix-archive-path", opt::value<std::string>()->default_value("/tmp/va"),
"prefix of the path for saving video files, default '/tmp/va'");
desc.add_options()("inference-server-address", opt::value<std::string>()->default_value("localhost"),
"inference server address, default localhost");
desc.add_options()("inference-server-port", opt::value<int16_t>()->default_value(3030),
"inference server port, default 3030");
desc.add_options()("num-threads", opt::value<size_t>()->default_value(1),
"number of parallel threads, default 1");
desc.add_options()("logging-level", opt::value<std::string>()->default_value("info"),
"logging level (debug, info, warning, error), default 'info'");
desc.add_options()("help,h", "help");
opt::variables_map vm;
opt::store(opt::parse_command_line(argc, argv, desc), vm);
opt::notify(vm);
if (vm.count("help")) {
std::cout << desc << "\n";
return EXIT_FAILURE;
}
init_logging(vm["logging-level"].as<std::string>());
settings.prefix_archvie_path(vm["prefix-archive-path"].as<std::string>());
settings.inference_server_address(vm["inference-server-address"].as<std::string>());
settings.inference_server_port(vm["inference-server-port"].as<int16_t>());
settings.num_threads(vm["num-threads"].as<size_t>());
} catch (std::exception &ex) {
BOOST_LOG_TRIVIAL(fatal) << "parse params error: " << ex.what();
return EXIT_FAILURE;
}
signal(SIGINT, siginthandler);
BOOST_LOG_TRIVIAL(info) << "module va-inference has been started "
<< "ver." << VERSION_MAJOR << "." << VERSION_MINOR << "." << VERSION_PATCH;
BOOST_LOG_TRIVIAL(info) << "boost version " << BOOST_VERSION / 100000 << "." << BOOST_VERSION / 100 % 1000 << "."
<< BOOST_VERSION % 100;
std::jthread th_watcher([&](std::stop_token stoken) {
try {
va::Watcher watcher(settings.prefix_archive_path().c_str());
watcher.run(stoken, [&](std::string &path) {
if (path.ends_with(".ts")) {
queue.push(path);
}
});
} catch (std::exception &ex) {
BOOST_LOG_TRIVIAL(fatal) << ex.what();
BOOST_LOG_TRIVIAL(fatal) << "application will be stopped";
signal_num = SIGABRT;
state.stop_app();
}
});
std::jthread th_executor([&](std::stop_token stoken) { va::Executor executor(settings, stoken); });
state.wait_stop_app();
th_watcher.request_stop();
th_executor.request_stop();
BOOST_LOG_TRIVIAL(info) << "application has been stopped";
return EXIT_SUCCESS;
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <string>
namespace va {
class Settings final {
public:
const std::string &prefix_archive_path() const & {
return prefix_archive_path_;
}
void prefix_archvie_path(const std::string &path) {
prefix_archive_path_ = path;
}
const std::string &inference_server_uri() const & {
return inference_server_address_;
}
void inference_server_address(const std::string &address) {
inference_server_address_ = address;
}
int16_t inference_server_port() const {
return inference_server_port_;
}
void inference_server_port(int16_t port) {
inference_server_port_ = port;
}
size_t num_threads() const {
return num_threads_;
}
void num_threads(size_t num_threads) {
num_threads_ = num_threads;
}
private:
std::string prefix_archive_path_;
std::string inference_server_address_;
int16_t inference_server_port_;
size_t num_threads_;
};
} // namespace va

View File

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.21)
project(va-recorder VERSION 0.2.0)
project(va-recorder)
set(CMAKE_CXX_STANDARD 20)
@ -59,7 +59,6 @@ set(SRC_FILES
src/utils.cpp
src/app.cpp
src/source/source.cpp
src/state/state.cpp
src/capture/ffmpeg/avformat.cpp
src/capture/ffmpeg/avpacket.cpp
src/capture/ffmpeg/avframe.cpp

View File

@ -16,7 +16,7 @@ namespace va {
if (start_pts_pkt_ != AV_NOPTS_VALUE && current_pts_pkt_ != AV_NOPTS_VALUE && time_base_) {
auto time_base = time_base_.value();
size_t start{current_file_.rfind(".ts")};
fs::path path{current_file_.replace(start, 3, ".stat")};
fs::path path{current_file_.replace(start, 3, ".meta")};
std::ofstream ofs(path);
ofs << "START_DATA" << std::endl;
ofs << static_cast<double>((current_pts_pkt_ - start_pts_pkt_)) * time_base.num / time_base.den
@ -26,7 +26,7 @@ namespace va {
} else {
// TODO: Предполагаю длительность файла равна duration_file_
size_t start{current_file_.rfind(".ts")};
fs::path path{current_file_.replace(start, 3, ".stat")};
fs::path path{current_file_.replace(start, 3, ".meta")};
std::ofstream ofs(path);
ofs << static_cast<double>(duration_file_);
}

View File

@ -26,13 +26,13 @@ namespace va {
if (start_pts_pkt_ != AV_NOPTS_VALUE && current_pts_pkt_ != AV_NOPTS_VALUE && time_base_) {
auto time_base = time_base_.value();
size_t start{current_file_.find(".ts")};
fs::path path{current_file_.replace(start, 3, ".stat")};
fs::path path{current_file_.replace(start, 3, ".meta")};
std::ofstream ofs(path);
ofs << static_cast<double>((current_pts_pkt_ - start_pts_pkt_)) * time_base.num / time_base.den;
} else {
// TODO: Предполагаю длительность файла равна duration_file_
size_t start{current_file_.find(".ts")};
fs::path path{current_file_.replace(start, 3, ".stat")};
fs::path path{current_file_.replace(start, 3, ".meta")};
std::ofstream ofs(path);
ofs << static_cast<double>(duration_file_);
}

View File

@ -1,8 +1,8 @@
#include "../../common/state.h"
#include "app.h"
#include "capture/capture.h"
#include "capture/ffmpeg/backend.h"
#include "source/source.h"
#include "state/state.h"
#include <boost/log/core.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/trivial.hpp>

View File

@ -1,15 +0,0 @@
#include "state.h"
namespace va {
void StateApp::stop_app() {
std::unique_lock<std::mutex> lock(is_stop_mutex_);
is_stop_.test_and_set(std::memory_order_acquire);
is_stop_condvar_.notify_one();
}
void StateApp::wait_stop_app() {
std::unique_lock<std::mutex> lock(is_stop_mutex_);
while (!is_stop_.test()) {
is_stop_condvar_.wait(lock);
}
}
} // namespace va

View File

@ -1,32 +0,0 @@
#pragma once
#include <boost/log/trivial.hpp>
#include <boost/version.hpp>
#include <condition_variable>
#include <mutex>
extern "C" {
#include <libavdevice/avdevice.h>
}
namespace va {
class StateApp final {
public:
StateApp() {
}
StateApp(const StateApp &) = delete;
StateApp &operator=(const StateApp &) = delete;
StateApp(StateApp &&) = delete;
StateApp &operator=(StateApp &&) = delete;
void stop_app();
void wait_stop_app();
bool is_stop() {
return is_stop_.test();
}
private:
std::condition_variable is_stop_condvar_;
std::mutex is_stop_mutex_;
std::atomic_flag is_stop_ = ATOMIC_FLAG_INIT;
};
} // namespace va