#include "utils_packet_queue.h"
#include "../base/logger.h"

// NOTE(review): the two standard headers and every template argument list in
// this file were reconstructed from usage (std::sort/std::find/std::max and
// std::chrono are used below). Confirm them against utils_packet_queue.h.
// Lock guards rely on C++17 class template argument deduction so that no
// mutex type has to be assumed here.
#include <algorithm>
#include <chrono>

namespace av {
namespace utils {
using namespace av;

// ---------------------------------------------------------------------------
// PacketQueue
// ---------------------------------------------------------------------------

/// Builds a queue from a copy of @p config and logs the configured limits.
PacketQueue::PacketQueue(const PacketQueueConfig& config)
    : config_(config) {
    Logger::instance().debugf("PacketQueue created with max size: {}, max bytes: {}",
                              config_.maxSize, config_.maxBytes);
}

/// Marks the queue as shut down, wakes every waiter and discards queued items.
PacketQueue::~PacketQueue() {
    shutdown_ = true;
    notEmpty_.notify_all();
    notFull_.notify_all();
    clear();
    Logger::instance().debug("PacketQueue destroyed");
}

/// Adds @p item to the active queue (normal or priority).
///
/// @return INVALID_PARAMS for a null item, INVALID_STATE once shut down,
///         TIMEOUT when waiting for free space timed out, SUCCESS otherwise.
///         SUCCESS is also returned when the packet was filtered out or
///         dropped by a drop policy.
ErrorCode PacketQueue::enqueue(std::unique_ptr<PacketQueueItem> item) {
    if (!item) {
        return ErrorCode::INVALID_PARAMS;
    }
    if (shutdown_) {
        return ErrorCode::INVALID_STATE;
    }

    // Stream filter: packets of streams that are not allowed are discarded.
    if (!isStreamAllowed(item->streamIndex)) {
        Logger::instance().debugf("Packet dropped - stream {} not allowed", item->streamIndex);
        return ErrorCode::SUCCESS;
    }

    std::unique_lock lock(queueMutex_);

    // Smart drop (custom predicate / latency / ratio / size).
    if (shouldDropPacket(*item)) {
        if (dropCallback_) {
            dropCallback_(*item, "Smart packet drop");
        }
        std::lock_guard statsLock(statsMutex_);
        stats_.totalDropped++;
        Logger::instance().debug("Packet dropped by smart drop policy");
        return ErrorCode::SUCCESS;
    }

    // Capacity check against both the item-count and the byte limit.
    const size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
    const size_t currentBytes = totalBytes_.load();
    const bool queueFull = (currentSize >= config_.maxSize) || (currentBytes >= config_.maxBytes);

    // NOTE(review): the full-queue handling only runs in blocking mode; in
    // non-blocking mode the item is appended even when a limit is exceeded.
    // Confirm that this is intended.
    if (blocking_ && queueFull) {
        if (config_.dropOnFull) {
            // Make room according to the configured drop policy.
            if (config_.dropOldest) {
                dropOldestPacket();
            } else if (config_.dropNonKeyPackets && !item->isKeyPacket) {
                dropNonKeyPackets();
            } else {
                // No eviction policy applies: the incoming packet is dropped.
                if (dropCallback_) {
                    dropCallback_(*item, "Queue full - drop newest");
                }
                std::lock_guard statsLock(statsMutex_);
                stats_.totalDropped++;
                Logger::instance().debug("Packet dropped - queue full");
                return ErrorCode::SUCCESS;
            }
        } else {
            // Wait until a consumer frees space, the queue shuts down, or the
            // configured timeout elapses.
            const bool ready = notFull_.wait_for(
                lock, std::chrono::milliseconds(config_.timeoutMs), [this] {
                    const size_t newSize =
                        config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
                    const size_t newBytes = totalBytes_.load();
                    return (newSize < config_.maxSize && newBytes < config_.maxBytes) || shutdown_;
                });
            if (!ready) {
                Logger::instance().warning("Enqueue timeout");
                return ErrorCode::TIMEOUT;
            }
            if (shutdown_) {
                return ErrorCode::INVALID_STATE;
            }
        }
    }

    // Byte accounting.
    if (item->packet) {
        totalBytes_ += item->packet->size;
    }

    // Keep a reference for statistics and the callback. Moving the unique_ptr
    // into the container does not move the pointee, and queueMutex_ is held
    // until this function returns, so nothing can remove the item meanwhile.
    const PacketQueueItem& itemRef = *item;

    if (config_.priorityQueue) {
        enqueueToPriorityQueue(std::move(item));
    } else {
        enqueueToNormalQueue(std::move(item));
    }

    if (config_.enableStats) {
        updateStats(itemRef, true);
    }

    // Wake one waiting consumer.
    notEmpty_.notify_one();

    if (enqueueCallback_) {
        enqueueCallback_(itemRef);
    }

    return ErrorCode::SUCCESS;
}

/// Dequeues one item using the configured default timeout.
std::unique_ptr<PacketQueueItem> PacketQueue::dequeue() {
    return dequeue(config_.timeoutMs);
}

/// Dequeues one item, waiting up to @p timeoutMs milliseconds in blocking mode.
///
/// @return the item, or nullptr on timeout, shutdown, or an empty queue.
std::unique_ptr<PacketQueueItem> PacketQueue::dequeue(int timeoutMs) {
    std::unique_lock lock(queueMutex_);

    // In blocking mode wait for data (or shutdown) when the queue is empty.
    if (blocking_) {
        const bool hasData = config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
        if (!hasData) {
            const bool signalled = notEmpty_.wait_for(
                lock, std::chrono::milliseconds(timeoutMs), [this] {
                    const bool available =
                        config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
                    return available || shutdown_;
                });
            if (!signalled) {
                return nullptr;  // timed out
            }
            if (shutdown_) {
                return nullptr;
            }
        }
    }

    // Non-blocking mode (or a race with another consumer) may still see an
    // empty queue here.
    const bool isEmpty = config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
    if (isEmpty) {
        return nullptr;
    }

    auto item = config_.priorityQueue ? dequeueFromPriorityQueue() : dequeueFromNormalQueue();
    if (!item) {
        return nullptr;
    }

    // Byte accounting.
    if (item->packet) {
        totalBytes_ -= item->packet->size;
    }

    if (config_.enableStats) {
        updateStats(*item, false);
    }

    // Wake one waiting producer.
    notFull_.notify_one();

    if (dequeueCallback_) {
        dequeueCallback_(*item);
    }

    return item;
}

/// Wraps @p packet in a PacketQueueItem and enqueues it.
/// @return INVALID_PARAMS when @p packet is null, otherwise the result of the
///         item-based enqueue().
ErrorCode PacketQueue::enqueue(AVPacket* packet, int streamIndex, int priority) {
    if (!packet) {
        return ErrorCode::INVALID_PARAMS;
    }

    auto item = std::make_unique<PacketQueueItem>(packet, streamIndex, priority);
    return enqueue(std::move(item));
}

/// Dequeues one item with the default timeout and hands its raw packet to the
/// caller. Equivalent to dequeuePacket(config_.timeoutMs).
AVPacket* PacketQueue::dequeuePacket() {
    return dequeuePacket(config_.timeoutMs);
}

/// Dequeues one item and hands its raw packet to the caller.
/// The item's packet pointer is cleared before the item is destroyed, so the
/// returned pointer is no longer referenced by the item.
/// @return the packet, or nullptr when nothing was dequeued.
AVPacket* PacketQueue::dequeuePacket(int timeoutMs) {
    auto item = dequeue(timeoutMs);
    if (!item) {
        return nullptr;
    }

    AVPacket* packet = item->packet;
    item->packet = nullptr;  // ownership moves to the caller
    return packet;
}

/// Same as enqueue(packet, streamIndex, priority) with a different parameter
/// order.
ErrorCode PacketQueue::enqueueWithPriority(AVPacket* packet, int priority, int streamIndex) {
    return enqueue(packet, streamIndex, priority);
}

/// Dequeues the next item; only logs a warning when priority mode is off.
std::unique_ptr<PacketQueueItem> PacketQueue::dequeueHighestPriority() {
    if (!config_.priorityQueue) {
        Logger::instance().warning("Priority queue not enabled");
    }
    return dequeue();
}

/// Removes every item from both internal queues and resets the byte counter.
void PacketQueue::clear() {
    std::lock_guard lock(queueMutex_);

    while (!normalQueue_.empty()) {
        normalQueue_.pop();
    }
    while (!priorityQueue_.empty()) {
        priorityQueue_.pop();
    }

    totalBytes_ = 0;

    Logger::instance().debug("Packet queue cleared");
}

/// Clears the queue and wakes all waiting producers and consumers.
void PacketQueue::flush() {
    clear();
    notEmpty_.notify_all();
    notFull_.notify_all();
}

/// Sets the item-count limit. When the queue is over the new limit and
/// dropOldest is set, packets are dropped until it fits; otherwise the excess
/// is left in place.
void PacketQueue::setMaxSize(size_t maxSize) {
    std::lock_guard lock(queueMutex_);
    config_.maxSize = maxSize;

    size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
    while (currentSize > maxSize) {
        if (!config_.dropOldest) {
            break;
        }
        dropOldestPacket();
        currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
    }

    Logger::instance().debugf("Packet queue max size set to: {}", maxSize);
}

/// Sets the byte limit.
/// NOTE(review): config_ is written without queueMutex_ here, unlike
/// setMaxSize(); confirm whether callers serialize this externally.
void PacketQueue::setMaxBytes(size_t maxBytes) {
    config_.maxBytes = maxBytes;
    Logger::instance().debugf("Packet queue max bytes set to: {}", maxBytes);
}

/// Sets the drop-on-full policy flags.
/// NOTE(review): config_ is written without queueMutex_ here — confirm.
void PacketQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
    config_.dropOnFull = dropOnFull;
    config_.dropOldest = dropOldest;
    Logger::instance().debugf("Drop policy set: dropOnFull={}, dropOldest={}",
                              dropOnFull, dropOldest);
}

/// Switches between the normal and the priority queue, migrating queued items.
void PacketQueue::enablePriorityQueue(bool enable) {
    std::lock_guard lock(queueMutex_);

    if (config_.priorityQueue == enable) {
        return;
    }

    if (enable) {
        // Normal queue -> priority queue.
        while (!normalQueue_.empty()) {
            auto item = std::move(normalQueue_.front());
            normalQueue_.pop();
            priorityQueue_.push(std::move(item));
        }
    } else {
        // Priority queue -> normal queue. top() is const, hence the
        // const_cast to move the owning pointer out before pop().
        std::vector<std::unique_ptr<PacketQueueItem>> tempItems;
        while (!priorityQueue_.empty()) {
            tempItems.push_back(
                std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top())));
            priorityQueue_.pop();
        }

