ring_buffer.h 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. #ifndef RING_BUFFER
  2. #define RING_BUFFER
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <mutex>
  6. #include <queue>
  7. #include <atomic>
  8. #include "error_define.h"
  9. #include "log_helper.h"
  10. namespace am {
  11. template<typename T>
  12. struct ring_frame
  13. {
  14. T type;
  15. int len;
  16. };
  17. template<typename T>
  18. class ring_buffer
  19. {
  20. public:
  21. ring_buffer(unsigned int size = 1920 * 1080 * 4 * 2, size_t max_frames = 3) // 从10帧减少到2帧,减少积压
  22. {
  23. _size = size;
  24. _head = _tail = 0;
  25. _max_frames = max_frames;
  26. _dropped_frames.store(0, std::memory_order_relaxed);
  27. _buf = new uint8_t[size];
  28. }
  29. ~ring_buffer()
  30. {
  31. if (_buf)
  32. delete[] _buf;
  33. }
  34. void put(const void *data, int len, const T &type)
  35. {
  36. std::lock_guard<std::mutex> locker(_lock);
  37. // 实时推流优化:当缓冲区积压超过阈值时,丢弃最旧的帧
  38. while (_frames.size() >= _max_frames) {
  39. // 丢弃最旧的帧
  40. if (!_frames.empty()) {
  41. ring_frame<T> old_frame = _frames.front();
  42. _frames.pop();
  43. // 更新tail指针,跳过被丢弃的数据
  44. _tail = (_tail + old_frame.len) % _size;
  45. _dropped_frames.fetch_add(1, std::memory_order_relaxed);
  46. }
  47. }
  48. if (_head + len <= _size) {
  49. memcpy(_buf + _head, data, len);
  50. _head += len;
  51. } else if (_head + len > _size) {
  52. int remain = len - (_size - _head);
  53. if (len - remain > 0)
  54. memcpy(_buf + _head, data, len - remain);
  55. if (remain > 0)
  56. memcpy(_buf, (unsigned char *) data + len - remain, remain);
  57. _head = remain;
  58. }
  59. struct ring_frame<T> frame;
  60. frame.len = len;
  61. frame.type = type;
  62. _frames.push(frame);
  63. }
  64. int get(void *data, int len, T &type)
  65. {
  66. std::lock_guard<std::mutex> locker(_lock);
  67. int retLen = 0;
  68. if (_frames.size() <= 0) {
  69. retLen = 0;
  70. return retLen;
  71. }
  72. struct ring_frame<T> frame = _frames.front();
  73. _frames.pop();
  74. if (frame.len > len) {
  75. al_error("ringbuff::get need larger buffer");
  76. return 0;
  77. }
  78. type = frame.type;
  79. retLen = frame.len;
  80. if (_tail + frame.len <= _size) {
  81. memcpy(data, _buf + _tail, frame.len);
  82. _tail += frame.len;
  83. } else {
  84. int remain = frame.len - (_size - _tail);
  85. if (frame.len - remain > 0)
  86. memcpy(data, _buf + _tail, frame.len - remain);
  87. if (remain > 0)
  88. memcpy((unsigned char *) data + frame.len - remain, _buf, remain);
  89. _tail = remain;
  90. }
  91. return retLen;
  92. }
  93. // 添加缓冲区监控方法
  94. size_t get_pending_frames() const
  95. {
  96. std::lock_guard<std::mutex> locker(_lock);
  97. return _frames.size();
  98. }
  99. bool is_buffer_full(float threshold = 0.8f) const
  100. {
  101. std::lock_guard<std::mutex> locker(_lock);
  102. return _frames.size() >= (static_cast<size_t>(_max_frames * threshold));
  103. }
  104. // 配置与统计接口
  105. void set_max_frames(size_t max_frames)
  106. {
  107. std::lock_guard<std::mutex> locker(_lock);
  108. _max_frames = max_frames;
  109. }
  110. size_t max_frames() const { return _max_frames; }
  111. size_t dropped_frames() const { return _dropped_frames.load(std::memory_order_relaxed); }
  112. void reset_dropped_frames() { _dropped_frames.store(0, std::memory_order_relaxed); }
  113. private:
  114. std::queue<ring_frame<T>> _frames;
  115. unsigned int _size, _head, _tail;
  116. uint8_t *_buf;
  117. mutable std::mutex _lock;
  118. std::atomic<size_t> _dropped_frames{0};
  119. size_t _max_frames{3};
  120. };
  121. } // namespace am
  122. #endif