ring_buffer.h 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. #ifndef RING_BUFFER
  2. #define RING_BUFFER
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <mutex>
  6. #include <queue>
  7. #include "error_define.h"
  8. #include "log_helper.h"
  9. namespace am {
  10. template<typename T>
  11. struct ring_frame
  12. {
  13. T type;
  14. int len;
  15. };
  16. template<typename T>
  17. class ring_buffer
  18. {
  19. public:
  20. ring_buffer(unsigned int size = 1920 * 1080 * 4 * 2) // 从10帧减少到2帧,减少积压
  21. {
  22. _size = size;
  23. _head = _tail = 0;
  24. _buf = new uint8_t[size];
  25. }
  26. ~ring_buffer()
  27. {
  28. if (_buf)
  29. delete[] _buf;
  30. }
  31. void put(const void *data, int len, const T &type)
  32. {
  33. std::lock_guard<std::mutex> locker(_lock);
  34. // 实时推流优化:当缓冲区积压超过阈值时,丢弃最旧的帧
  35. const size_t MAX_FRAMES = 3; // 最大允许3帧积压
  36. while (_frames.size() >= MAX_FRAMES) {
  37. // 丢弃最旧的帧
  38. if (!_frames.empty()) {
  39. ring_frame<T> old_frame = _frames.front();
  40. _frames.pop();
  41. // 更新tail指针,跳过被丢弃的数据
  42. _tail = (_tail + old_frame.len) % _size;
  43. }
  44. }
  45. if (_head + len <= _size) {
  46. memcpy(_buf + _head, data, len);
  47. _head += len;
  48. } else if (_head + len > _size) {
  49. int remain = len - (_size - _head);
  50. if (len - remain > 0)
  51. memcpy(_buf + _head, data, len - remain);
  52. if (remain > 0)
  53. memcpy(_buf, (unsigned char *) data + len - remain, remain);
  54. _head = remain;
  55. }
  56. struct ring_frame<T> frame;
  57. frame.len = len;
  58. frame.type = type;
  59. _frames.push(frame);
  60. }
  61. int get(void *data, int len, T &type)
  62. {
  63. std::lock_guard<std::mutex> locker(_lock);
  64. int retLen = 0;
  65. if (_frames.size() <= 0) {
  66. retLen = 0;
  67. return retLen;
  68. }
  69. struct ring_frame<T> frame = _frames.front();
  70. _frames.pop();
  71. if (frame.len > len) {
  72. al_error("ringbuff::get need larger buffer");
  73. return 0;
  74. }
  75. type = frame.type;
  76. retLen = frame.len;
  77. if (_tail + frame.len <= _size) {
  78. memcpy(data, _buf + _tail, frame.len);
  79. _tail += frame.len;
  80. } else {
  81. int remain = frame.len - (_size - _tail);
  82. if (frame.len - remain > 0)
  83. memcpy(data, _buf + _tail, frame.len - remain);
  84. if (remain > 0)
  85. memcpy((unsigned char *) data + frame.len - remain, _buf, remain);
  86. _tail = remain;
  87. }
  88. return retLen;
  89. }
  90. // 添加缓冲区监控方法
  91. size_t get_pending_frames() const
  92. {
  93. std::lock_guard<std::mutex> locker(_lock);
  94. return _frames.size();
  95. }
  96. bool is_buffer_full(float threshold = 0.8f) const
  97. {
  98. std::lock_guard<std::mutex> locker(_lock);
  99. const size_t MAX_FRAMES = 3;
  100. return _frames.size() >= (MAX_FRAMES * threshold);
  101. }
  102. private:
  103. std::queue<ring_frame<T>> _frames;
  104. unsigned int _size, _head, _tail;
  105. uint8_t *_buf;
  106. std::mutex _lock;
  107. };
  108. } // namespace am
  109. #endif