        // Re-establish DTS order before refilling the normal queue.
        std::sort(tempItems.begin(), tempItems.end(),
                  [](const auto& a, const auto& b) { return a->dts < b->dts; });

        for (auto& item : tempItems) {
            normalQueue_.push(std::move(item));
        }
    }

    config_.priorityQueue = enable;
    Logger::instance().debugf("Priority queue {}", enable ? "enabled" : "disabled");
}

/// Number of items in the active queue.
size_t PacketQueue::size() const {
    std::lock_guard lock(queueMutex_);
    return config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
}

/// Total payload bytes currently accounted for.
size_t PacketQueue::bytes() const {
    return totalBytes_.load();
}

/// True when the active queue holds no items.
bool PacketQueue::empty() const {
    std::lock_guard lock(queueMutex_);
    return config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
}

/// True when the item count has reached config_.maxSize.
bool PacketQueue::full() const {
    std::lock_guard lock(queueMutex_);
    const size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
    return currentSize >= config_.maxSize;
}

/// True when the byte counter has reached config_.maxBytes.
bool PacketQueue::fullByBytes() const {
    return totalBytes_.load() >= config_.maxBytes;
}

/// Configured item-count limit.
size_t PacketQueue::capacity() const {
    return config_.maxSize;
}

/// Configured byte limit.
size_t PacketQueue::capacityBytes() const {
    return config_.maxBytes;
}

