ring_buffer.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #ifndef RING_BUFFER
  2. #define RING_BUFFER
#include <stdio.h>
#include <stdlib.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <queue>

#include "error_define.h"
#include "log_helper.h"
  10. namespace am {
  11. template<typename T>
  12. struct ring_frame
  13. {
  14. T type;
  15. int len;
  16. };
  17. template<typename T>
  18. class ring_buffer
  19. {
  20. public:
  21. ring_buffer(unsigned int size = 1920 * 1080 * 4 * 2, size_t max_frames = 1) // 回到1帧最小延迟
  22. {
  23. _size = size;
  24. _buffer_size = size; // 记录缓冲区大小
  25. _head = _tail = 0;
  26. _max_frames = max_frames;
  27. _dropped_frames.store(0, std::memory_order_relaxed);
  28. _buf = new uint8_t[size];
  29. }
  30. ~ring_buffer()
  31. {
  32. if (_buf)
  33. delete[] _buf;
  34. }
  35. void put(const void *data, int len, const T &type)
  36. {
  37. std::lock_guard<std::mutex> locker(_lock);
  38. // 智能丢帧策略:根据帧重要性决定是否丢弃
  39. if (_frames.size() >= _max_frames) {
  40. // 检查是否为关键帧或重要帧(这里简化处理)
  41. bool is_important_frame = (len > _buffer_size * 0.8); // 大帧通常更重要
  42. if (!is_important_frame && !_frames.empty()) {
  43. // 只丢弃非重要帧
  44. ring_frame<T> old_frame = _frames.front();
  45. _frames.pop();
  46. _tail = (_tail + old_frame.len) % _size;
  47. _dropped_frames.fetch_add(1, std::memory_order_relaxed);
  48. } else if (_frames.size() >= _max_frames * 2) {
  49. // 缓冲区严重积压时强制丢帧
  50. ring_frame<T> old_frame = _frames.front();
  51. _frames.pop();
  52. _tail = (_tail + old_frame.len) % _size;
  53. _dropped_frames.fetch_add(1, std::memory_order_relaxed);
  54. }
  55. }
  56. if (_head + len <= _size) {
  57. memcpy(_buf + _head, data, len);
  58. _head += len;
  59. } else if (_head + len > _size) {
  60. int remain = len - (_size - _head);
  61. if (len - remain > 0)
  62. memcpy(_buf + _head, data, len - remain);
  63. if (remain > 0)
  64. memcpy(_buf, (unsigned char *) data + len - remain, remain);
  65. _head = remain;
  66. }
  67. struct ring_frame<T> frame;
  68. frame.len = len;
  69. frame.type = type;
  70. _frames.push(frame);
  71. }
  72. int get(void *data, int len, T &type)
  73. {
  74. std::lock_guard<std::mutex> locker(_lock);
  75. int retLen = 0;
  76. if (_frames.size() <= 0) {
  77. retLen = 0;
  78. return retLen;
  79. }
  80. struct ring_frame<T> frame = _frames.front();
  81. _frames.pop();
  82. if (frame.len > len) {
  83. al_error("ringbuff::get need larger buffer");
  84. return 0;
  85. }
  86. type = frame.type;
  87. retLen = frame.len;
  88. if (_tail + frame.len <= _size) {
  89. memcpy(data, _buf + _tail, frame.len);
  90. _tail += frame.len;
  91. } else {
  92. int remain = frame.len - (_size - _tail);
  93. if (frame.len - remain > 0)
  94. memcpy(data, _buf + _tail, frame.len - remain);
  95. if (remain > 0)
  96. memcpy((unsigned char *) data + frame.len - remain, _buf, remain);
  97. _tail = remain;
  98. }
  99. return retLen;
  100. }
  101. // 添加缓冲区监控方法
  102. size_t get_pending_frames() const
  103. {
  104. std::lock_guard<std::mutex> locker(_lock);
  105. return _frames.size();
  106. }
  107. bool is_buffer_full(float threshold = 0.8f) const
  108. {
  109. std::lock_guard<std::mutex> locker(_lock);
  110. return _frames.size() >= (static_cast<size_t>(_max_frames * threshold));
  111. }
  112. // 配置与统计接口
  113. void set_max_frames(size_t max_frames)
  114. {
  115. std::lock_guard<std::mutex> locker(_lock);
  116. _max_frames = max_frames;
  117. }
  118. size_t max_frames() const { return _max_frames; }
  119. size_t dropped_frames() const { return _dropped_frames.load(std::memory_order_relaxed); }
  120. void reset_dropped_frames() { _dropped_frames.store(0, std::memory_order_relaxed); }
  121. private:
  122. std::queue<ring_frame<T>> _frames;
  123. unsigned int _size, _head, _tail;
  124. unsigned int _buffer_size; // 添加缓冲区大小记录
  125. uint8_t *_buf;
  126. mutable std::mutex _lock;
  127. std::atomic<size_t> _dropped_frames{0};
  128. size_t _max_frames{1};
  129. };
  130. } // namespace am
  131. #endif