#ifndef AV_UTILS_PACKET_QUEUE_H #define AV_UTILS_PACKET_QUEUE_H #include "../base/types.h" #include #include #include #include #include #include #include #include #include #include extern "C" { #include #include } namespace av { namespace utils { using namespace av; // 包队列项 struct PacketQueueItem { AVPacket* packet = nullptr; // 包数据 int64_t pts = AV_NOPTS_VALUE; // 显示时间戳 int64_t dts = AV_NOPTS_VALUE; // 解码时间戳 double duration = 0.0; // 包持续时间 int streamIndex = -1; // 流索引 bool isKeyPacket = false; // 是否关键包 int priority = 0; // 优先级(数值越大优先级越高) std::chrono::steady_clock::time_point enqueueTime; // 入队时间 PacketQueueItem() { enqueueTime = std::chrono::steady_clock::now(); } PacketQueueItem(AVPacket* p, int stream = -1, int prio = 0) : packet(p), streamIndex(stream), priority(prio) { if (packet) { pts = packet->pts; dts = packet->dts; isKeyPacket = (packet->flags & AV_PKT_FLAG_KEY) != 0; } enqueueTime = std::chrono::steady_clock::now(); } ~PacketQueueItem() { if (packet) { av_packet_free(&packet); } } // 移动构造函数 PacketQueueItem(PacketQueueItem&& other) noexcept : packet(other.packet), pts(other.pts), dts(other.dts), duration(other.duration), streamIndex(other.streamIndex), isKeyPacket(other.isKeyPacket), priority(other.priority), enqueueTime(other.enqueueTime) { other.packet = nullptr; } // 移动赋值操作符 PacketQueueItem& operator=(PacketQueueItem&& other) noexcept { if (this != &other) { if (packet) { av_packet_free(&packet); } packet = other.packet; pts = other.pts; dts = other.dts; duration = other.duration; streamIndex = other.streamIndex; isKeyPacket = other.isKeyPacket; priority = other.priority; enqueueTime = other.enqueueTime; other.packet = nullptr; } return *this; } // 禁用拷贝 PacketQueueItem(const PacketQueueItem&) = delete; PacketQueueItem& operator=(const PacketQueueItem&) = delete; }; // 包队列统计信息 struct PacketQueueStats { size_t currentSize = 0; // 当前队列大小 size_t maxSize = 0; // 最大队列大小 uint64_t totalEnqueued = 0; // 总入队数量 uint64_t totalDequeued = 0; // 总出队数量 uint64_t totalDropped = 0; // 总丢弃数量 uint64_t totalBytes = 0; // 总字节数 double averageWaitTime = 0.0; // 平均等待时间(毫秒) double maxWaitTime = 0.0; // 最大等待时间(毫秒) double averagePacketSize = 0.0; // 平均包大小 std::chrono::steady_clock::time_point lastUpdateTime; PacketQueueStats() { lastUpdateTime = std::chrono::steady_clock::now(); } }; // 包队列配置 struct PacketQueueConfig { size_t maxSize = 200; // 最大队列大小 size_t maxBytes = 50 * 1024 * 1024; // 最大字节数(50MB) bool dropOnFull = true; // 队列满时是否丢弃新包 bool dropOldest = true; // 丢弃最旧的包(false则丢弃最新的) bool priorityQueue = false; // 是否启用优先级队列 int timeoutMs = 1000; // 超时时间(毫秒) bool enableStats = true; // 启用统计信息 // 丢包策略 bool enablePacketDrop = false; // 启用智能丢包 double maxLatency = 200.0; // 最大延迟(毫秒) int dropRatio = 3; // 丢包比例(每N包丢1包) bool dropNonKeyPackets = true; // 优先丢弃非关键包 }; // 优先级比较器 struct PacketPriorityCompare { bool operator()(const std::unique_ptr& a, const std::unique_ptr& b) const { // 优先级高的在前 if (a->priority != b->priority) { return a->priority < b->priority; } // 关键包优先 if (a->isKeyPacket != b->isKeyPacket) { return !a->isKeyPacket && b->isKeyPacket; } // 时间戳小的在前 return a->dts > b->dts; } }; // 包队列类 class PacketQueue { public: explicit PacketQueue(const PacketQueueConfig& config = PacketQueueConfig()); ~PacketQueue(); // 基本操作 ErrorCode enqueue(std::unique_ptr item); std::unique_ptr dequeue(); std::unique_ptr dequeue(int timeoutMs); // 便捷方法 ErrorCode enqueue(AVPacket* packet, int streamIndex = -1, int priority = 0); AVPacket* dequeuePacket(); AVPacket* dequeuePacket(int timeoutMs); // 兼容性方法 (push/pop 别名) ErrorCode push(AVPacket* packet, int streamIndex = -1, int priority = 0) { return enqueue(packet, streamIndex, priority); } AVPacket* pop() { return dequeuePacket(); } AVPacket* pop(int timeoutMs) { return dequeuePacket(timeoutMs); } // 优先级队列操作 ErrorCode enqueueWithPriority(AVPacket* packet, int priority, int streamIndex = -1); std::unique_ptr dequeueHighestPriority(); // 队列控制 void clear(); void flush(); void setMaxSize(size_t maxSize); void setMaxBytes(size_t maxBytes); void setDropPolicy(bool dropOnFull, bool dropOldest = true); void enablePriorityQueue(bool enable); // 状态查询 size_t size() const; size_t bytes() const; bool empty() const; bool full() const; bool fullByBytes() const; size_t capacity() const; size_t capacityBytes() const; // 统计信息 PacketQueueStats getStats() const; void resetStats(); // 包丢弃策略 void enablePacketDrop(bool enable, double maxLatency = 200.0, int dropRatio = 3); void setPacketDropCallback(std::function callback); // 阻塞控制 void setBlocking(bool blocking); void wakeup(); // 流过滤 void setStreamFilter(const std::vector& allowedStreams); void clearStreamFilter(); // 回调设置 using EnqueueCallback = std::function; using DequeueCallback = std::function; using DropCallback = std::function; void setEnqueueCallback(EnqueueCallback callback) { enqueueCallback_ = callback; } void setDequeueCallback(DequeueCallback callback) { dequeueCallback_ = callback; } void setDropCallback(DropCallback callback) { dropCallback_ = callback; } protected: // 内部方法 bool shouldDropPacket(const PacketQueueItem& item) const; void dropOldestPacket(); void dropNewestPacket(); void dropLargestPacket(); void dropNonKeyPackets(); void updateStats(const PacketQueueItem& item, bool isEnqueue); double calculateWaitTime(const PacketQueueItem& item) const; bool isStreamAllowed(int streamIndex) const; // 智能丢包 bool shouldDropByLatency(const PacketQueueItem& item) const; bool shouldDropByRatio() const; bool shouldDropBySize() const; // 队列操作 void enqueueToNormalQueue(std::unique_ptr item); void enqueueToPriorityQueue(std::unique_ptr item); std::unique_ptr dequeueFromNormalQueue(); std::unique_ptr dequeueFromPriorityQueue(); private: PacketQueueConfig config_; // 队列数据 std::queue> normalQueue_; std::priority_queue, std::vector>, PacketPriorityCompare> priorityQueue_; mutable std::mutex queueMutex_; std::condition_variable notEmpty_; std::condition_variable notFull_; // 状态控制 std::atomic blocking_{true}; std::atomic shutdown_{false}; std::atomic totalBytes_{0}; // 统计信息 mutable std::mutex statsMutex_; PacketQueueStats stats_; // 丢包控制 mutable std::atomic packetCounter_{0}; std::function packetDropCallback_; // 流过滤 std::vector allowedStreams_; bool hasStreamFilter_ = false; mutable std::mutex streamFilterMutex_; // 回调函数 EnqueueCallback enqueueCallback_; DequeueCallback dequeueCallback_; DropCallback dropCallback_; }; // 多流包队列 class MultiStreamPacketQueue { public: explicit MultiStreamPacketQueue(const PacketQueueConfig& config = PacketQueueConfig()); ~MultiStreamPacketQueue(); // 流管理 ErrorCode addStream(int streamIndex, const PacketQueueConfig& config = PacketQueueConfig()); ErrorCode removeStream(int streamIndex); bool hasStream(int streamIndex) const; std::vector getStreamIndices() const; // 包操作 ErrorCode enqueue(AVPacket* packet, int streamIndex); AVPacket* dequeue(int streamIndex); AVPacket* dequeue(int streamIndex, int timeoutMs); // 批量操作 ErrorCode enqueueToAll(AVPacket* packet); std::vector dequeueFromAll(); // 同步出队(按时间戳顺序) AVPacket* dequeueSynchronized(); std::vector dequeueSynchronizedBatch(size_t maxCount = 10); // 队列控制 void clear(); void clearStream(int streamIndex); void flush(); void flushStream(int streamIndex); // 状态查询 size_t size() const; size_t size(int streamIndex) const; size_t bytes() const; size_t bytes(int streamIndex) const; bool empty() const; bool empty(int streamIndex) const; // 统计信息 std::map getAllStats() const; PacketQueueStats getStats(int streamIndex) const; void resetStats(); void resetStats(int streamIndex); // 同步控制 void setBlocking(bool blocking); void wakeupAll(); private: PacketQueueConfig defaultConfig_; std::map> streamQueues_; mutable std::shared_mutex streamsMutex_; // 同步出队相关 struct SyncItem { AVPacket* packet; int streamIndex; int64_t timestamp; bool operator>(const SyncItem& other) const { return timestamp > other.timestamp; } }; std::priority_queue, std::greater> syncQueue_; mutable std::mutex syncMutex_; }; // 包队列工厂 class PacketQueueFactory { public: // 创建标准包队列 static std::unique_ptr createStandardQueue(size_t maxSize = 200); // 创建低延迟队列 static std::unique_ptr createLowLatencyQueue(size_t maxSize = 50); // 创建高容量队列 static std::unique_ptr createHighCapacityQueue(size_t maxSize = 2000); // 创建实时队列(启用智能丢包) static std::unique_ptr createRealtimeQueue(size_t maxSize = 100, double maxLatency = 100.0); // 创建优先级队列 static std::unique_ptr createPriorityQueue(size_t maxSize = 200); // 创建多流队列 static std::unique_ptr createMultiStreamQueue(const PacketQueueConfig& config = PacketQueueConfig()); }; } // namespace utils } // namespace av #endif // AV_UTILS_PACKET_QUEUE_H