/// Returns a copy of the statistics with currentSize/totalBytes refreshed.
PacketQueueStats PacketQueue::getStats() const {
    // Read the queue state BEFORE taking statsMutex_: enqueue()/dequeue()
    // acquire queueMutex_ first and statsMutex_ second, so calling size()
    // (which takes queueMutex_) while holding statsMutex_ could deadlock.
    const size_t currentSize = size();
    const size_t currentBytes = bytes();

    std::lock_guard lock(statsMutex_);
    PacketQueueStats stats = stats_;
    stats.currentSize = currentSize;
    stats.totalBytes = currentBytes;
    return stats;
}

/// Resets all statistics to a default-constructed PacketQueueStats.
void PacketQueue::resetStats() {
    std::lock_guard lock(statsMutex_);
    stats_ = PacketQueueStats();
    Logger::instance().debug("Packet queue stats reset");
}

/// Configures the smart-drop policy evaluated by shouldDropPacket().
/// NOTE(review): config_ is written without queueMutex_ here — confirm.
void PacketQueue::enablePacketDrop(bool enable, double maxLatency, int dropRatio) {
    config_.enablePacketDrop = enable;
    config_.maxLatency = maxLatency;
    config_.dropRatio = dropRatio;

    Logger::instance().debugf("Packet drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
                              enable, maxLatency, dropRatio);
}

/// Installs the custom drop predicate consulted by shouldDropPacket().
/// NOTE(review): the std::function signature is inferred from the call
/// `packetDropCallback_(item)` used as a boolean — confirm against the header.
void PacketQueue::setPacketDropCallback(std::function<bool(const PacketQueueItem&)> callback) {
    packetDropCallback_ = std::move(callback);
}

/// Switches blocking mode; turning it off wakes all current waiters.
void PacketQueue::setBlocking(bool blocking) {
    blocking_ = blocking;
    if (!blocking) {
        notEmpty_.notify_all();
        notFull_.notify_all();
    }
    Logger::instance().debugf("Packet queue blocking mode: {}", blocking);
}

/// Wakes all waiting producers and consumers.
void PacketQueue::wakeup() {
    notEmpty_.notify_all();
    notFull_.notify_all();
}

