#ifndef AV_UTILS_FRAME_QUEUE_H #define AV_UTILS_FRAME_QUEUE_H #include "../base/types.h" #include #include #include #include #include #include #include #include #include #include extern "C" { #include #include } namespace av { namespace utils { using namespace av; // 帧队列项 - 使用智能指针统一管理内存 struct FrameQueueItem { AVFramePtr frame; // 帧数据 - 使用智能指针 int64_t pts = AV_NOPTS_VALUE; // 显示时间戳 int64_t dts = AV_NOPTS_VALUE; // 解码时间戳 double duration = 0.0; // 帧持续时间 int streamIndex = -1; // 流索引 bool isKeyFrame = false; // 是否关键帧 std::chrono::steady_clock::time_point enqueueTime; // 入队时间 FrameQueueItem() { enqueueTime = std::chrono::steady_clock::now(); } // 从智能指针构造 FrameQueueItem(AVFramePtr f, int stream = -1) : frame(std::move(f)), streamIndex(stream) { if (frame) { pts = frame->pts; dts = frame->pkt_dts; isKeyFrame = (frame->key_frame == 1); } enqueueTime = std::chrono::steady_clock::now(); } // 从原生指针构造(为了兼容性,自动转换为智能指针) FrameQueueItem(AVFrame* f, int stream = -1) : streamIndex(stream) { if (f) { frame = AVFramePtr(f); // 转换为智能指针 pts = frame->pts; dts = frame->pkt_dts; isKeyFrame = (frame->key_frame == 1); } enqueueTime = std::chrono::steady_clock::now(); } // 智能指针自动管理内存,无需手动析构 ~FrameQueueItem() = default; // 移动构造函数 FrameQueueItem(FrameQueueItem&& other) noexcept : frame(std::move(other.frame)), pts(other.pts), dts(other.dts), duration(other.duration), streamIndex(other.streamIndex), isKeyFrame(other.isKeyFrame), enqueueTime(other.enqueueTime) { // frame 已经被移动,无需置空 } // 移动赋值操作符 FrameQueueItem& operator=(FrameQueueItem&& other) noexcept { if (this != &other) { frame = std::move(other.frame); pts = other.pts; dts = other.dts; duration = other.duration; streamIndex = other.streamIndex; isKeyFrame = other.isKeyFrame; enqueueTime = other.enqueueTime; // other.frame 已经被移动,无需手动置空 } return *this; } // 禁用拷贝 FrameQueueItem(const FrameQueueItem&) = delete; FrameQueueItem& operator=(const FrameQueueItem&) = delete; }; // 帧队列统计信息 struct FrameQueueStats { size_t currentSize = 0; // 当前队列大小 size_t maxSize = 0; // 最大队列大小 uint64_t totalEnqueued = 0; // 总入队数量 uint64_t totalDequeued = 0; // 总出队数量 uint64_t totalDropped = 0; // 总丢弃数量 double averageWaitTime = 0.0; // 平均等待时间(毫秒) double maxWaitTime = 0.0; // 最大等待时间(毫秒) std::chrono::steady_clock::time_point lastUpdateTime; FrameQueueStats() { lastUpdateTime = std::chrono::steady_clock::now(); } }; // 帧队列配置 struct FrameQueueConfig { size_t maxSize = 100; // 最大队列大小 bool dropOnFull = true; // 队列满时是否丢弃新帧 bool dropOldest = true; // 丢弃最旧的帧(false则丢弃最新的) int timeoutMs = 1000; // 超时时间(毫秒) bool enableStats = true; // 启用统计信息 // 丢帧策略 bool enableFrameDrop = false; // 启用智能丢帧 double maxLatency = 100.0; // 最大延迟(毫秒) int dropRatio = 2; // 丢帧比例(每N帧丢1帧) }; // 帧队列类 class FrameQueue { public: explicit FrameQueue(const FrameQueueConfig& config = FrameQueueConfig()); ~FrameQueue(); // 基本操作 ErrorCode enqueue(std::unique_ptr item); std::unique_ptr dequeue(); std::unique_ptr dequeue(int timeoutMs); // 安全接口(使用智能指针) ErrorCode enqueue(AVFramePtr frame, int streamIndex = -1); // 预览方法(不移除队列中的帧) const FrameQueueItem* peek() const; const FrameQueueItem* peek(int index) const; // 预览第index个帧(0为队首) const AVFramePtr& peekFramePtr() const; // 预览队首帧 // 帧操作接口 AVFramePtr dequeueFrame(); AVFramePtr dequeueFrame(int timeoutMs); // 便捷别名 ErrorCode push(AVFramePtr frame, int streamIndex = -1) { return enqueue(std::move(frame), streamIndex); } AVFramePtr pop() { return dequeueFrame(); } AVFramePtr pop(int timeoutMs) { return dequeueFrame(timeoutMs); } // 队列控制 void clear(); void flush(); void setMaxSize(size_t maxSize); void setDropPolicy(bool dropOnFull, bool dropOldest = true); // 状态查询 size_t size() const; bool empty() const; bool full() const; size_t capacity() const; // 统计信息 FrameQueueStats getStats() const; void resetStats(); // 帧丢弃策略 void enableFrameDrop(bool enable, double maxLatency = 100.0, int dropRatio = 2); void setFrameDropCallback(std::function callback); // 阻塞控制 void setBlocking(bool blocking); void wakeup(); // 回调设置 using EnqueueCallback = std::function; using DequeueCallback = std::function; using DropCallback = std::function; void setEnqueueCallback(EnqueueCallback callback) { enqueueCallback_ = callback; } void setDequeueCallback(DequeueCallback callback) { dequeueCallback_ = callback; } void setDropCallback(DropCallback callback) { dropCallback_ = callback; } protected: // 内部方法 bool shouldDropFrame(const FrameQueueItem& item) const; void dropOldestFrame(); void dropNewestFrame(); void updateStats(const FrameQueueItem& item, bool isEnqueue); double calculateWaitTime(const FrameQueueItem& item) const; // 智能丢帧 bool shouldDropByLatency(const FrameQueueItem& item) const; bool shouldDropByRatio() const; private: FrameQueueConfig config_; // 队列数据 - 使用deque以支持随机访问 std::deque> queue_; mutable std::mutex queueMutex_; std::condition_variable notEmpty_; std::condition_variable notFull_; // 状态控制 std::atomic blocking_{true}; std::atomic shutdown_{false}; // 统计信息 mutable std::mutex statsMutex_; FrameQueueStats stats_; // 丢帧控制 mutable std::atomic frameCounter_{0}; std::function frameDropCallback_; // 回调函数 EnqueueCallback enqueueCallback_; DequeueCallback dequeueCallback_; DropCallback dropCallback_; }; // 多流帧队列 class MultiStreamFrameQueue { public: explicit MultiStreamFrameQueue(const FrameQueueConfig& config = FrameQueueConfig()); ~MultiStreamFrameQueue(); // 流管理 ErrorCode addStream(int streamIndex, const FrameQueueConfig& config = FrameQueueConfig()); ErrorCode removeStream(int streamIndex); bool hasStream(int streamIndex) const; std::vector getStreamIndices() const; // 安全帧操作接口 ErrorCode enqueue(AVFramePtr frame, int streamIndex); AVFramePtr dequeue(int streamIndex); AVFramePtr dequeue(int streamIndex, int timeoutMs); // 安全批量操作接口 ErrorCode enqueueToAll(AVFramePtr frame); std::vector dequeueFromAll(); // 队列控制 void clear(); void clearStream(int streamIndex); void flush(); void flushStream(int streamIndex); // 状态查询 size_t size() const; size_t size(int streamIndex) const; bool empty() const; bool empty(int streamIndex) const; // 统计信息 std::map getAllStats() const; FrameQueueStats getStats(int streamIndex) const; void resetStats(); void resetStats(int streamIndex); // 同步控制 void setBlocking(bool blocking); void wakeupAll(); private: FrameQueueConfig defaultConfig_; std::map> streamQueues_; mutable std::shared_mutex streamsMutex_; }; // 帧队列工厂 class FrameQueueFactory { public: // 创建标准帧队列 static std::unique_ptr createStandardQueue(size_t maxSize = 100); // 创建低延迟队列 static std::unique_ptr createLowLatencyQueue(size_t maxSize = 10); // 创建高容量队列 static std::unique_ptr createHighCapacityQueue(size_t maxSize = 1000); // 创建实时队列(启用智能丢帧) static std::unique_ptr createRealtimeQueue(size_t maxSize = 50, double maxLatency = 50.0); // 创建多流队列 static std::unique_ptr createMultiStreamQueue(const FrameQueueConfig& config = FrameQueueConfig()); }; } // namespace utils } // namespace av #endif // AV_UTILS_FRAME_QUEUE_H