| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- #ifndef RING_BUFFER
- #define RING_BUFFER
#include <stdio.h>
#include <stdlib.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <queue>

#include "error_define.h"
#include "log_helper.h"
- namespace am {
/**
 * Bookkeeping record for one frame held by a ring_buffer.
 *
 * Only the caller-supplied tag and the payload size are kept here; the
 * payload bytes themselves are stored in the ring's byte array.
 */
template <class T>
struct ring_frame {
    T type;  ///< Caller-defined tag stored alongside the frame.
    int len; ///< Payload size in bytes.
};
- template<typename T>
- class ring_buffer
- {
- public:
- ring_buffer(unsigned int size = 1920 * 1080 * 4 * 2, size_t max_frames = 1) // 回到1帧最小延迟
- {
- _size = size;
- _buffer_size = size; // 记录缓冲区大小
- _head = _tail = 0;
- _max_frames = max_frames;
- _dropped_frames.store(0, std::memory_order_relaxed);
- _buf = new uint8_t[size];
- }
- ~ring_buffer()
- {
- if (_buf)
- delete[] _buf;
- }
- void put(const void *data, int len, const T &type)
- {
- std::lock_guard<std::mutex> locker(_lock);
- // 智能丢帧策略:根据帧重要性决定是否丢弃
- if (_frames.size() >= _max_frames) {
- // 检查是否为关键帧或重要帧(这里简化处理)
- bool is_important_frame = (len > _buffer_size * 0.8); // 大帧通常更重要
-
- if (!is_important_frame && !_frames.empty()) {
- // 只丢弃非重要帧
- ring_frame<T> old_frame = _frames.front();
- _frames.pop();
- _tail = (_tail + old_frame.len) % _size;
- _dropped_frames.fetch_add(1, std::memory_order_relaxed);
- } else if (_frames.size() >= _max_frames * 2) {
- // 缓冲区严重积压时强制丢帧
- ring_frame<T> old_frame = _frames.front();
- _frames.pop();
- _tail = (_tail + old_frame.len) % _size;
- _dropped_frames.fetch_add(1, std::memory_order_relaxed);
- }
- }
- if (_head + len <= _size) {
- memcpy(_buf + _head, data, len);
- _head += len;
- } else if (_head + len > _size) {
- int remain = len - (_size - _head);
- if (len - remain > 0)
- memcpy(_buf + _head, data, len - remain);
- if (remain > 0)
- memcpy(_buf, (unsigned char *) data + len - remain, remain);
- _head = remain;
- }
- struct ring_frame<T> frame;
- frame.len = len;
- frame.type = type;
- _frames.push(frame);
- }
- int get(void *data, int len, T &type)
- {
- std::lock_guard<std::mutex> locker(_lock);
- int retLen = 0;
- if (_frames.size() <= 0) {
- retLen = 0;
- return retLen;
- }
- struct ring_frame<T> frame = _frames.front();
- _frames.pop();
- if (frame.len > len) {
- al_error("ringbuff::get need larger buffer");
- return 0;
- }
- type = frame.type;
- retLen = frame.len;
- if (_tail + frame.len <= _size) {
- memcpy(data, _buf + _tail, frame.len);
- _tail += frame.len;
- } else {
- int remain = frame.len - (_size - _tail);
- if (frame.len - remain > 0)
- memcpy(data, _buf + _tail, frame.len - remain);
- if (remain > 0)
- memcpy((unsigned char *) data + frame.len - remain, _buf, remain);
- _tail = remain;
- }
- return retLen;
- }
- // 添加缓冲区监控方法
- size_t get_pending_frames() const
- {
- std::lock_guard<std::mutex> locker(_lock);
- return _frames.size();
- }
-
- bool is_buffer_full(float threshold = 0.8f) const
- {
- std::lock_guard<std::mutex> locker(_lock);
- return _frames.size() >= (static_cast<size_t>(_max_frames * threshold));
- }
- // 配置与统计接口
- void set_max_frames(size_t max_frames)
- {
- std::lock_guard<std::mutex> locker(_lock);
- _max_frames = max_frames;
- }
- size_t max_frames() const { return _max_frames; }
- size_t dropped_frames() const { return _dropped_frames.load(std::memory_order_relaxed); }
- void reset_dropped_frames() { _dropped_frames.store(0, std::memory_order_relaxed); }
- private:
- std::queue<ring_frame<T>> _frames;
- unsigned int _size, _head, _tail;
- unsigned int _buffer_size; // 添加缓冲区大小记录
- uint8_t *_buf;
- mutable std::mutex _lock;
- std::atomic<size_t> _dropped_frames{0};
- size_t _max_frames{1};
- };
- } // namespace am
- #endif
|