/// Restricts enqueue() to the given stream indices; an empty list disables
/// filtering.
void PacketQueue::setStreamFilter(const std::vector<int>& allowedStreams) {
    std::lock_guard lock(streamFilterMutex_);
    allowedStreams_ = allowedStreams;
    hasStreamFilter_ = !allowedStreams.empty();

    Logger::instance().debugf("Stream filter set with {} allowed streams", allowedStreams.size());
}

/// Removes the stream filter so that every stream is accepted.
void PacketQueue::clearStreamFilter() {
    std::lock_guard lock(streamFilterMutex_);
    allowedStreams_.clear();
    hasStreamFilter_ = false;

    Logger::instance().debug("Stream filter cleared");
}

// ---------------------------------------------------------------------------
// PacketQueue internals
// ---------------------------------------------------------------------------

/// Decides whether an incoming packet is dropped by the smart-drop policy.
/// The checks run in order and stop at the first that asks for a drop.
bool PacketQueue::shouldDropPacket(const PacketQueueItem& item) const {
    if (!config_.enablePacketDrop) {
        return false;
    }

    // Custom predicate.
    if (packetDropCallback_ && packetDropCallback_(item)) {
        return true;
    }

    // Latency based.
    if (shouldDropByLatency(item)) {
        return true;
    }

    // Ratio based.
    if (shouldDropByRatio()) {
        return true;
    }

    // Size based.
    if (shouldDropBySize()) {
        return true;
    }

    return false;
}

/// Removes the head of the active queue and counts it as dropped.
/// Called with queueMutex_ already held by the callers in this file.
/// NOTE(review): in priority mode the head is priorityQueue_.top(), which is
/// whatever the queue's comparator ranks first — not necessarily the oldest.
void PacketQueue::dropOldestPacket() {
    if (config_.priorityQueue) {
        if (!priorityQueue_.empty()) {
            auto& oldestItem =
                const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top());
            if (dropCallback_) {
                dropCallback_(*oldestItem, "Drop oldest packet");
            }

            if (oldestItem->packet) {
                totalBytes_ -= oldestItem->packet->size;
            }

            priorityQueue_.pop();

            std::lock_guard statsLock(statsMutex_);
            stats_.totalDropped++;

            Logger::instance().debug("Dropped oldest packet from priority queue");
        }
    } else {
        if (!normalQueue_.empty()) {
            auto& oldestItem = normalQueue_.front();
            if (dropCallback_) {
                dropCallback_(*oldestItem, "Drop oldest packet");
            }

            if (oldestItem->packet) {
                totalBytes_ -= oldestItem->packet->size;
            }

            normalQueue_.pop();

            std::lock_guard statsLock(statsMutex_);
            stats_.totalDropped++;

            Logger::instance().debug("Dropped oldest packet from normal queue");
        }
    }
}

/// Intentionally empty: dropping the newest packet is handled inside
/// enqueue(), so there is nothing to remove from the queue here.
void PacketQueue::dropNewestPacket() {
}

/// Simplified implementation: does not search for the largest packet and
/// drops the oldest one instead.
void PacketQueue::dropLargestPacket() {
    dropOldestPacket();
}

/// Drains the active queue, discards every non-key packet and re-inserts the
/// key packets.
void PacketQueue::dropNonKeyPackets() {
    std::vector<std::unique_ptr<PacketQueueItem>> keyPackets;

    // Keeps key packets; reports, un-counts and tallies everything else.
    auto keepOrDrop = [this, &keyPackets](std::unique_ptr<PacketQueueItem> item) {
        if (item->isKeyPacket) {
            keyPackets.push_back(std::move(item));
            return;
        }
        if (dropCallback_) {
            dropCallback_(*item, "Drop non-key packet");
        }
        if (item->packet) {
            totalBytes_ -= item->packet->size;
        }
        std::lock_guard statsLock(statsMutex_);
        stats_.totalDropped++;
    };

    if (config_.priorityQueue) {
        while (!priorityQueue_.empty()) {
            auto item =
                std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
            priorityQueue_.pop();
            keepOrDrop(std::move(item));
        }

        for (auto& item : keyPackets) {
            priorityQueue_.push(std::move(item));
        }
    } else {
        while (!normalQueue_.empty()) {
            auto item = std::move(normalQueue_.front());
            normalQueue_.pop();
            keepOrDrop(std::move(item));
        }

        for (auto& item : keyPackets) {
            normalQueue_.push(std::move(item));
        }
    }

    // size() is unaffected by moving the elements out above.
    Logger::instance().debugf("Dropped non-key packets, kept {} key packets", keyPackets.size());
}

