| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098 |
- #include "utils_packet_queue.h"
- #include "../base/base_logger.h"
- #include <algorithm>
- #include <shared_mutex>
- namespace av {
- namespace utils {
- using namespace av::base;
- PacketQueue::PacketQueue(const PacketQueueConfig& config)
- : config_(config) {
- Logger::debug("PacketQueue created with max size: {}, max bytes: {}",
- config_.maxSize, config_.maxBytes);
- }
- PacketQueue::~PacketQueue() {
- shutdown_ = true;
- notEmpty_.notify_all();
- notFull_.notify_all();
- clear();
- Logger::debug("PacketQueue destroyed");
- }
- ErrorCode PacketQueue::enqueue(std::unique_ptr<PacketQueueItem> item) {
- if (!item) {
- return ErrorCode::INVALID_PARAMETER;
- }
-
- if (shutdown_) {
- return ErrorCode::INVALID_STATE;
- }
-
- // 检查流过滤
- if (!isStreamAllowed(item->streamIndex)) {
- Logger::debug("Packet dropped - stream {} not allowed", item->streamIndex);
- return ErrorCode::SUCCESS;
- }
-
- std::unique_lock<std::mutex> lock(queueMutex_);
-
- // 检查是否需要丢包
- if (shouldDropPacket(*item)) {
- if (dropCallback_) {
- dropCallback_(*item, "Smart packet drop");
- }
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
-
- Logger::debug("Packet dropped by smart drop policy");
- return ErrorCode::SUCCESS;
- }
-
- // 检查队列是否已满
- size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
- size_t currentBytes = totalBytes_.load();
-
- bool queueFull = (currentSize >= config_.maxSize) || (currentBytes >= config_.maxBytes);
-
- if (blocking_ && queueFull) {
- if (config_.dropOnFull) {
- // 根据策略丢弃包
- if (config_.dropOldest) {
- dropOldestPacket();
- } else if (config_.dropNonKeyPackets && !item->isKeyPacket) {
- dropNonKeyPackets();
- } else {
- if (dropCallback_) {
- dropCallback_(*item, "Queue full - drop newest");
- }
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
-
- Logger::debug("Packet dropped - queue full");
- return ErrorCode::SUCCESS;
- }
- } else {
- // 等待队列有空间
- if (!notFull_.wait_for(lock, std::chrono::milliseconds(config_.timeoutMs),
- [this, currentSize, currentBytes] {
- size_t newSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
- size_t newBytes = totalBytes_.load();
- return (newSize < config_.maxSize && newBytes < config_.maxBytes) || shutdown_;
- })) {
- Logger::warning("Enqueue timeout");
- return ErrorCode::TIMEOUT;
- }
-
- if (shutdown_) {
- return ErrorCode::INVALID_STATE;
- }
- }
- }
-
- // 更新字节计数
- if (item->packet) {
- totalBytes_ += item->packet->size;
- }
-
- // 添加到队列
- if (config_.priorityQueue) {
- enqueueToPriorityQueue(std::move(item));
- } else {
- enqueueToNormalQueue(std::move(item));
- }
-
- // 更新统计信息
- if (config_.enableStats) {
- updateStats(*item, true);
- }
-
- // 通知等待的消费者
- notEmpty_.notify_one();
-
- // 回调
- if (enqueueCallback_) {
- enqueueCallback_(*item);
- }
-
- return ErrorCode::SUCCESS;
- }
- std::unique_ptr<PacketQueueItem> PacketQueue::dequeue() {
- return dequeue(config_.timeoutMs);
- }
- std::unique_ptr<PacketQueueItem> PacketQueue::dequeue(int timeoutMs) {
- std::unique_lock<std::mutex> lock(queueMutex_);
-
- // 等待队列有数据
- if (blocking_) {
- bool hasData = config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
-
- if (!hasData) {
- if (!notEmpty_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
- [this] {
- bool hasData = config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
- return hasData || shutdown_;
- })) {
- return nullptr; // 超时
- }
-
- if (shutdown_) {
- return nullptr;
- }
- }
- }
-
- // 检查队列是否为空
- bool isEmpty = config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
- if (isEmpty) {
- return nullptr;
- }
-
- // 取出包
- std::unique_ptr<PacketQueueItem> item;
- if (config_.priorityQueue) {
- item = dequeueFromPriorityQueue();
- } else {
- item = dequeueFromNormalQueue();
- }
-
- if (!item) {
- return nullptr;
- }
-
- // 更新字节计数
- if (item->packet) {
- totalBytes_ -= item->packet->size;
- }
-
- // 更新统计信息
- if (config_.enableStats) {
- updateStats(*item, false);
- }
-
- // 通知等待的生产者
- notFull_.notify_one();
-
- // 回调
- if (dequeueCallback_) {
- dequeueCallback_(*item);
- }
-
- return item;
- }
- ErrorCode PacketQueue::enqueue(AVPacket* packet, int streamIndex, int priority) {
- if (!packet) {
- return ErrorCode::INVALID_PARAMETER;
- }
-
- auto item = std::make_unique<PacketQueueItem>(packet, streamIndex, priority);
- return enqueue(std::move(item));
- }
- AVPacket* PacketQueue::dequeuePacket() {
- auto item = dequeue();
- if (item) {
- AVPacket* packet = item->packet;
- item->packet = nullptr; // 转移所有权
- return packet;
- }
- return nullptr;
- }
- AVPacket* PacketQueue::dequeuePacket(int timeoutMs) {
- auto item = dequeue(timeoutMs);
- if (item) {
- AVPacket* packet = item->packet;
- item->packet = nullptr; // 转移所有权
- return packet;
- }
- return nullptr;
- }
- ErrorCode PacketQueue::enqueueWithPriority(AVPacket* packet, int priority, int streamIndex) {
- if (!packet) {
- return ErrorCode::INVALID_PARAMETER;
- }
-
- auto item = std::make_unique<PacketQueueItem>(packet, streamIndex, priority);
- return enqueue(std::move(item));
- }
- std::unique_ptr<PacketQueueItem> PacketQueue::dequeueHighestPriority() {
- if (!config_.priorityQueue) {
- Logger::warning("Priority queue not enabled");
- return dequeue();
- }
-
- return dequeue();
- }
- void PacketQueue::clear() {
- std::lock_guard<std::mutex> lock(queueMutex_);
-
- // 清空普通队列
- while (!normalQueue_.empty()) {
- normalQueue_.pop();
- }
-
- // 清空优先级队列
- while (!priorityQueue_.empty()) {
- priorityQueue_.pop();
- }
-
- totalBytes_ = 0;
- Logger::debug("Packet queue cleared");
- }
- void PacketQueue::flush() {
- clear();
- notEmpty_.notify_all();
- notFull_.notify_all();
- }
- void PacketQueue::setMaxSize(size_t maxSize) {
- std::lock_guard<std::mutex> lock(queueMutex_);
- config_.maxSize = maxSize;
-
- // 如果当前队列大小超过新的最大值,丢弃多余的包
- size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
- while (currentSize > maxSize) {
- if (config_.dropOldest) {
- dropOldestPacket();
- } else {
- break;
- }
- currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
- }
-
- Logger::debug("Packet queue max size set to: {}", maxSize);
- }
- void PacketQueue::setMaxBytes(size_t maxBytes) {
- config_.maxBytes = maxBytes;
- Logger::debug("Packet queue max bytes set to: {}", maxBytes);
- }
- void PacketQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
- config_.dropOnFull = dropOnFull;
- config_.dropOldest = dropOldest;
- Logger::debug("Drop policy set: dropOnFull={}, dropOldest={}", dropOnFull, dropOldest);
- }
- void PacketQueue::enablePriorityQueue(bool enable) {
- std::lock_guard<std::mutex> lock(queueMutex_);
-
- if (config_.priorityQueue == enable) {
- return;
- }
-
- // 如果要切换队列类型,需要迁移数据
- if (enable) {
- // 从普通队列迁移到优先级队列
- while (!normalQueue_.empty()) {
- auto item = std::move(normalQueue_.front());
- normalQueue_.pop();
- priorityQueue_.push(std::move(item));
- }
- } else {
- // 从优先级队列迁移到普通队列
- std::vector<std::unique_ptr<PacketQueueItem>> tempItems;
- while (!priorityQueue_.empty()) {
- tempItems.push_back(std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top())));
- priorityQueue_.pop();
- }
-
- // 按时间戳排序后加入普通队列
- std::sort(tempItems.begin(), tempItems.end(),
- [](const std::unique_ptr<PacketQueueItem>& a, const std::unique_ptr<PacketQueueItem>& b) {
- return a->dts < b->dts;
- });
-
- for (auto& item : tempItems) {
- normalQueue_.push(std::move(item));
- }
- }
-
- config_.priorityQueue = enable;
- Logger::debug("Priority queue {}", enable ? "enabled" : "disabled");
- }
- size_t PacketQueue::size() const {
- std::lock_guard<std::mutex> lock(queueMutex_);
- return config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
- }
- size_t PacketQueue::bytes() const {
- return totalBytes_.load();
- }
- bool PacketQueue::empty() const {
- std::lock_guard<std::mutex> lock(queueMutex_);
- return config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
- }
- bool PacketQueue::full() const {
- std::lock_guard<std::mutex> lock(queueMutex_);
- size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
- return currentSize >= config_.maxSize;
- }
- bool PacketQueue::fullByBytes() const {
- return totalBytes_.load() >= config_.maxBytes;
- }
- size_t PacketQueue::capacity() const {
- return config_.maxSize;
- }
- size_t PacketQueue::capacityBytes() const {
- return config_.maxBytes;
- }
- PacketQueueStats PacketQueue::getStats() const {
- std::lock_guard<std::mutex> lock(statsMutex_);
- PacketQueueStats stats = stats_;
- stats.currentSize = size();
- stats.totalBytes = bytes();
- return stats;
- }
- void PacketQueue::resetStats() {
- std::lock_guard<std::mutex> lock(statsMutex_);
- stats_ = PacketQueueStats();
- Logger::debug("Packet queue stats reset");
- }
- void PacketQueue::enablePacketDrop(bool enable, double maxLatency, int dropRatio) {
- config_.enablePacketDrop = enable;
- config_.maxLatency = maxLatency;
- config_.dropRatio = dropRatio;
-
- Logger::debug("Packet drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
- enable, maxLatency, dropRatio);
- }
- void PacketQueue::setPacketDropCallback(std::function<bool(const PacketQueueItem&)> callback) {
- packetDropCallback_ = callback;
- }
- void PacketQueue::setBlocking(bool blocking) {
- blocking_ = blocking;
- if (!blocking) {
- notEmpty_.notify_all();
- notFull_.notify_all();
- }
- Logger::debug("Packet queue blocking mode: {}", blocking);
- }
- void PacketQueue::wakeup() {
- notEmpty_.notify_all();
- notFull_.notify_all();
- }
- void PacketQueue::setStreamFilter(const std::vector<int>& allowedStreams) {
- std::lock_guard<std::mutex> lock(streamFilterMutex_);
- allowedStreams_ = allowedStreams;
- hasStreamFilter_ = !allowedStreams.empty();
- Logger::debug("Stream filter set with {} allowed streams", allowedStreams.size());
- }
- void PacketQueue::clearStreamFilter() {
- std::lock_guard<std::mutex> lock(streamFilterMutex_);
- allowedStreams_.clear();
- hasStreamFilter_ = false;
- Logger::debug("Stream filter cleared");
- }
- // 内部方法实现
- bool PacketQueue::shouldDropPacket(const PacketQueueItem& item) const {
- if (!config_.enablePacketDrop) {
- return false;
- }
-
- // 自定义丢包回调
- if (packetDropCallback_ && packetDropCallback_(item)) {
- return true;
- }
-
- // 基于延迟的丢包
- if (shouldDropByLatency(item)) {
- return true;
- }
-
- // 基于比例的丢包
- if (shouldDropByRatio()) {
- return true;
- }
-
- // 基于大小的丢包
- if (shouldDropBySize()) {
- return true;
- }
-
- return false;
- }
- void PacketQueue::dropOldestPacket() {
- if (config_.priorityQueue) {
- if (!priorityQueue_.empty()) {
- auto& oldestItem = const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top());
-
- if (dropCallback_) {
- dropCallback_(*oldestItem, "Drop oldest packet");
- }
-
- if (oldestItem->packet) {
- totalBytes_ -= oldestItem->packet->size;
- }
-
- priorityQueue_.pop();
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
-
- Logger::debug("Dropped oldest packet from priority queue");
- }
- } else {
- if (!normalQueue_.empty()) {
- auto& oldestItem = normalQueue_.front();
-
- if (dropCallback_) {
- dropCallback_(*oldestItem, "Drop oldest packet");
- }
-
- if (oldestItem->packet) {
- totalBytes_ -= oldestItem->packet->size;
- }
-
- normalQueue_.pop();
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
-
- Logger::debug("Dropped oldest packet from normal queue");
- }
- }
- }
- void PacketQueue::dropNewestPacket() {
- // 这个方法在enqueue时调用,不需要实际操作队列
- }
- void PacketQueue::dropLargestPacket() {
- // 找到最大的包并丢弃
- // 这里简化实现,直接丢弃最旧的包
- dropOldestPacket();
- }
- void PacketQueue::dropNonKeyPackets() {
- std::vector<std::unique_ptr<PacketQueueItem>> keyPackets;
-
- if (config_.priorityQueue) {
- while (!priorityQueue_.empty()) {
- auto item = std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
- priorityQueue_.pop();
-
- if (item->isKeyPacket) {
- keyPackets.push_back(std::move(item));
- } else {
- if (dropCallback_) {
- dropCallback_(*item, "Drop non-key packet");
- }
-
- if (item->packet) {
- totalBytes_ -= item->packet->size;
- }
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
- }
- }
-
- // 重新加入关键包
- for (auto& item : keyPackets) {
- priorityQueue_.push(std::move(item));
- }
- } else {
- while (!normalQueue_.empty()) {
- auto item = std::move(normalQueue_.front());
- normalQueue_.pop();
-
- if (item->isKeyPacket) {
- keyPackets.push_back(std::move(item));
- } else {
- if (dropCallback_) {
- dropCallback_(*item, "Drop non-key packet");
- }
-
- if (item->packet) {
- totalBytes_ -= item->packet->size;
- }
-
- std::lock_guard<std::mutex> statsLock(statsMutex_);
- stats_.totalDropped++;
- }
- }
-
- // 重新加入关键包
- for (auto& item : keyPackets) {
- normalQueue_.push(std::move(item));
- }
- }
-
- Logger::debug("Dropped non-key packets, kept {} key packets", keyPackets.size());
- }
- void PacketQueue::updateStats(const PacketQueueItem& item, bool isEnqueue) {
- std::lock_guard<std::mutex> lock(statsMutex_);
-
- if (isEnqueue) {
- stats_.totalEnqueued++;
- size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
- stats_.maxSize = std::max(stats_.maxSize, currentSize);
-
- if (item.packet) {
- stats_.totalBytes += item.packet->size;
-
- // 更新平均包大小
- if (stats_.totalEnqueued == 1) {
- stats_.averagePacketSize = item.packet->size;
- } else {
- stats_.averagePacketSize = (stats_.averagePacketSize * (stats_.totalEnqueued - 1) + item.packet->size) / stats_.totalEnqueued;
- }
- }
- } else {
- stats_.totalDequeued++;
-
- // 计算等待时间
- double waitTime = calculateWaitTime(item);
- if (waitTime > 0) {
- // 更新平均等待时间
- if (stats_.totalDequeued == 1) {
- stats_.averageWaitTime = waitTime;
- } else {
- stats_.averageWaitTime = (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) / stats_.totalDequeued;
- }
-
- stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
- }
- }
-
- stats_.lastUpdateTime = std::chrono::steady_clock::now();
- }
- double PacketQueue::calculateWaitTime(const PacketQueueItem& item) const {
- auto now = std::chrono::steady_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
- return duration.count() / 1000.0; // 转换为毫秒
- }
- bool PacketQueue::isStreamAllowed(int streamIndex) const {
- if (!hasStreamFilter_) {
- return true;
- }
-
- std::lock_guard<std::mutex> lock(streamFilterMutex_);
- return std::find(allowedStreams_.begin(), allowedStreams_.end(), streamIndex) != allowedStreams_.end();
- }
- bool PacketQueue::shouldDropByLatency(const PacketQueueItem& item) const {
- double waitTime = calculateWaitTime(item);
- return waitTime > config_.maxLatency;
- }
- bool PacketQueue::shouldDropByRatio() const {
- if (config_.dropRatio <= 1) {
- return false;
- }
-
- uint64_t count = packetCounter_.fetch_add(1);
- return (count % config_.dropRatio) == 0;
- }
- bool PacketQueue::shouldDropBySize() const {
- return totalBytes_.load() > config_.maxBytes;
- }
- void PacketQueue::enqueueToNormalQueue(std::unique_ptr<PacketQueueItem> item) {
- normalQueue_.push(std::move(item));
- }
- void PacketQueue::enqueueToPriorityQueue(std::unique_ptr<PacketQueueItem> item) {
- priorityQueue_.push(std::move(item));
- }
- std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromNormalQueue() {
- if (normalQueue_.empty()) {
- return nullptr;
- }
-
- auto item = std::move(normalQueue_.front());
- normalQueue_.pop();
- return item;
- }
- std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromPriorityQueue() {
- if (priorityQueue_.empty()) {
- return nullptr;
- }
-
- auto item = std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
- priorityQueue_.pop();
- return item;
- }
- // 多流包队列实现
- MultiStreamPacketQueue::MultiStreamPacketQueue(const PacketQueueConfig& config)
- : defaultConfig_(config) {
- Logger::debug("MultiStreamPacketQueue created");
- }
- MultiStreamPacketQueue::~MultiStreamPacketQueue() {
- clear();
- Logger::debug("MultiStreamPacketQueue destroyed");
- }
- ErrorCode MultiStreamPacketQueue::addStream(int streamIndex, const PacketQueueConfig& config) {
- std::unique_lock<std::shared_mutex> lock(streamsMutex_);
-
- if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
- Logger::warning("Stream {} already exists", streamIndex);
- return ErrorCode::ALREADY_EXISTS;
- }
-
- streamQueues_[streamIndex] = std::make_unique<PacketQueue>(config);
- Logger::debug("Added stream: {}", streamIndex);
-
- return ErrorCode::SUCCESS;
- }
- ErrorCode MultiStreamPacketQueue::removeStream(int streamIndex) {
- std::unique_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- Logger::warning("Stream {} not found", streamIndex);
- return ErrorCode::NOT_FOUND;
- }
-
- streamQueues_.erase(it);
- Logger::debug("Removed stream: {}", streamIndex);
-
- return ErrorCode::SUCCESS;
- }
- bool MultiStreamPacketQueue::hasStream(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
- return streamQueues_.find(streamIndex) != streamQueues_.end();
- }
- std::vector<int> MultiStreamPacketQueue::getStreamIndices() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- std::vector<int> indices;
- for (const auto& pair : streamQueues_) {
- indices.push_back(pair.first);
- }
-
- return indices;
- }
- ErrorCode MultiStreamPacketQueue::enqueue(AVPacket* packet, int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- // 自动创建流队列
- lock.unlock();
- addStream(streamIndex, defaultConfig_);
- lock.lock();
-
- it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- return ErrorCode::NOT_FOUND;
- }
- }
-
- return it->second->enqueue(packet, streamIndex);
- }
- AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- return nullptr;
- }
-
- return it->second->dequeuePacket();
- }
- AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex, int timeoutMs) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it == streamQueues_.end()) {
- return nullptr;
- }
-
- return it->second->dequeuePacket(timeoutMs);
- }
- ErrorCode MultiStreamPacketQueue::enqueueToAll(AVPacket* packet) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- ErrorCode result = ErrorCode::SUCCESS;
- for (const auto& pair : streamQueues_) {
- // 为每个流创建包的副本
- AVPacket* packetCopy = av_packet_alloc();
- if (!packetCopy) {
- result = ErrorCode::OUT_OF_MEMORY;
- continue;
- }
-
- if (av_packet_ref(packetCopy, packet) < 0) {
- av_packet_free(&packetCopy);
- result = ErrorCode::COPY_FAILED;
- continue;
- }
-
- ErrorCode enqueueResult = pair.second->enqueue(packetCopy, pair.first);
- if (enqueueResult != ErrorCode::SUCCESS) {
- av_packet_free(&packetCopy);
- result = enqueueResult;
- }
- }
-
- return result;
- }
- std::vector<AVPacket*> MultiStreamPacketQueue::dequeueFromAll() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- std::vector<AVPacket*> packets;
- for (const auto& pair : streamQueues_) {
- AVPacket* packet = pair.second->dequeuePacket();
- packets.push_back(packet); // 可能为nullptr
- }
-
- return packets;
- }
- AVPacket* MultiStreamPacketQueue::dequeueSynchronized() {
- std::lock_guard<std::mutex> lock(syncMutex_);
-
- // 从同步队列中取出时间戳最小的包
- if (!syncQueue_.empty()) {
- SyncItem item = syncQueue_.top();
- syncQueue_.pop();
- return item.packet;
- }
-
- // 如果同步队列为空,从各个流队列中取包并排序
- std::vector<SyncItem> items;
-
- {
- std::shared_lock<std::shared_mutex> streamLock(streamsMutex_);
- for (const auto& pair : streamQueues_) {
- AVPacket* packet = pair.second->dequeuePacket(10); // 短超时
- if (packet) {
- SyncItem item;
- item.packet = packet;
- item.streamIndex = pair.first;
- item.timestamp = (packet->dts != AV_NOPTS_VALUE) ? packet->dts : packet->pts;
- items.push_back(item);
- }
- }
- }
-
- if (items.empty()) {
- return nullptr;
- }
-
- // 按时间戳排序
- std::sort(items.begin(), items.end(), [](const SyncItem& a, const SyncItem& b) {
- return a.timestamp < b.timestamp;
- });
-
- // 返回时间戳最小的包
- AVPacket* result = items[0].packet;
-
- // 将其余包放入同步队列
- for (size_t i = 1; i < items.size(); ++i) {
- syncQueue_.push(items[i]);
- }
-
- return result;
- }
- std::vector<AVPacket*> MultiStreamPacketQueue::dequeueSynchronizedBatch(size_t maxCount) {
- std::vector<AVPacket*> packets;
-
- for (size_t i = 0; i < maxCount; ++i) {
- AVPacket* packet = dequeueSynchronized();
- if (!packet) {
- break;
- }
- packets.push_back(packet);
- }
-
- return packets;
- }
- void MultiStreamPacketQueue::clear() {
- std::unique_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->clear();
- }
-
- // 清空同步队列
- std::lock_guard<std::mutex> syncLock(syncMutex_);
- while (!syncQueue_.empty()) {
- SyncItem item = syncQueue_.top();
- syncQueue_.pop();
- if (item.packet) {
- av_packet_free(&item.packet);
- }
- }
-
- Logger::debug("All stream queues cleared");
- }
- void MultiStreamPacketQueue::clearStream(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- it->second->clear();
- Logger::debug("Stream {} queue cleared", streamIndex);
- }
- }
- void MultiStreamPacketQueue::flush() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->flush();
- }
-
- Logger::debug("All stream queues flushed");
- }
- void MultiStreamPacketQueue::flushStream(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- it->second->flush();
- Logger::debug("Stream {} queue flushed", streamIndex);
- }
- }
- size_t MultiStreamPacketQueue::size() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- size_t totalSize = 0;
- for (const auto& pair : streamQueues_) {
- totalSize += pair.second->size();
- }
-
- return totalSize;
- }
- size_t MultiStreamPacketQueue::size(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- return it->second->size();
- }
-
- return 0;
- }
- size_t MultiStreamPacketQueue::bytes() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- size_t totalBytes = 0;
- for (const auto& pair : streamQueues_) {
- totalBytes += pair.second->bytes();
- }
-
- return totalBytes;
- }
- size_t MultiStreamPacketQueue::bytes(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- return it->second->bytes();
- }
-
- return 0;
- }
- bool MultiStreamPacketQueue::empty() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (const auto& pair : streamQueues_) {
- if (!pair.second->empty()) {
- return false;
- }
- }
-
- return true;
- }
- bool MultiStreamPacketQueue::empty(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- return it->second->empty();
- }
-
- return true;
- }
- std::map<int, PacketQueueStats> MultiStreamPacketQueue::getAllStats() const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- std::map<int, PacketQueueStats> allStats;
- for (const auto& pair : streamQueues_) {
- allStats[pair.first] = pair.second->getStats();
- }
-
- return allStats;
- }
- PacketQueueStats MultiStreamPacketQueue::getStats(int streamIndex) const {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- return it->second->getStats();
- }
-
- return PacketQueueStats();
- }
- void MultiStreamPacketQueue::resetStats() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->resetStats();
- }
-
- Logger::debug("All stream queue stats reset");
- }
- void MultiStreamPacketQueue::resetStats(int streamIndex) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- auto it = streamQueues_.find(streamIndex);
- if (it != streamQueues_.end()) {
- it->second->resetStats();
- Logger::debug("Stream {} queue stats reset", streamIndex);
- }
- }
- void MultiStreamPacketQueue::setBlocking(bool blocking) {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->setBlocking(blocking);
- }
-
- Logger::debug("All stream queues blocking mode: {}", blocking);
- }
- void MultiStreamPacketQueue::wakeupAll() {
- std::shared_lock<std::shared_mutex> lock(streamsMutex_);
-
- for (auto& pair : streamQueues_) {
- pair.second->wakeup();
- }
- }
- // 工厂类实现
- std::unique_ptr<PacketQueue> PacketQueueFactory::createStandardQueue(size_t maxSize) {
- PacketQueueConfig config;
- config.maxSize = maxSize;
- config.dropOnFull = true;
- config.dropOldest = true;
- config.timeoutMs = 1000;
- config.enableStats = true;
-
- return std::make_unique<PacketQueue>(config);
- }
- std::unique_ptr<PacketQueue> PacketQueueFactory::createLowLatencyQueue(size_t maxSize) {
- PacketQueueConfig config;
- config.maxSize = maxSize;
- config.dropOnFull = true;
- config.dropOldest = true;
- config.timeoutMs = 100;
- config.enableStats = true;
- config.enablePacketDrop = true;
- config.maxLatency = 100.0;
- config.dropRatio = 3;
-
- return std::make_unique<PacketQueue>(config);
- }
- std::unique_ptr<PacketQueue> PacketQueueFactory::createHighCapacityQueue(size_t maxSize) {
- PacketQueueConfig config;
- config.maxSize = maxSize;
- config.maxBytes = 200 * 1024 * 1024; // 200MB
- config.dropOnFull = false; // 不丢包,等待
- config.dropOldest = true;
- config.timeoutMs = 5000;
- config.enableStats = true;
- config.enablePacketDrop = false;
-
- return std::make_unique<PacketQueue>(config);
- }
- std::unique_ptr<PacketQueue> PacketQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) {
- PacketQueueConfig config;
- config.maxSize = maxSize;
- config.dropOnFull = true;
- config.dropOldest = true;
- config.timeoutMs = 50;
- config.enableStats = true;
- config.enablePacketDrop = true;
- config.maxLatency = maxLatency;
- config.dropRatio = 2;
- config.dropNonKeyPackets = true;
-
- return std::make_unique<PacketQueue>(config);
- }
- std::unique_ptr<PacketQueue> PacketQueueFactory::createPriorityQueue(size_t maxSize) {
- PacketQueueConfig config;
- config.maxSize = maxSize;
- config.priorityQueue = true;
- config.dropOnFull = true;
- config.dropOldest = true;
- config.timeoutMs = 1000;
- config.enableStats = true;
-
- return std::make_unique<PacketQueue>(config);
- }
- std::unique_ptr<MultiStreamPacketQueue> PacketQueueFactory::createMultiStreamQueue(const PacketQueueConfig& config) {
- return std::make_unique<MultiStreamPacketQueue>(config);
- }
- } // namespace utils
- } // namespace av
|