#include "utils_frame_queue.h"
#include "../base/logger.h"
#include <algorithm>
#include <chrono>
#include <mutex>

namespace av {
namespace utils {

using namespace av;

// ---------------------------------------------------------------------------
// FrameQueue
//
// Locking discipline used throughout this file:
//   * queueMutex_ guards queue_, config_, blocking_ and frameDropCallback_.
//   * statsMutex_ guards stats_.
//   * When both are needed, queueMutex_ is ALWAYS acquired first. Taking them
//     in the opposite order can deadlock against enqueue()/dequeue().
//   * User callbacks are invoked while queueMutex_ is held, so a callback must
//     not call back into the same queue.
// ---------------------------------------------------------------------------

/// Creates a queue that uses a copy of @p config.
FrameQueue::FrameQueue(const FrameQueueConfig& config)
    : config_(config) {
    Logger::instance().debugf("FrameQueue created with max size: {}", config_.maxSize);
}

/// Marks the queue as shut down, wakes every waiter and discards queued items.
FrameQueue::~FrameQueue() {
    shutdown_ = true;
    notEmpty_.notify_all();
    notFull_.notify_all();

    clear();
    Logger::instance().debug("FrameQueue destroyed");
}

/// Appends @p item to the tail of the queue.
///
/// @return INVALID_PARAMS if @p item is null, INVALID_STATE if the queue is
///         shutting down, TIMEOUT if no space became available within
///         config_.timeoutMs, SUCCESS otherwise.
///
/// SUCCESS is also returned when the item was intentionally dropped (frame
/// drop policy, or "drop newest" on a full queue); in that case the item is
/// destroyed when this function returns.
///
/// The capacity limit is only enforced while blocking_ is true; in
/// non-blocking mode the item is always appended.
ErrorCode FrameQueue::enqueue(std::unique_ptr<FrameQueueItem> item) {
    if (!item) {
        return ErrorCode::INVALID_PARAMS;
    }

    if (shutdown_) {
        return ErrorCode::INVALID_STATE;
    }

    std::unique_lock lock(queueMutex_);

    // Ask the frame-drop policy whether this frame should be discarded.
    if (shouldDropFrame(*item)) {
        if (dropCallback_) {
            dropCallback_(*item, "Smart frame drop");
        }

        std::lock_guard statsLock(statsMutex_);
        stats_.totalDropped++;

        Logger::instance().debug("Frame dropped by smart drop policy");
        return ErrorCode::SUCCESS;
    }

    // Queue is full: either drop a frame or wait for space.
    if (blocking_ && queue_.size() >= config_.maxSize) {
        if (config_.dropOnFull) {
            if (config_.dropOldest) {
                dropOldestFrame();
            } else {
                if (dropCallback_) {
                    dropCallback_(*item, "Queue full - drop newest");
                }

                std::lock_guard statsLock(statsMutex_);
                stats_.totalDropped++;

                Logger::instance().debug("Frame dropped - queue full");
                return ErrorCode::SUCCESS;
            }
        } else {
            // Wait until there is room. The predicate also observes blocking_
            // so that setBlocking(false) really releases a waiting producer
            // instead of letting it sleep until the timeout expires.
            const bool ready = notFull_.wait_for(
                lock, std::chrono::milliseconds(config_.timeoutMs), [this] {
                    return queue_.size() < config_.maxSize || shutdown_ || !blocking_;
                });
            if (!ready) {
                Logger::instance().warning("Enqueue timeout");
                return ErrorCode::TIMEOUT;
            }

            if (shutdown_) {
                return ErrorCode::INVALID_STATE;
            }
        }
    }

    // Append to the queue.
    queue_.push_back(std::move(item));

    // Update statistics.
    if (config_.enableStats) {
        updateStats(*queue_.back(), true);
    }

    // Wake one waiting consumer.
    notEmpty_.notify_one();

    // Notify the observer (still under queueMutex_).
    if (enqueueCallback_) {
        enqueueCallback_(*queue_.back());
    }

    return ErrorCode::SUCCESS;
}

// --- Peek -------------------------------------------------------------------

/// Returns the head item without removing it, or nullptr if the queue is empty.
///
/// The lock is released before returning, so the pointer stays valid only
/// until the item is removed from the queue by another call.
const FrameQueueItem* FrameQueue::peek() const {
    std::lock_guard lock(queueMutex_);

    if (queue_.empty()) {
        return nullptr;
    }

    return queue_.front().get();
}

/// Returns the item at position @p index (0 = head), or nullptr if @p index is
/// negative or past the end. Same lifetime caveat as peek().
const FrameQueueItem* FrameQueue::peek(int index) const {
    std::lock_guard lock(queueMutex_);

    if (index < 0 || static_cast<size_t>(index) >= queue_.size()) {
        return nullptr;
    }

    // queue_ supports random access.
    return queue_[index].get();
}

/// Returns the frame pointer of the head item, or nullptr if the queue is
/// empty. The item itself stays in the queue.
AVFrame* FrameQueue::peekFrame() const {
    const FrameQueueItem* item = peek();
    return item ? item->frame : nullptr;
}

// --- Dequeue ----------------------------------------------------------------

/// Removes and returns the head item, waiting up to config_.timeoutMs.
std::unique_ptr<FrameQueueItem> FrameQueue::dequeue() {
    return dequeue(config_.timeoutMs);
}

/// Removes and returns the head item.
///
/// In blocking mode an empty queue is waited on for up to @p timeoutMs
/// milliseconds. Returns nullptr on timeout, on shutdown, or when the queue is
/// empty in non-blocking mode.
std::unique_ptr<FrameQueueItem> FrameQueue::dequeue(int timeoutMs) {
    std::unique_lock lock(queueMutex_);

    // Wait for data. The predicate also observes blocking_ so that
    // setBlocking(false) releases a waiting consumer immediately.
    if (blocking_ && queue_.empty()) {
        const bool ready = notEmpty_.wait_for(
            lock, std::chrono::milliseconds(timeoutMs), [this] {
                return !queue_.empty() || shutdown_ || !blocking_;
            });
        if (!ready) {
            return nullptr;  // timed out
        }

        if (shutdown_ || queue_.empty()) {
            return nullptr;
        }
    }

    if (queue_.empty()) {
        return nullptr;
    }

    // Take the head element.
    auto item = std::move(queue_.front());
    queue_.pop_front();

    // Update statistics.
    if (config_.enableStats) {
        updateStats(*item, false);
    }

    // Wake one waiting producer.
    notFull_.notify_one();

    // Notify the observer (still under queueMutex_).
    if (dequeueCallback_) {
        dequeueCallback_(*item);
    }

    return item;
}

/// Wraps @p frame in a FrameQueueItem and enqueues it.
///
/// @return INVALID_PARAMS if @p frame is null, otherwise the result of
///         enqueue(std::unique_ptr<FrameQueueItem>).
///
/// NOTE(review): once wrapped, the frame belongs to the item regardless of the
/// returned code (a rejected or dropped item is destroyed here); callers should
/// not free @p frame after a non-null call — confirm against
/// FrameQueueItem's destructor.
ErrorCode FrameQueue::enqueue(AVFrame* frame, int streamIndex) {
    if (!frame) {
        return ErrorCode::INVALID_PARAMS;
    }

    auto item = std::make_unique<FrameQueueItem>(frame, streamIndex);
    return enqueue(std::move(item));
}

/// Dequeues one item (default timeout) and hands its frame to the caller.
/// Returns nullptr if nothing was dequeued.
AVFrame* FrameQueue::dequeueFrame() {
    auto item = dequeue();
    if (item) {
        AVFrame* frame = item->frame;
        item->frame = nullptr;  // transfer ownership to the caller
        return frame;
    }
    return nullptr;
}

/// Dequeues one item, waiting up to @p timeoutMs, and hands its frame to the
/// caller. Returns nullptr if nothing was dequeued.
AVFrame* FrameQueue::dequeueFrame(int timeoutMs) {
    auto item = dequeue(timeoutMs);
    if (item) {
        AVFrame* frame = item->frame;
        item->frame = nullptr;  // transfer ownership to the caller
        return frame;
    }
    return nullptr;
}

// --- Maintenance ------------------------------------------------------------

/// Discards every queued item.
void FrameQueue::clear() {
    {
        std::lock_guard lock(queueMutex_);
        queue_.clear();
    }

    // Space was freed: let producers blocked in enqueue() proceed instead of
    // waiting for their timeout.
    notFull_.notify_all();

    Logger::instance().debug("Frame queue cleared");
}

/// Discards every queued item and wakes all waiting producers and consumers.
void FrameQueue::flush() {
    clear();
    notEmpty_.notify_all();
    notFull_.notify_all();
}

/// Changes the capacity. If the queue currently holds more than @p maxSize
/// items and the policy is "drop oldest", the excess is dropped from the head;
/// otherwise the existing items are kept.
void FrameQueue::setMaxSize(size_t maxSize) {
    {
        std::lock_guard lock(queueMutex_);
        config_.maxSize = maxSize;

        // Trim only when the policy allows dropping the oldest frames.
        if (config_.dropOldest) {
            while (queue_.size() > maxSize) {
                dropOldestFrame();
            }
        }
    }

    // Capacity may have grown or frames may have been dropped; producers
    // waiting for space need to re-check.
    notFull_.notify_all();

    Logger::instance().debugf("Frame queue max size set to: {}", maxSize);
}

/// Sets what happens when a full queue receives a frame in blocking mode.
void FrameQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
    {
        // config_ is read by enqueue() under queueMutex_.
        std::lock_guard lock(queueMutex_);
        config_.dropOnFull = dropOnFull;
        config_.dropOldest = dropOldest;
    }