/// Updates counters and running averages for one enqueue or dequeue event.
void PacketQueue::updateStats(const PacketQueueItem& item, bool isEnqueue) {
    std::lock_guard lock(statsMutex_);

    if (isEnqueue) {
        stats_.totalEnqueued++;
        const size_t currentSize =
            config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
        stats_.maxSize = std::max(stats_.maxSize, currentSize);

        if (item.packet) {
            stats_.totalBytes += item.packet->size;

            // Running mean of the packet size.
            if (stats_.totalEnqueued == 1) {
                stats_.averagePacketSize = item.packet->size;
            } else {
                stats_.averagePacketSize =
                    (stats_.averagePacketSize * (stats_.totalEnqueued - 1) + item.packet->size) /
                    stats_.totalEnqueued;
            }
        }
    } else {
        stats_.totalDequeued++;

        // Time the item spent in the queue.
        const double waitTime = calculateWaitTime(item);
        if (waitTime > 0) {
            // Running mean of the wait time.
            if (stats_.totalDequeued == 1) {
                stats_.averageWaitTime = waitTime;
            } else {
                stats_.averageWaitTime =
                    (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) /
                    stats_.totalDequeued;
            }

            stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
        }
    }

    stats_.lastUpdateTime = std::chrono::steady_clock::now();
}

/// Time elapsed since item.enqueueTime, in milliseconds.
/// NOTE(review): the cast target was reconstructed as microseconds because the
/// result is divided by 1000 to obtain milliseconds — confirm.
double PacketQueue::calculateWaitTime(const PacketQueueItem& item) const {
    const auto now = std::chrono::steady_clock::now();
    const auto duration =
        std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
    return duration.count() / 1000.0;  // microseconds -> milliseconds
}

/// True when no filter is set or @p streamIndex is in the allowed list.
bool PacketQueue::isStreamAllowed(int streamIndex) const {
    if (!hasStreamFilter_) {
        return true;
    }

    std::lock_guard lock(streamFilterMutex_);
    return std::find(allowedStreams_.begin(), allowedStreams_.end(), streamIndex) !=
           allowedStreams_.end();
}

/// True when the item's wait time exceeds config_.maxLatency.
bool PacketQueue::shouldDropByLatency(const PacketQueueItem& item) const {
    const double waitTime = calculateWaitTime(item);
    return waitTime > config_.maxLatency;
}

/// True for every dropRatio-th call; ratios <= 1 disable ratio dropping.
bool PacketQueue::shouldDropByRatio() const {
    if (config_.dropRatio <= 1) {
        return false;
    }

    const uint64_t count = packetCounter_.fetch_add(1);
    return (count % config_.dropRatio) == 0;
}

/// True when the byte counter exceeds config_.maxBytes.
bool PacketQueue::shouldDropBySize() const {
    return totalBytes_.load() > config_.maxBytes;
}

/// Appends @p item to the normal queue.
void PacketQueue::enqueueToNormalQueue(std::unique_ptr<PacketQueueItem> item) {
    normalQueue_.push(std::move(item));
}

/// Inserts @p item into the priority queue.
void PacketQueue::enqueueToPriorityQueue(std::unique_ptr<PacketQueueItem> item) {
    priorityQueue_.push(std::move(item));
}

/// Pops the front of the normal queue; nullptr when it is empty.
std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromNormalQueue() {
    if (normalQueue_.empty()) {
        return nullptr;
    }

    auto item = std::move(normalQueue_.front());
    normalQueue_.pop();
    return item;
}

/// Pops the top of the priority queue; nullptr when it is empty.
std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromPriorityQueue() {
    if (priorityQueue_.empty()) {
        return nullptr;
    }

    // top() is const; the const_cast lets the owning pointer be moved out
    // before the (now empty) slot is popped.
    auto item = std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
    priorityQueue_.pop();
    return item;
}

// ---------------------------------------------------------------------------
// MultiStreamPacketQueue
// ---------------------------------------------------------------------------

/// Stores @p config as the configuration for automatically created streams.
MultiStreamPacketQueue::MultiStreamPacketQueue(const PacketQueueConfig& config)
    : defaultConfig_(config) {
    Logger::instance().debug("MultiStreamPacketQueue created");
}

/// Clears every stream queue and the synchronisation queue.
MultiStreamPacketQueue::~MultiStreamPacketQueue() {
    clear();
    Logger::instance().debug("MultiStreamPacketQueue destroyed");
}

/// Creates a queue for @p streamIndex.
/// @return ALREADY_EXISTS when the stream is already registered.
ErrorCode MultiStreamPacketQueue::addStream(int streamIndex, const PacketQueueConfig& config) {
    std::unique_lock lock(streamsMutex_);

    if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
        Logger::instance().warningf("Stream {} already exists", streamIndex);
        return ErrorCode::ALREADY_EXISTS;
    }

    streamQueues_[streamIndex] = std::make_unique<PacketQueue>(config);

    Logger::instance().debugf("Added stream: {}", streamIndex);
    return ErrorCode::SUCCESS;
}

