#ifndef RING_BUFFER #define RING_BUFFER #include #include #include #include #include #include "error_define.h" #include "log_helper.h" namespace am { template struct ring_frame { T type; int len; }; template class ring_buffer { public: ring_buffer(unsigned int size = 1920 * 1080 * 4 * 2, size_t max_frames = 1) // 回到1帧最小延迟 { _size = size; _buffer_size = size; // 记录缓冲区大小 _head = _tail = 0; _max_frames = max_frames; _dropped_frames.store(0, std::memory_order_relaxed); _buf = new uint8_t[size]; } ~ring_buffer() { if (_buf) delete[] _buf; } void put(const void *data, int len, const T &type) { std::lock_guard locker(_lock); // 智能丢帧策略:根据帧重要性决定是否丢弃 if (_frames.size() >= _max_frames) { // 检查是否为关键帧或重要帧(这里简化处理) bool is_important_frame = (len > _buffer_size * 0.8); // 大帧通常更重要 if (!is_important_frame && !_frames.empty()) { // 只丢弃非重要帧 ring_frame old_frame = _frames.front(); _frames.pop(); _tail = (_tail + old_frame.len) % _size; _dropped_frames.fetch_add(1, std::memory_order_relaxed); } else if (_frames.size() >= _max_frames * 2) { // 缓冲区严重积压时强制丢帧 ring_frame old_frame = _frames.front(); _frames.pop(); _tail = (_tail + old_frame.len) % _size; _dropped_frames.fetch_add(1, std::memory_order_relaxed); } } if (_head + len <= _size) { memcpy(_buf + _head, data, len); _head += len; } else if (_head + len > _size) { int remain = len - (_size - _head); if (len - remain > 0) memcpy(_buf + _head, data, len - remain); if (remain > 0) memcpy(_buf, (unsigned char *) data + len - remain, remain); _head = remain; } struct ring_frame frame; frame.len = len; frame.type = type; _frames.push(frame); } int get(void *data, int len, T &type) { std::lock_guard locker(_lock); int retLen = 0; if (_frames.size() <= 0) { retLen = 0; return retLen; } struct ring_frame frame = _frames.front(); _frames.pop(); if (frame.len > len) { al_error("ringbuff::get need larger buffer"); return 0; } type = frame.type; retLen = frame.len; if (_tail + frame.len <= _size) { memcpy(data, _buf + _tail, frame.len); _tail += frame.len; } else { int remain = frame.len - (_size - _tail); if (frame.len - remain > 0) memcpy(data, _buf + _tail, frame.len - remain); if (remain > 0) memcpy((unsigned char *) data + frame.len - remain, _buf, remain); _tail = remain; } return retLen; } // 添加缓冲区监控方法 size_t get_pending_frames() const { std::lock_guard locker(_lock); return _frames.size(); } bool is_buffer_full(float threshold = 0.8f) const { std::lock_guard locker(_lock); return _frames.size() >= (static_cast(_max_frames * threshold)); } // 配置与统计接口 void set_max_frames(size_t max_frames) { std::lock_guard locker(_lock); _max_frames = max_frames; } size_t max_frames() const { return _max_frames; } size_t dropped_frames() const { return _dropped_frames.load(std::memory_order_relaxed); } void reset_dropped_frames() { _dropped_frames.store(0, std::memory_order_relaxed); } private: std::queue> _frames; unsigned int _size, _head, _tail; unsigned int _buffer_size; // 添加缓冲区大小记录 uint8_t *_buf; mutable std::mutex _lock; std::atomic _dropped_frames{0}; size_t _max_frames{1}; }; } // namespace am #endif