    Logger::instance().debugf("Drop policy set: dropOnFull={}, dropOldest={}",
                             dropOnFull, dropOldest);
}

// --- State queries ----------------------------------------------------------

/// Number of items currently queued.
size_t FrameQueue::size() const {
    std::lock_guard lock(queueMutex_);
    return queue_.size();
}

/// True if no item is queued.
bool FrameQueue::empty() const {
    std::lock_guard lock(queueMutex_);
    return queue_.empty();
}

/// True if the queue holds at least config_.maxSize items.
bool FrameQueue::full() const {
    std::lock_guard lock(queueMutex_);
    return queue_.size() >= config_.maxSize;
}

/// Configured maximum number of items.
size_t FrameQueue::capacity() const {
    std::lock_guard lock(queueMutex_);
    return config_.maxSize;
}

// --- Statistics -------------------------------------------------------------

/// Returns a snapshot of the statistics with currentSize refreshed.
FrameQueueStats FrameQueue::getStats() const {
    // Read the size BEFORE taking statsMutex_: size() locks queueMutex_, and
    // enqueue()/dequeue() lock queueMutex_ -> statsMutex_. Nesting them the
    // other way round here would be a lock-order inversion (deadlock).
    const auto currentSize = size();

    std::lock_guard lock(statsMutex_);
    FrameQueueStats stats = stats_;
    stats.currentSize = currentSize;
    return stats;
}

/// Resets all statistics to a default-constructed FrameQueueStats.
void FrameQueue::resetStats() {
    std::lock_guard lock(statsMutex_);
    stats_ = FrameQueueStats();
    Logger::instance().debug("Frame queue stats reset");
}

// --- Frame-drop policy ------------------------------------------------------

/// Configures the frame-drop policy evaluated at the start of enqueue().
void FrameQueue::enableFrameDrop(bool enable, double maxLatency, int dropRatio) {
    {
        // config_ is read by enqueue() under queueMutex_.
        std::lock_guard lock(queueMutex_);
        config_.enableFrameDrop = enable;
        config_.maxLatency = maxLatency;
        config_.dropRatio = dropRatio;
    }

    Logger::instance().debugf("Frame drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
                             enable, maxLatency, dropRatio);
}

/// Installs a custom predicate; when it returns true for an item, that item is
/// dropped (only consulted while frame dropping is enabled).
void FrameQueue::setFrameDropCallback(std::function<bool(const FrameQueueItem&)> callback) {
    // frameDropCallback_ is invoked by enqueue() under queueMutex_.
    std::lock_guard lock(queueMutex_);
    frameDropCallback_ = std::move(callback);
}

// --- Blocking control -------------------------------------------------------

/// Switches between blocking and non-blocking operation. Switching to
/// non-blocking releases every thread currently waiting in enqueue()/dequeue().
void FrameQueue::setBlocking(bool blocking) {
    {
        // Publish the flag under the mutex the waiters use; otherwise a waiter
        // could test its predicate, miss the change, and then miss the
        // notification below as well.
        std::lock_guard lock(queueMutex_);
        blocking_ = blocking;
    }

    if (!blocking) {
        notEmpty_.notify_all();
        notFull_.notify_all();
    }

    Logger::instance().debugf("Frame queue blocking mode: {}", blocking);
}

/// Notifies all waiters on both condition variables. Waiters re-evaluate their
/// predicate and keep waiting if it is still false.
void FrameQueue::wakeup() {
    notEmpty_.notify_all();
    notFull_.notify_all();
}

// --- Internal helpers (queueMutex_ is held by the caller) -------------------

/// Decides whether @p item should be dropped: custom callback first, then the
/// latency rule, then the ratio rule. Always false when frame dropping is
/// disabled.
bool FrameQueue::shouldDropFrame(const FrameQueueItem& item) const {
    if (!config_.enableFrameDrop) {
        return false;
    }

    // Custom drop predicate.
    if (frameDropCallback_ && frameDropCallback_(item)) {
        return true;
    }

    // Latency-based drop.
    if (shouldDropByLatency(item)) {
        return true;
    }

    // Ratio-based drop.
    if (shouldDropByRatio()) {
        return true;
    }

    return false;
}

/// Removes the head item (if any), reports it through dropCallback_ and counts
/// it as dropped.
void FrameQueue::dropOldestFrame() {
    if (!queue_.empty()) {
        auto& oldestItem = queue_.front();

        if (dropCallback_) {
            dropCallback_(*oldestItem, "Drop oldest frame");
        }

        queue_.pop_front();

        std::lock_guard statsLock(statsMutex_);
        stats_.totalDropped++;

        Logger::instance().debug("Dropped oldest frame");
    }
}

/// Intentionally empty: "drop newest" is handled inline in enqueue(), where the
/// incoming item is simply not appended.
void FrameQueue::dropNewestFrame() {
}

/// Updates counters after an enqueue (@p isEnqueue == true) or a dequeue.
/// On dequeue, also folds the item's wait time into the running average and
/// maximum.
void FrameQueue::updateStats(const FrameQueueItem& item, bool isEnqueue) {
    std::lock_guard lock(statsMutex_);

    if (isEnqueue) {
        stats_.totalEnqueued++;
        stats_.maxSize = std::max(stats_.maxSize, queue_.size());
    } else {
        stats_.totalDequeued++;

        // Time the item spent in the queue.
        double waitTime = calculateWaitTime(item);
        if (waitTime > 0) {
            // Incremental running average.
            if (stats_.totalDequeued == 1) {
                stats_.averageWaitTime = waitTime;
            } else {
                stats_.averageWaitTime =
                    (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) /
                    stats_.totalDequeued;
            }

            stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
        }
    }

    stats_.currentSize = queue_.size();
    stats_.lastUpdateTime = std::chrono::steady_clock::now();
}

/// Milliseconds elapsed since item.enqueueTime.
double FrameQueue::calculateWaitTime(const FrameQueueItem& item) const {
    auto now = std::chrono::steady_clock::now();
    auto duration =
        std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
    return duration.count() / 1000.0;  // microseconds -> milliseconds
}

/// True if the item's age exceeds config_.maxLatency (milliseconds).
bool FrameQueue::shouldDropByLatency(const FrameQueueItem& item) const {
    double waitTime = calculateWaitTime(item);
    return waitTime > config_.maxLatency;
}

/// True for one out of every config_.dropRatio calls; never true when the
/// ratio is <= 1. Every call with ratio > 1 advances frameCounter_.
bool FrameQueue::shouldDropByRatio() const {
    if (config_.dropRatio <= 1) {
        return false;
    }

    uint64_t count = frameCounter_.fetch_add(1);
    return (count % config_.dropRatio) == 0;
}

// ---------------------------------------------------------------------------
// MultiStreamFrameQueue
// ---------------------------------------------------------------------------

/// Stores @p config as the configuration used for automatically created streams.
MultiStreamFrameQueue::MultiStreamFrameQueue(const FrameQueueConfig& config)
    : defaultConfig_(config) {
    Logger::instance().debug("MultiStreamFrameQueue created");
}

/// Clears every per-stream queue before the container is destroyed.
MultiStreamFrameQueue::~MultiStreamFrameQueue() {
    clear();
    Logger::instance().debug("MultiStreamFrameQueue destroyed");
}
ErrorCode MultiStreamFrameQueue::addStream(int streamIndex, const FrameQueueConfig& config) { std::unique_lock lock(streamsMutex_); if (streamQueues_.find(streamIndex) != streamQueues_.end()) { Logger::instance().warningf("Stream {} already exists", streamIndex); return ErrorCode::ALREADY_EXISTS; } streamQueues_[streamIndex] = std::make_unique(config); Logger::instance().debugf("Added stream: {}", streamIndex); return ErrorCode::SUCCESS; } ErrorCode MultiStreamFrameQueue::removeStream(int streamIndex) { std::unique_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it == streamQueues_.end()) { Logger::instance().warningf("Stream {} not found", streamIndex); return ErrorCode::NOT_FOUND; } streamQueues_.erase(it); Logger::instance().debugf("Removed stream: {}", streamIndex); return ErrorCode::SUCCESS; } bool MultiStreamFrameQueue::hasStream(int streamIndex) const { std::shared_lock lock(streamsMutex_); return streamQueues_.find(streamIndex) != streamQueues_.end(); } std::vector MultiStreamFrameQueue::getStreamIndices() const { std::shared_lock lock(streamsMutex_); std::vector indices; for (const auto& pair : streamQueues_) { indices.push_back(pair.first); } return indices; } ErrorCode MultiStreamFrameQueue::enqueue(AVFrame* frame, int streamIndex) { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it == streamQueues_.end()) { // 自动创建流队列 lock.unlock(); addStream(streamIndex, defaultConfig_); lock.lock(); it = streamQueues_.find(streamIndex); if (it == streamQueues_.end()) { return ErrorCode::NOT_FOUND; } } return it->second->enqueue(frame, streamIndex); } AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex) { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it == streamQueues_.end()) { return nullptr; } return it->second->dequeueFrame(); } AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex, int timeoutMs) { std::shared_lock lock(streamsMutex_); auto it = 
streamQueues_.find(streamIndex); if (it == streamQueues_.end()) { return nullptr; } return it->second->dequeueFrame(timeoutMs); } ErrorCode MultiStreamFrameQueue::enqueueToAll(AVFrame* frame) { std::shared_lock lock(streamsMutex_); ErrorCode result = ErrorCode::SUCCESS; for (const auto& pair : streamQueues_) { // 为每个流创建帧的副本 AVFrame* frameCopy = av_frame_alloc(); if (!frameCopy) { result = ErrorCode::MEMORY_ALLOC_FAILED; continue; } if (av_frame_ref(frameCopy, frame) < 0) { av_frame_free(&frameCopy); result = ErrorCode::COPY_FAILED; continue; } ErrorCode enqueueResult = pair.second->enqueue(frameCopy, pair.first); if (enqueueResult != ErrorCode::SUCCESS) { av_frame_free(&frameCopy); result = enqueueResult; } } return result; } std::vector MultiStreamFrameQueue::dequeueFromAll() { std::shared_lock lock(streamsMutex_); std::vector frames; for (const auto& pair : streamQueues_) { AVFrame* frame = pair.second->dequeueFrame(); frames.push_back(frame); // 可能为nullptr } return frames; } void MultiStreamFrameQueue::clear() { std::unique_lock lock(streamsMutex_); for (auto& pair : streamQueues_) { pair.second->clear(); } Logger::instance().debug("All stream queues cleared"); } void MultiStreamFrameQueue::clearStream(int streamIndex) { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it != streamQueues_.end()) { it->second->clear(); Logger::instance().debugf("Stream {} queue cleared", streamIndex); } } void MultiStreamFrameQueue::flush() { std::shared_lock lock(streamsMutex_); for (auto& pair : streamQueues_) { pair.second->flush(); } Logger::instance().debug("All stream queues flushed"); } void MultiStreamFrameQueue::flushStream(int streamIndex) { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it != streamQueues_.end()) { it->second->flush(); Logger::instance().debugf("Stream {} queue flushed", streamIndex); } } size_t MultiStreamFrameQueue::size() const { std::shared_lock lock(streamsMutex_); size_t 
totalSize = 0; for (const auto& pair : streamQueues_) { totalSize += pair.second->size(); } return totalSize; } size_t MultiStreamFrameQueue::size(int streamIndex) const { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it != streamQueues_.end()) { return it->second->size(); } return 0; } bool MultiStreamFrameQueue::empty() const { std::shared_lock lock(streamsMutex_); for (const auto& pair : streamQueues_) { if (!pair.second->empty()) { return false; } } return true; } bool MultiStreamFrameQueue::empty(int streamIndex) const { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it != streamQueues_.end()) { return it->second->empty(); } return true; } std::map MultiStreamFrameQueue::getAllStats() const { std::shared_lock lock(streamsMutex_); std::map allStats; for (const auto& pair : streamQueues_) { allStats[pair.first] = pair.second->getStats(); } return allStats; } FrameQueueStats MultiStreamFrameQueue::getStats(int streamIndex) const { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it != streamQueues_.end()) { return it->second->getStats(); } return FrameQueueStats(); } void MultiStreamFrameQueue::resetStats() { std::shared_lock lock(streamsMutex_); for (auto& pair : streamQueues_) { pair.second->resetStats(); } Logger::instance().debug("All stream queue stats reset"); } void MultiStreamFrameQueue::resetStats(int streamIndex) { std::shared_lock lock(streamsMutex_); auto it = streamQueues_.find(streamIndex); if (it != streamQueues_.end()) { it->second->resetStats(); Logger::instance().debugf("Stream {} queue stats reset", streamIndex); } } void MultiStreamFrameQueue::setBlocking(bool blocking) { std::shared_lock lock(streamsMutex_); for (auto& pair : streamQueues_) { pair.second->setBlocking(blocking); } Logger::instance().debugf("All stream queues blocking mode: {}", blocking); } void MultiStreamFrameQueue::wakeupAll() { std::shared_lock 
lock(streamsMutex_); for (auto& pair : streamQueues_) { pair.second->wakeup(); } } // 工厂类实现 std::unique_ptr FrameQueueFactory::createStandardQueue(size_t maxSize) { FrameQueueConfig config; config.maxSize = maxSize; config.dropOnFull = true; config.dropOldest = true; config.timeoutMs = 1000; config.enableStats = true; return std::make_unique(config); } std::unique_ptr FrameQueueFactory::createLowLatencyQueue(size_t maxSize) { FrameQueueConfig config; config.maxSize = maxSize; config.dropOnFull = true; config.dropOldest = true; config.timeoutMs = 100; config.enableStats = true; config.enableFrameDrop = true; config.maxLatency = 50.0; config.dropRatio = 3; return std::make_unique(config); } std::unique_ptr FrameQueueFactory::createHighCapacityQueue(size_t maxSize) { FrameQueueConfig config; config.maxSize = maxSize; config.dropOnFull = false; // 不丢帧,等待 config.dropOldest = true; config.timeoutMs = 5000; config.enableStats = true; config.enableFrameDrop = false; return std::make_unique(config); } std::unique_ptr FrameQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) { FrameQueueConfig config; config.maxSize = maxSize; config.dropOnFull = true; config.dropOldest = true; config.timeoutMs = 50; config.enableStats = true; config.enableFrameDrop = true; config.maxLatency = maxLatency; config.dropRatio = 2; return std::make_unique(config); } std::unique_ptr FrameQueueFactory::createMultiStreamQueue(const FrameQueueConfig& config) { return std::make_unique(config); } } // namespace utils } // namespace av