/// Removes the queue of @p streamIndex.
/// @return NOT_FOUND when the stream is not registered.
ErrorCode MultiStreamPacketQueue::removeStream(int streamIndex) {
    std::unique_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it == streamQueues_.end()) {
        Logger::instance().warningf("Stream {} not found", streamIndex);
        return ErrorCode::NOT_FOUND;
    }

    streamQueues_.erase(it);

    Logger::instance().debugf("Removed stream: {}", streamIndex);
    return ErrorCode::SUCCESS;
}

/// True when a queue exists for @p streamIndex.
bool MultiStreamPacketQueue::hasStream(int streamIndex) const {
    std::shared_lock lock(streamsMutex_);
    return streamQueues_.find(streamIndex) != streamQueues_.end();
}

/// Indices of all registered streams, in container iteration order.
std::vector<int> MultiStreamPacketQueue::getStreamIndices() const {
    std::shared_lock lock(streamsMutex_);

    std::vector<int> indices;
    indices.reserve(streamQueues_.size());
    for (const auto& entry : streamQueues_) {
        indices.push_back(entry.first);
    }
    return indices;
}

/// Enqueues @p packet into the queue of @p streamIndex, creating that queue
/// with the default configuration when it does not exist yet.
ErrorCode MultiStreamPacketQueue::enqueue(AVPacket* packet, int streamIndex) {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it == streamQueues_.end()) {
        // addStream() needs the exclusive lock, so release the shared one
        // first. Another thread may create or remove the stream in between,
        // hence the second lookup.
        lock.unlock();
        addStream(streamIndex, defaultConfig_);
        lock.lock();

        it = streamQueues_.find(streamIndex);
        if (it == streamQueues_.end()) {
            return ErrorCode::NOT_FOUND;
        }
    }

    return it->second->enqueue(packet, streamIndex);
}

/// Dequeues a packet from @p streamIndex with that queue's default timeout.
/// @return nullptr when the stream is unknown or nothing was dequeued.
AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex) {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it == streamQueues_.end()) {
        return nullptr;
    }

    return it->second->dequeuePacket();
}

/// Dequeues a packet from @p streamIndex, waiting up to @p timeoutMs.
/// @return nullptr when the stream is unknown or nothing was dequeued.
AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex, int timeoutMs) {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it == streamQueues_.end()) {
        return nullptr;
    }

    return it->second->dequeuePacket(timeoutMs);
}

/// Enqueues a new reference to @p packet into every stream queue.
/// @return INVALID_PARAMS for a null packet; otherwise SUCCESS, or the last
///         error encountered (remaining streams are still attempted).
ErrorCode MultiStreamPacketQueue::enqueueToAll(AVPacket* packet) {
    if (!packet) {
        return ErrorCode::INVALID_PARAMS;  // av_packet_ref() needs a valid source
    }

    std::shared_lock lock(streamsMutex_);

    ErrorCode result = ErrorCode::SUCCESS;

    for (const auto& entry : streamQueues_) {
        // Each stream gets its own AVPacket referencing the same data.
        AVPacket* packetCopy = av_packet_alloc();
        if (!packetCopy) {
            result = ErrorCode::MEMORY_ALLOC_FAILED;
            continue;
        }

        if (av_packet_ref(packetCopy, packet) < 0) {
            av_packet_free(&packetCopy);
            result = ErrorCode::COPY_FAILED;
            continue;
        }

        const ErrorCode enqueueResult = entry.second->enqueue(packetCopy, entry.first);
        if (enqueueResult != ErrorCode::SUCCESS) {
            // NOTE(review): if PacketQueueItem frees its packet on destruction
            // (dequeuePacket() nulls item->packet to "transfer ownership",
            // which suggests it does), the failed enqueue has already released
            // packetCopy and this would be a double free — confirm.
            av_packet_free(&packetCopy);
            result = enqueueResult;
        }
    }

    return result;
}

/// Dequeues one packet from every stream; entries may be nullptr.
std::vector<AVPacket*> MultiStreamPacketQueue::dequeueFromAll() {
    std::shared_lock lock(streamsMutex_);

    std::vector<AVPacket*> packets;
    packets.reserve(streamQueues_.size());

    for (const auto& entry : streamQueues_) {
        packets.push_back(entry.second->dequeuePacket());  // may be nullptr
    }

    return packets;
}

