| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698 |
- #include "utils_frame_queue.h"
- #include "../base/logger.h"
- #include <algorithm>
- #include <shared_mutex>
- namespace av {
- namespace utils {
- using namespace av::base;
- FrameQueue::FrameQueue(const FrameQueueConfig& config)
- : config_(config) {
- Logger::debug("FrameQueue created with max size: {}", config_.maxSize);
- }
- FrameQueue::~FrameQueue() {
- shutdown_ = true;
- notEmpty_.notify_all();
- notFull_.notify_all();
- clear();
- Logger::debug("FrameQueue destroyed");
- }
- ErrorCode FrameQueue::enqueue(std::unique_ptr<FrameQueueItem> item) {
- if (!item) {
- return ErrorCode::INVALID_PARAMETER;
- }
-
- if (shutdown_) {
- return ErrorCode::INVALID_STATE;
- }
-
- std::unique_lock<std::mutex> lock(queueMutex_);
-
- // 检查是否需要丢帧
- if (shouldDropFrame(*item)) {
- if (dropCallback_) {
- dropCallback_(*item, "Smart frame drop");
- }
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
-
- Logger::debug("Frame dropped by smart drop policy");
- return ErrorCode::SUCCESS;
- }
-
- // 等待队列有空间
- if (blocking_ && queue_.size() >= config_.maxSize) {
- if (config_.dropOnFull) {
- if (config_.dropOldest) {
- dropOldestFrame();
- } else {
- if (dropCallback_) {
- dropCallback_(*item, "Queue full - drop newest");
- }
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
-
- Logger::debug("Frame dropped - queue full");
- return ErrorCode::SUCCESS;
- }
- } else {
- // 等待队列有空间
- if (!notFull_.wait_for(lock, std::chrono::milliseconds(config_.timeoutMs),
- [this] { return queue_.size() < config_.maxSize || shutdown_; })) {
- Logger::warning("Enqueue timeout");
- return ErrorCode::TIMEOUT;
- }
-
- if (shutdown_) {
- return ErrorCode::INVALID_STATE;
- }
- }
- }
-
- // 添加到队列
- queue_.push(std::move(item));
-
- // 更新统计信息
- if (config_.enableStats) {
- updateStats(*queue_.back(), true);
- }
-
- // 通知等待的消费者
- notEmpty_.notify_one();
-
- // 回调
- if (enqueueCallback_) {
- enqueueCallback_(*queue_.back());
- }
-
- return ErrorCode::SUCCESS;
- }
- std::unique_ptr<FrameQueueItem> FrameQueue::dequeue() {
- return dequeue(config_.timeoutMs);
- }
- std::unique_ptr<FrameQueueItem> FrameQueue::dequeue(int timeoutMs) {
- std::unique_lock<std::mutex> lock(queueMutex_);
-
- // 等待队列有数据
- if (blocking_ && queue_.empty()) {
- if (!notEmpty_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
- [this] { return !queue_.empty() || shutdown_; })) {
- return nullptr; // 超时
- }
-
- if (shutdown_ || queue_.empty()) {
- return nullptr;
- }
- }
-
- if (queue_.empty()) {
- return nullptr;
- }
-
- // 取出队列头部元素
- auto item = std::move(queue_.front());
- queue_.pop();
-
- // 更新统计信息
- if (config_.enableStats) {
- updateStats(*item, false);
- }
-
- // 通知等待的生产者
- notFull_.notify_one();
-
- // 回调
- if (dequeueCallback_) {
- dequeueCallback_(*item);
- }
-
- return item;
- }
- ErrorCode FrameQueue::enqueue(AVFrame* frame, int streamIndex) {
- if (!frame) {
- return ErrorCode::INVALID_PARAMETER;
- }
-
- auto item = std::make_unique<FrameQueueItem>(frame, streamIndex);
- return enqueue(std::move(item));
- }
- AVFrame* FrameQueue::dequeueFrame() {
- auto item = dequeue();
- if (item) {
- AVFrame* frame = item->frame;
- item->frame = nullptr; // 转移所有权
- return frame;
- }
- return nullptr;
- }
- AVFrame* FrameQueue::dequeueFrame(int timeoutMs) {
- auto item = dequeue(timeoutMs);
- if (item) {
- AVFrame* frame = item->frame;
- item->frame = nullptr; // 转移所有权
- return frame;
- }
- return nullptr;
- }
- void FrameQueue::clear() {
- std::lock_guard<std::mutex> lock(queueMutex_);
-
- while (!queue_.empty()) {
- queue_.pop();
- }
-
- Logger::debug("Frame queue cleared");
- }
- void FrameQueue::flush() {
- clear();
- notEmpty_.notify_all();
- notFull_.notify_all();
- }
- void FrameQueue::setMaxSize(size_t maxSize) {
- std::lock_guard<std::mutex> lock(queueMutex_);
- config_.maxSize = maxSize;
-
- // 如果当前队列大小超过新的最大值,丢弃多余的帧
- while (queue_.size() > maxSize) {
- if (config_.dropOldest) {
- dropOldestFrame();
- } else {
- break; // 不丢弃,保持当前状态
- }
- }
-
- Logger::debug("Frame queue max size set to: {}", maxSize);
- }
- void FrameQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
- config_.dropOnFull = dropOnFull;
- config_.dropOldest = dropOldest;
- Logger::debug("Drop policy set: dropOnFull={}, dropOldest={}", dropOnFull, dropOldest);
- }
- size_t FrameQueue::size() const {
- std::lock_guard<std::mutex> lock(queueMutex_);
- return queue_.size();
- }
- bool FrameQueue::empty() const {
- std::lock_guard<std::mutex> lock(queueMutex_);
- return queue_.empty();
- }
- bool FrameQueue::full() const {
- std::lock_guard<std::mutex> lock(queueMutex_);
- return queue_.size() >= config_.maxSize;
- }
- size_t FrameQueue::capacity() const {
- return config_.maxSize;
- }
- FrameQueueStats FrameQueue::getStats() const {
- std::lock_guard<std::mutex> lock(statsMutex_);
- FrameQueueStats stats = stats_;
- stats.currentSize = size();
- return stats;
- }
- void FrameQueue::resetStats() {
- std::lock_guard<std::mutex> lock(statsMutex_);
- stats_ = FrameQueueStats();
- Logger::debug("Frame queue stats reset");
- }
- void FrameQueue::enableFrameDrop(bool enable, double maxLatency, int dropRatio) {
- config_.enableFrameDrop = enable;
- config_.maxLatency = maxLatency;
- config_.dropRatio = dropRatio;
-
- Logger::debug("Frame drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
- enable, maxLatency, dropRatio);
- }
- void FrameQueue::setFrameDropCallback(std::function<bool(const FrameQueueItem&)> callback) {
- frameDropCallback_ = callback;
- }
- void FrameQueue::setBlocking(bool blocking) {
- blocking_ = blocking;
- if (!blocking) {
- notEmpty_.notify_all();
- notFull_.notify_all();
- }
- Logger::debug("Frame queue blocking mode: {}", blocking);
- }
- void FrameQueue::wakeup() {
- notEmpty_.notify_all();
- notFull_.notify_all();
- }
// Internal helper implementations
- bool FrameQueue::shouldDropFrame(const FrameQueueItem& item) const {
- if (!config_.enableFrameDrop) {
- return false;
- }
-
- // 自定义丢帧回调
- if (frameDropCallback_ && frameDropCallback_(item)) {
- return true;
- }
-
- // 基于延迟的丢帧
- if (shouldDropByLatency(item)) {
- return true;
- }
-
- // 基于比例的丢帧
- if (shouldDropByRatio()) {
- return true;
- }
-
- return false;
- }
- void FrameQueue::dropOldestFrame() {
- if (!queue_.empty()) {
- auto& oldestItem = queue_.front();
-
- if (dropCallback_) {
- dropCallback_(*oldestItem, "Drop oldest frame");
- }
-
- queue_.pop();
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
-
- Logger::debug("Dropped oldest frame");
- }
- }
- void FrameQueue::dropNewestFrame() {
- // 这个方法在enqueue时调用,不需要实际操作队列
- // 只需要更新统计信息
- }
- void FrameQueue::updateStats(const FrameQueueItem& item, bool isEnqueue) {
- std::lock_guard<std::mutex> lock(statsMutex_);
-
- if (isEnqueue) {
- stats_.totalEnqueued++;
- stats_.maxSize = std::max(stats_.maxSize, queue_.size());
- } else {
- stats_.totalDequeued++;
-
- // 计算等待时间
- double waitTime = calculateWaitTime(item);
- if (waitTime > 0) {
- // 更新平均等待时间
- if (stats_.totalDequeued == 1) {
- stats_.averageWaitTime = waitTime;
- } else {
- stats_.averageWaitTime = (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) / stats_.totalDequeued;
- }
-
- stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
- }
- }
-
- stats_.currentSize = queue_.size();
- stats_.lastUpdateTime = std::chrono::steady_clock::now();
- }
- double FrameQueue::calculateWaitTime(const FrameQueueItem& item) const {
- auto now = std::chrono::steady_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
- return duration.count() / 1000.0; // 转换为毫秒
- }
- bool FrameQueue::shouldDropByLatency(const FrameQueueItem& item) const {
- double waitTime = calculateWaitTime(item);
- return waitTime > config_.maxLatency;
- }
- bool FrameQueue::shouldDropByRatio() const {
- if (config_.dropRatio <= 1) {
- return false;
- }
-
- uint64_t count = frameCounter_.fetch_add(1);
- return (count % config_.dropRatio) == 0;
- }
// Multi-stream frame queue implementation
- MultiStreamFrameQueue::MultiStreamFrameQueue(const FrameQueueConfig& config)
- : defaultConfig_(config) {
- Logger::debug("MultiStreamFrameQueue created");
- }
- MultiStreamFrameQueue::~MultiStreamFrameQueue() {
- clear();
- Logger::debug("MultiStreamFrameQueue destroyed");
- }
- ErrorCode MultiStreamFrameQueue::addStream(int streamIndex, const FrameQueueConfig& config) {
- std::unique_lock<std::shared_mutex> lock(streamsMutex_);
-
- if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
- Logger::warning("Stream {} already exists", streamIndex);
- return ErrorCode::ALREADY_EXISTS;
- }
-
- streamQueues_[streamIndex] = std::make_unique<FrameQueue>(config);
- Logger::debug("Added stream: {}", streamIndex);
-
- return ErrorCode::SUCCESS;
- }
- ErrorCode MultiStreamFrameQueue::removeStream(int streamIndex) {
- std::unique_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- Logger::warning("Stream {} not found", streamIndex);
- return ErrorCode::NOT_FOUND;
- }
-
- streamQueues_.erase(it);
- Logger::debug("Removed stream: {}", streamIndex);
-
- return ErrorCode::SUCCESS;
- }
- bool MultiStreamFrameQueue::hasStream(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
- return streamQueues_.find(streamIndex) != streamQueues_.end();
- }
- std::vector<int> MultiStreamFrameQueue::getStreamIndices() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- std::vector<int> indices;
- for (const auto& pair : streamQueues_) {
- indices.push_back(pair.first);
- }
-
- return indices;
- }
- ErrorCode MultiStreamFrameQueue::enqueue(AVFrame* frame, int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- // 自动创建流队列
- lock.unlock();
- addStream(streamIndex, defaultConfig_);
- lock.lock();
-
- it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- return ErrorCode::NOT_FOUND;
- }
- }
-
- return it->second->enqueue(frame, streamIndex);
- }
- AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- return nullptr;
- }
-
- return it->second->dequeueFrame();
- }
- AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex, int timeoutMs) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- return nullptr;
- }
-
- return it->second->dequeueFrame(timeoutMs);
- }
- ErrorCode MultiStreamFrameQueue::enqueueToAll(AVFrame* frame) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- ErrorCode result = ErrorCode::SUCCESS;
- for (const auto& pair : streamQueues_) {
- // 为每个流创建帧的副本
- AVFrame* frameCopy = av_frame_alloc();
- if (!frameCopy) {
- result = ErrorCode::OUT_OF_MEMORY;
- continue;
- }
-
- if (av_frame_ref(frameCopy, frame) < 0) {
- av_frame_free(&frameCopy);
- result = ErrorCode::COPY_FAILED;
- continue;
- }
-
- ErrorCode enqueueResult = pair.second->enqueue(frameCopy, pair.first);
- if (enqueueResult != ErrorCode::SUCCESS) {
- av_frame_free(&frameCopy);
- result = enqueueResult;
- }
- }
-
- return result;
- }
- std::vector<AVFrame*> MultiStreamFrameQueue::dequeueFromAll() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- std::vector<AVFrame*> frames;
- for (const auto& pair : streamQueues_) {
- AVFrame* frame = pair.second->dequeueFrame();
- frames.push_back(frame); // 可能为nullptr
- }
-
- return frames;
- }
- void MultiStreamFrameQueue::clear() {
- std::unique_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->clear();
- }
-
- Logger::debug("All stream queues cleared");
- }
- void MultiStreamFrameQueue::clearStream(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- it->second->clear();
- Logger::debug("Stream {} queue cleared", streamIndex);
- }
- }
- void MultiStreamFrameQueue::flush() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->flush();
- }
-
- Logger::debug("All stream queues flushed");
- }
- void MultiStreamFrameQueue::flushStream(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- it->second->flush();
- Logger::debug("Stream {} queue flushed", streamIndex);
- }
- }
- size_t MultiStreamFrameQueue::size() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- size_t totalSize = 0;
- for (const auto& pair : streamQueues_) {
- totalSize += pair.second->size();
- }
-
- return totalSize;
- }
- size_t MultiStreamFrameQueue::size(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- return it->second->size();
- }
-
- return 0;
- }
- bool MultiStreamFrameQueue::empty() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (const auto& pair : streamQueues_) {
- if (!pair.second->empty()) {
- return false;
- }
- }
-
- return true;
- }
- bool MultiStreamFrameQueue::empty(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- return it->second->empty();
- }
-
- return true;
- }
- std::map<int, FrameQueueStats> MultiStreamFrameQueue::getAllStats() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- std::map<int, FrameQueueStats> allStats;
- for (const auto& pair : streamQueues_) {
- allStats[pair.first] = pair.second->getStats();
- }
-
- return allStats;
- }
- FrameQueueStats MultiStreamFrameQueue::getStats(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- return it->second->getStats();
- }
-
- return FrameQueueStats();
- }
- void MultiStreamFrameQueue::resetStats() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->resetStats();
- }
-
- Logger::debug("All stream queue stats reset");
- }
- void MultiStreamFrameQueue::resetStats(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- it->second->resetStats();
- Logger::debug("Stream {} queue stats reset", streamIndex);
- }
- }
- void MultiStreamFrameQueue::setBlocking(bool blocking) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->setBlocking(blocking);
- }
-
- Logger::debug("All stream queues blocking mode: {}", blocking);
- }
- void MultiStreamFrameQueue::wakeupAll() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->wakeup();
- }
- }
// Factory class implementation
- std::unique_ptr<FrameQueue> FrameQueueFactory::createStandardQueue(size_t maxSize) {
- FrameQueueConfig config;
- config.maxSize = maxSize;
- config.dropOnFull = true;
- config.dropOldest = true;
- config.timeoutMs = 1000;
- config.enableStats = true;
-
- return std::make_unique<FrameQueue>(config);
- }
- std::unique_ptr<FrameQueue> FrameQueueFactory::createLowLatencyQueue(size_t maxSize) {
- FrameQueueConfig config;
- config.maxSize = maxSize;
- config.dropOnFull = true;
- config.dropOldest = true;
- config.timeoutMs = 100;
- config.enableStats = true;
- config.enableFrameDrop = true;
- config.maxLatency = 50.0;
- config.dropRatio = 3;
-
- return std::make_unique<FrameQueue>(config);
- }
- std::unique_ptr<FrameQueue> FrameQueueFactory::createHighCapacityQueue(size_t maxSize) {
- FrameQueueConfig config;
- config.maxSize = maxSize;
- config.dropOnFull = false; // 不丢帧,等待
- config.dropOldest = true;
- config.timeoutMs = 5000;
- config.enableStats = true;
- config.enableFrameDrop = false;
-
- return std::make_unique<FrameQueue>(config);
- }
- std::unique_ptr<FrameQueue> FrameQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) {
- FrameQueueConfig config;
- config.maxSize = maxSize;
- config.dropOnFull = true;
- config.dropOldest = true;
- config.timeoutMs = 50;
- config.enableStats = true;
- config.enableFrameDrop = true;
- config.maxLatency = maxLatency;
- config.dropRatio = 2;
-
- return std::make_unique<FrameQueue>(config);
- }
- std::unique_ptr<MultiStreamFrameQueue> FrameQueueFactory::createMultiStreamQueue(const FrameQueueConfig& config) {
- return std::make_unique<MultiStreamFrameQueue>(config);
- }
- } // namespace utils
- } // namespace av
|