/// Returns the next packet in timestamp order across all streams.
/// Packets already buffered in the synchronisation queue are served first;
/// otherwise one packet is pulled from every stream, the smallest timestamp is
/// returned and the rest are buffered.
AVPacket* MultiStreamPacketQueue::dequeueSynchronized() {
    std::lock_guard lock(syncMutex_);

    if (!syncQueue_.empty()) {
        SyncItem item = syncQueue_.top();
        syncQueue_.pop();
        return item.packet;
    }

    // Buffer is empty: collect one packet per stream.
    std::vector<SyncItem> items;

    {
        std::shared_lock streamLock(streamsMutex_);
        for (const auto& entry : streamQueues_) {
            AVPacket* packet = entry.second->dequeuePacket(10);  // short timeout
            if (packet) {
                SyncItem item;
                item.packet = packet;
                item.streamIndex = entry.first;
                // Prefer DTS; fall back to PTS when DTS is unset.
                item.timestamp = (packet->dts != AV_NOPTS_VALUE) ? packet->dts : packet->pts;
                items.push_back(item);
            }
        }
    }

    if (items.empty()) {
        return nullptr;
    }

    std::sort(items.begin(), items.end(),
              [](const SyncItem& a, const SyncItem& b) { return a.timestamp < b.timestamp; });

    AVPacket* result = items[0].packet;

    // Everything but the earliest packet waits in the synchronisation queue.
    for (size_t i = 1; i < items.size(); ++i) {
        syncQueue_.push(items[i]);
    }

    return result;
}

/// Collects up to @p maxCount packets via dequeueSynchronized(), stopping at
/// the first nullptr.
std::vector<AVPacket*> MultiStreamPacketQueue::dequeueSynchronizedBatch(size_t maxCount) {
    std::vector<AVPacket*> packets;

    for (size_t i = 0; i < maxCount; ++i) {
        AVPacket* packet = dequeueSynchronized();
        if (!packet) {
            break;
        }
        packets.push_back(packet);
    }

    return packets;
}

/// Clears every stream queue and frees the packets buffered for
/// synchronised dequeueing.
void MultiStreamPacketQueue::clear() {
    // The two locks are taken one after the other, never nested:
    // dequeueSynchronized() acquires syncMutex_ first and streamsMutex_
    // second, so holding streamsMutex_ while waiting for syncMutex_ could
    // deadlock against it.
    {
        std::unique_lock lock(streamsMutex_);
        for (auto& entry : streamQueues_) {
            entry.second->clear();
        }
    }

    {
        std::lock_guard syncLock(syncMutex_);
        while (!syncQueue_.empty()) {
            SyncItem item = syncQueue_.top();
            syncQueue_.pop();
            if (item.packet) {
                av_packet_free(&item.packet);
            }
        }
    }

    Logger::instance().debug("All stream queues cleared");
}

/// Clears the queue of @p streamIndex, if it exists.
void MultiStreamPacketQueue::clearStream(int streamIndex) {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it != streamQueues_.end()) {
        it->second->clear();
        Logger::instance().debugf("Stream {} queue cleared", streamIndex);
    }
}

/// Flushes every stream queue.
void MultiStreamPacketQueue::flush() {
    std::shared_lock lock(streamsMutex_);

    for (auto& entry : streamQueues_) {
        entry.second->flush();
    }

    Logger::instance().debug("All stream queues flushed");
}

/// Flushes the queue of @p streamIndex, if it exists.
void MultiStreamPacketQueue::flushStream(int streamIndex) {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it != streamQueues_.end()) {
        it->second->flush();
        Logger::instance().debugf("Stream {} queue flushed", streamIndex);
    }
}

/// Total number of queued items across all streams.
size_t MultiStreamPacketQueue::size() const {
    std::shared_lock lock(streamsMutex_);

    size_t totalSize = 0;
    for (const auto& entry : streamQueues_) {
        totalSize += entry.second->size();
    }
    return totalSize;
}

/// Number of queued items of @p streamIndex; 0 for an unknown stream.
size_t MultiStreamPacketQueue::size(int streamIndex) const {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    return (it != streamQueues_.end()) ? it->second->size() : 0;
}

/// Total queued bytes across all streams.
size_t MultiStreamPacketQueue::bytes() const {
    std::shared_lock lock(streamsMutex_);

    size_t totalBytes = 0;
    for (const auto& entry : streamQueues_) {
        totalBytes += entry.second->bytes();
    }
    return totalBytes;
}

/// Queued bytes of @p streamIndex; 0 for an unknown stream.
size_t MultiStreamPacketQueue::bytes(int streamIndex) const {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    return (it != streamQueues_.end()) ? it->second->bytes() : 0;
}

/// True when every stream queue is empty (also true with no streams).
bool MultiStreamPacketQueue::empty() const {
    std::shared_lock lock(streamsMutex_);

    for (const auto& entry : streamQueues_) {
        if (!entry.second->empty()) {
            return false;
        }
    }
    return true;
}

/// True when the queue of @p streamIndex is empty or the stream is unknown.
bool MultiStreamPacketQueue::empty(int streamIndex) const {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    return (it != streamQueues_.end()) ? it->second->empty() : true;
}

/// Statistics of every stream, keyed by stream index.
std::map<int, PacketQueueStats> MultiStreamPacketQueue::getAllStats() const {
    std::shared_lock lock(streamsMutex_);

    std::map<int, PacketQueueStats> allStats;
    for (const auto& entry : streamQueues_) {
        allStats[entry.first] = entry.second->getStats();
    }
    return allStats;
}

/// Statistics of @p streamIndex; default-constructed for an unknown stream.
PacketQueueStats MultiStreamPacketQueue::getStats(int streamIndex) const {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it != streamQueues_.end()) {
        return it->second->getStats();
    }
    return PacketQueueStats();
}

/// Resets the statistics of every stream.
void MultiStreamPacketQueue::resetStats() {
    std::shared_lock lock(streamsMutex_);

    for (auto& entry : streamQueues_) {
        entry.second->resetStats();
    }

    Logger::instance().debug("All stream queue stats reset");
}

/// Resets the statistics of @p streamIndex, if it exists.
void MultiStreamPacketQueue::resetStats(int streamIndex) {
    std::shared_lock lock(streamsMutex_);

    auto it = streamQueues_.find(streamIndex);
    if (it != streamQueues_.end()) {
        it->second->resetStats();
        Logger::instance().debugf("Stream {} queue stats reset", streamIndex);
    }
}

/// Applies the blocking mode to every stream queue.
void MultiStreamPacketQueue::setBlocking(bool blocking) {
    std::shared_lock lock(streamsMutex_);

    for (auto& entry : streamQueues_) {
        entry.second->setBlocking(blocking);
    }

    Logger::instance().debugf("All stream queues blocking mode: {}", blocking);
}

/// Wakes all waiters on every stream queue.
void MultiStreamPacketQueue::wakeupAll() {
    std::shared_lock lock(streamsMutex_);

    for (auto& entry : streamQueues_) {
        entry.second->wakeup();
    }
}

// ---------------------------------------------------------------------------
// PacketQueueFactory
// ---------------------------------------------------------------------------

/// General-purpose queue: drops the oldest packet when full, 1 s timeout.
std::unique_ptr<PacketQueue> PacketQueueFactory::createStandardQueue(size_t maxSize) {
    PacketQueueConfig config;
    config.maxSize = maxSize;
    config.dropOnFull = true;
    config.dropOldest = true;
    config.timeoutMs = 1000;
    config.enableStats = true;

    return std::make_unique<PacketQueue>(config);
}

/// Low-latency queue: 100 ms timeout, smart drop with 100 ms max latency and
/// a drop ratio of 3.
std::unique_ptr<PacketQueue> PacketQueueFactory::createLowLatencyQueue(size_t maxSize) {
    PacketQueueConfig config;
    config.maxSize = maxSize;
    config.dropOnFull = true;
    config.dropOldest = true;
    config.timeoutMs = 100;
    config.enableStats = true;
    config.enablePacketDrop = true;
    config.maxLatency = 100.0;
    config.dropRatio = 3;

    return std::make_unique<PacketQueue>(config);
}

/// High-capacity queue: 200 MB byte limit, never drops on full but waits up
/// to 5 s for space instead.
std::unique_ptr<PacketQueue> PacketQueueFactory::createHighCapacityQueue(size_t maxSize) {
    PacketQueueConfig config;
    config.maxSize = maxSize;
    config.maxBytes = 200 * 1024 * 1024;  // 200MB
    config.dropOnFull = false;            // wait for space instead of dropping
    config.dropOldest = true;
    config.timeoutMs = 5000;
    config.enableStats = true;
    config.enablePacketDrop = false;

    return std::make_unique<PacketQueue>(config);
}

/// Real-time queue: 50 ms timeout, smart drop with the given max latency, a
/// drop ratio of 2, and non-key-packet dropping enabled.
std::unique_ptr<PacketQueue> PacketQueueFactory::createRealtimeQueue(size_t maxSize,
                                                                     double maxLatency) {
    PacketQueueConfig config;
    config.maxSize = maxSize;
    config.dropOnFull = true;
    config.dropOldest = true;
    config.timeoutMs = 50;
    config.enableStats = true;
    config.enablePacketDrop = true;
    config.maxLatency = maxLatency;
    config.dropRatio = 2;
    config.dropNonKeyPackets = true;

    return std::make_unique<PacketQueue>(config);
}

/// Queue with priority ordering enabled; otherwise like the standard queue.
std::unique_ptr<PacketQueue> PacketQueueFactory::createPriorityQueue(size_t maxSize) {
    PacketQueueConfig config;
    config.maxSize = maxSize;
    config.priorityQueue = true;
    config.dropOnFull = true;
    config.dropOldest = true;
    config.timeoutMs = 1000;
    config.enableStats = true;

    return std::make_unique<PacketQueue>(config);
}

/// Multi-stream queue whose automatically created streams use @p config.
std::unique_ptr<MultiStreamPacketQueue> PacketQueueFactory::createMultiStreamQueue(
    const PacketQueueConfig& config) {
    return std::make_unique<MultiStreamPacketQueue>(config);
}

} // namespace utils
} // namespace av