// utils_frame_queue.cpp
#include "utils_frame_queue.h"
#include "../base/logger.h"
#include <algorithm>
#include <shared_mutex>
#include <utility>
  5. namespace av {
  6. namespace utils {
  7. using namespace av::base;
  8. FrameQueue::FrameQueue(const FrameQueueConfig& config)
  9. : config_(config) {
  10. Logger::debug("FrameQueue created with max size: {}", config_.maxSize);
  11. }
  12. FrameQueue::~FrameQueue() {
  13. shutdown_ = true;
  14. notEmpty_.notify_all();
  15. notFull_.notify_all();
  16. clear();
  17. Logger::debug("FrameQueue destroyed");
  18. }
  19. ErrorCode FrameQueue::enqueue(std::unique_ptr<FrameQueueItem> item) {
  20. if (!item) {
  21. return ErrorCode::INVALID_PARAMETER;
  22. }
  23. if (shutdown_) {
  24. return ErrorCode::INVALID_STATE;
  25. }
  26. std::unique_lock<std::mutex> lock(queueMutex_);
  27. // 检查是否需要丢帧
  28. if (shouldDropFrame(*item)) {
  29. if (dropCallback_) {
  30. dropCallback_(*item, "Smart frame drop");
  31. }
  32. std::lock_guard<std::mutex> statsLock(statsMutex_);
  33. stats_.totalDropped++;
  34. Logger::debug("Frame dropped by smart drop policy");
  35. return ErrorCode::SUCCESS;
  36. }
  37. // 等待队列有空间
  38. if (blocking_ && queue_.size() >= config_.maxSize) {
  39. if (config_.dropOnFull) {
  40. if (config_.dropOldest) {
  41. dropOldestFrame();
  42. } else {
  43. if (dropCallback_) {
  44. dropCallback_(*item, "Queue full - drop newest");
  45. }
  46. std::lock_guard<std::mutex> statsLock(statsMutex_);
  47. stats_.totalDropped++;
  48. Logger::debug("Frame dropped - queue full");
  49. return ErrorCode::SUCCESS;
  50. }
  51. } else {
  52. // 等待队列有空间
  53. if (!notFull_.wait_for(lock, std::chrono::milliseconds(config_.timeoutMs),
  54. [this] { return queue_.size() < config_.maxSize || shutdown_; })) {
  55. Logger::warning("Enqueue timeout");
  56. return ErrorCode::TIMEOUT;
  57. }
  58. if (shutdown_) {
  59. return ErrorCode::INVALID_STATE;
  60. }
  61. }
  62. }
  63. // 添加到队列
  64. queue_.push(std::move(item));
  65. // 更新统计信息
  66. if (config_.enableStats) {
  67. updateStats(*queue_.back(), true);
  68. }
  69. // 通知等待的消费者
  70. notEmpty_.notify_one();
  71. // 回调
  72. if (enqueueCallback_) {
  73. enqueueCallback_(*queue_.back());
  74. }
  75. return ErrorCode::SUCCESS;
  76. }
  77. std::unique_ptr<FrameQueueItem> FrameQueue::dequeue() {
  78. return dequeue(config_.timeoutMs);
  79. }
  80. std::unique_ptr<FrameQueueItem> FrameQueue::dequeue(int timeoutMs) {
  81. std::unique_lock<std::mutex> lock(queueMutex_);
  82. // 等待队列有数据
  83. if (blocking_ && queue_.empty()) {
  84. if (!notEmpty_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
  85. [this] { return !queue_.empty() || shutdown_; })) {
  86. return nullptr; // 超时
  87. }
  88. if (shutdown_ || queue_.empty()) {
  89. return nullptr;
  90. }
  91. }
  92. if (queue_.empty()) {
  93. return nullptr;
  94. }
  95. // 取出队列头部元素
  96. auto item = std::move(queue_.front());
  97. queue_.pop();
  98. // 更新统计信息
  99. if (config_.enableStats) {
  100. updateStats(*item, false);
  101. }
  102. // 通知等待的生产者
  103. notFull_.notify_one();
  104. // 回调
  105. if (dequeueCallback_) {
  106. dequeueCallback_(*item);
  107. }
  108. return item;
  109. }
  110. ErrorCode FrameQueue::enqueue(AVFrame* frame, int streamIndex) {
  111. if (!frame) {
  112. return ErrorCode::INVALID_PARAMETER;
  113. }
  114. auto item = std::make_unique<FrameQueueItem>(frame, streamIndex);
  115. return enqueue(std::move(item));
  116. }
  117. AVFrame* FrameQueue::dequeueFrame() {
  118. auto item = dequeue();
  119. if (item) {
  120. AVFrame* frame = item->frame;
  121. item->frame = nullptr; // 转移所有权
  122. return frame;
  123. }
  124. return nullptr;
  125. }
  126. AVFrame* FrameQueue::dequeueFrame(int timeoutMs) {
  127. auto item = dequeue(timeoutMs);
  128. if (item) {
  129. AVFrame* frame = item->frame;
  130. item->frame = nullptr; // 转移所有权
  131. return frame;
  132. }
  133. return nullptr;
  134. }
  135. void FrameQueue::clear() {
  136. std::lock_guard<std::mutex> lock(queueMutex_);
  137. while (!queue_.empty()) {
  138. queue_.pop();
  139. }
  140. Logger::debug("Frame queue cleared");
  141. }
  142. void FrameQueue::flush() {
  143. clear();
  144. notEmpty_.notify_all();
  145. notFull_.notify_all();
  146. }
  147. void FrameQueue::setMaxSize(size_t maxSize) {
  148. std::lock_guard<std::mutex> lock(queueMutex_);
  149. config_.maxSize = maxSize;
  150. // 如果当前队列大小超过新的最大值,丢弃多余的帧
  151. while (queue_.size() > maxSize) {
  152. if (config_.dropOldest) {
  153. dropOldestFrame();
  154. } else {
  155. break; // 不丢弃,保持当前状态
  156. }
  157. }
  158. Logger::debug("Frame queue max size set to: {}", maxSize);
  159. }
  160. void FrameQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
  161. config_.dropOnFull = dropOnFull;
  162. config_.dropOldest = dropOldest;
  163. Logger::debug("Drop policy set: dropOnFull={}, dropOldest={}", dropOnFull, dropOldest);
  164. }
  165. size_t FrameQueue::size() const {
  166. std::lock_guard<std::mutex> lock(queueMutex_);
  167. return queue_.size();
  168. }
  169. bool FrameQueue::empty() const {
  170. std::lock_guard<std::mutex> lock(queueMutex_);
  171. return queue_.empty();
  172. }
  173. bool FrameQueue::full() const {
  174. std::lock_guard<std::mutex> lock(queueMutex_);
  175. return queue_.size() >= config_.maxSize;
  176. }
  177. size_t FrameQueue::capacity() const {
  178. return config_.maxSize;
  179. }
  180. FrameQueueStats FrameQueue::getStats() const {
  181. std::lock_guard<std::mutex> lock(statsMutex_);
  182. FrameQueueStats stats = stats_;
  183. stats.currentSize = size();
  184. return stats;
  185. }
  186. void FrameQueue::resetStats() {
  187. std::lock_guard<std::mutex> lock(statsMutex_);
  188. stats_ = FrameQueueStats();
  189. Logger::debug("Frame queue stats reset");
  190. }
  191. void FrameQueue::enableFrameDrop(bool enable, double maxLatency, int dropRatio) {
  192. config_.enableFrameDrop = enable;
  193. config_.maxLatency = maxLatency;
  194. config_.dropRatio = dropRatio;
  195. Logger::debug("Frame drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
  196. enable, maxLatency, dropRatio);
  197. }
  198. void FrameQueue::setFrameDropCallback(std::function<bool(const FrameQueueItem&)> callback) {
  199. frameDropCallback_ = callback;
  200. }
  201. void FrameQueue::setBlocking(bool blocking) {
  202. blocking_ = blocking;
  203. if (!blocking) {
  204. notEmpty_.notify_all();
  205. notFull_.notify_all();
  206. }
  207. Logger::debug("Frame queue blocking mode: {}", blocking);
  208. }
  209. void FrameQueue::wakeup() {
  210. notEmpty_.notify_all();
  211. notFull_.notify_all();
  212. }
// Internal helper implementations
  214. bool FrameQueue::shouldDropFrame(const FrameQueueItem& item) const {
  215. if (!config_.enableFrameDrop) {
  216. return false;
  217. }
  218. // 自定义丢帧回调
  219. if (frameDropCallback_ && frameDropCallback_(item)) {
  220. return true;
  221. }
  222. // 基于延迟的丢帧
  223. if (shouldDropByLatency(item)) {
  224. return true;
  225. }
  226. // 基于比例的丢帧
  227. if (shouldDropByRatio()) {
  228. return true;
  229. }
  230. return false;
  231. }
  232. void FrameQueue::dropOldestFrame() {
  233. if (!queue_.empty()) {
  234. auto& oldestItem = queue_.front();
  235. if (dropCallback_) {
  236. dropCallback_(*oldestItem, "Drop oldest frame");
  237. }
  238. queue_.pop();
  239. std::lock_guard<std::mutex> statsLock(statsMutex_);
  240. stats_.totalDropped++;
  241. Logger::debug("Dropped oldest frame");
  242. }
  243. }
  244. void FrameQueue::dropNewestFrame() {
  245. // 这个方法在enqueue时调用,不需要实际操作队列
  246. // 只需要更新统计信息
  247. }
  248. void FrameQueue::updateStats(const FrameQueueItem& item, bool isEnqueue) {
  249. std::lock_guard<std::mutex> lock(statsMutex_);
  250. if (isEnqueue) {
  251. stats_.totalEnqueued++;
  252. stats_.maxSize = std::max(stats_.maxSize, queue_.size());
  253. } else {
  254. stats_.totalDequeued++;
  255. // 计算等待时间
  256. double waitTime = calculateWaitTime(item);
  257. if (waitTime > 0) {
  258. // 更新平均等待时间
  259. if (stats_.totalDequeued == 1) {
  260. stats_.averageWaitTime = waitTime;
  261. } else {
  262. stats_.averageWaitTime = (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) / stats_.totalDequeued;
  263. }
  264. stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
  265. }
  266. }
  267. stats_.currentSize = queue_.size();
  268. stats_.lastUpdateTime = std::chrono::steady_clock::now();
  269. }
  270. double FrameQueue::calculateWaitTime(const FrameQueueItem& item) const {
  271. auto now = std::chrono::steady_clock::now();
  272. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
  273. return duration.count() / 1000.0; // 转换为毫秒
  274. }
  275. bool FrameQueue::shouldDropByLatency(const FrameQueueItem& item) const {
  276. double waitTime = calculateWaitTime(item);
  277. return waitTime > config_.maxLatency;
  278. }
  279. bool FrameQueue::shouldDropByRatio() const {
  280. if (config_.dropRatio <= 1) {
  281. return false;
  282. }
  283. uint64_t count = frameCounter_.fetch_add(1);
  284. return (count % config_.dropRatio) == 0;
  285. }
// Multi-stream frame queue implementation
  287. MultiStreamFrameQueue::MultiStreamFrameQueue(const FrameQueueConfig& config)
  288. : defaultConfig_(config) {
  289. Logger::debug("MultiStreamFrameQueue created");
  290. }
  291. MultiStreamFrameQueue::~MultiStreamFrameQueue() {
  292. clear();
  293. Logger::debug("MultiStreamFrameQueue destroyed");
  294. }
  295. ErrorCode MultiStreamFrameQueue::addStream(int streamIndex, const FrameQueueConfig& config) {
  296. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  297. if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
  298. Logger::warning("Stream {} already exists", streamIndex);
  299. return ErrorCode::ALREADY_EXISTS;
  300. }
  301. streamQueues_[streamIndex] = std::make_unique<FrameQueue>(config);
  302. Logger::debug("Added stream: {}", streamIndex);
  303. return ErrorCode::SUCCESS;
  304. }
  305. ErrorCode MultiStreamFrameQueue::removeStream(int streamIndex) {
  306. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  307. auto it = streamQueues_.find(streamIndex);
  308. if (it == streamQueues_.end()) {
  309. Logger::warning("Stream {} not found", streamIndex);
  310. return ErrorCode::NOT_FOUND;
  311. }
  312. streamQueues_.erase(it);
  313. Logger::debug("Removed stream: {}", streamIndex);
  314. return ErrorCode::SUCCESS;
  315. }
  316. bool MultiStreamFrameQueue::hasStream(int streamIndex) const {
  317. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  318. return streamQueues_.find(streamIndex) != streamQueues_.end();
  319. }
  320. std::vector<int> MultiStreamFrameQueue::getStreamIndices() const {
  321. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  322. std::vector<int> indices;
  323. for (const auto& pair : streamQueues_) {
  324. indices.push_back(pair.first);
  325. }
  326. return indices;
  327. }
  328. ErrorCode MultiStreamFrameQueue::enqueue(AVFrame* frame, int streamIndex) {
  329. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  330. auto it = streamQueues_.find(streamIndex);
  331. if (it == streamQueues_.end()) {
  332. // 自动创建流队列
  333. lock.unlock();
  334. addStream(streamIndex, defaultConfig_);
  335. lock.lock();
  336. it = streamQueues_.find(streamIndex);
  337. if (it == streamQueues_.end()) {
  338. return ErrorCode::NOT_FOUND;
  339. }
  340. }
  341. return it->second->enqueue(frame, streamIndex);
  342. }
  343. AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex) {
  344. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  345. auto it = streamQueues_.find(streamIndex);
  346. if (it == streamQueues_.end()) {
  347. return nullptr;
  348. }
  349. return it->second->dequeueFrame();
  350. }
  351. AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex, int timeoutMs) {
  352. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  353. auto it = streamQueues_.find(streamIndex);
  354. if (it == streamQueues_.end()) {
  355. return nullptr;
  356. }
  357. return it->second->dequeueFrame(timeoutMs);
  358. }
  359. ErrorCode MultiStreamFrameQueue::enqueueToAll(AVFrame* frame) {
  360. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  361. ErrorCode result = ErrorCode::SUCCESS;
  362. for (const auto& pair : streamQueues_) {
  363. // 为每个流创建帧的副本
  364. AVFrame* frameCopy = av_frame_alloc();
  365. if (!frameCopy) {
  366. result = ErrorCode::OUT_OF_MEMORY;
  367. continue;
  368. }
  369. if (av_frame_ref(frameCopy, frame) < 0) {
  370. av_frame_free(&frameCopy);
  371. result = ErrorCode::COPY_FAILED;
  372. continue;
  373. }
  374. ErrorCode enqueueResult = pair.second->enqueue(frameCopy, pair.first);
  375. if (enqueueResult != ErrorCode::SUCCESS) {
  376. av_frame_free(&frameCopy);
  377. result = enqueueResult;
  378. }
  379. }
  380. return result;
  381. }
  382. std::vector<AVFrame*> MultiStreamFrameQueue::dequeueFromAll() {
  383. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  384. std::vector<AVFrame*> frames;
  385. for (const auto& pair : streamQueues_) {
  386. AVFrame* frame = pair.second->dequeueFrame();
  387. frames.push_back(frame); // 可能为nullptr
  388. }
  389. return frames;
  390. }
  391. void MultiStreamFrameQueue::clear() {
  392. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  393. for (auto& pair : streamQueues_) {
  394. pair.second->clear();
  395. }
  396. Logger::debug("All stream queues cleared");
  397. }
  398. void MultiStreamFrameQueue::clearStream(int streamIndex) {
  399. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  400. auto it = streamQueues_.find(streamIndex);
  401. if (it != streamQueues_.end()) {
  402. it->second->clear();
  403. Logger::debug("Stream {} queue cleared", streamIndex);
  404. }
  405. }
  406. void MultiStreamFrameQueue::flush() {
  407. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  408. for (auto& pair : streamQueues_) {
  409. pair.second->flush();
  410. }
  411. Logger::debug("All stream queues flushed");
  412. }
  413. void MultiStreamFrameQueue::flushStream(int streamIndex) {
  414. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  415. auto it = streamQueues_.find(streamIndex);
  416. if (it != streamQueues_.end()) {
  417. it->second->flush();
  418. Logger::debug("Stream {} queue flushed", streamIndex);
  419. }
  420. }
  421. size_t MultiStreamFrameQueue::size() const {
  422. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  423. size_t totalSize = 0;
  424. for (const auto& pair : streamQueues_) {
  425. totalSize += pair.second->size();
  426. }
  427. return totalSize;
  428. }
  429. size_t MultiStreamFrameQueue::size(int streamIndex) const {
  430. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  431. auto it = streamQueues_.find(streamIndex);
  432. if (it != streamQueues_.end()) {
  433. return it->second->size();
  434. }
  435. return 0;
  436. }
  437. bool MultiStreamFrameQueue::empty() const {
  438. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  439. for (const auto& pair : streamQueues_) {
  440. if (!pair.second->empty()) {
  441. return false;
  442. }
  443. }
  444. return true;
  445. }
  446. bool MultiStreamFrameQueue::empty(int streamIndex) const {
  447. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  448. auto it = streamQueues_.find(streamIndex);
  449. if (it != streamQueues_.end()) {
  450. return it->second->empty();
  451. }
  452. return true;
  453. }
  454. std::map<int, FrameQueueStats> MultiStreamFrameQueue::getAllStats() const {
  455. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  456. std::map<int, FrameQueueStats> allStats;
  457. for (const auto& pair : streamQueues_) {
  458. allStats[pair.first] = pair.second->getStats();
  459. }
  460. return allStats;
  461. }
  462. FrameQueueStats MultiStreamFrameQueue::getStats(int streamIndex) const {
  463. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  464. auto it = streamQueues_.find(streamIndex);
  465. if (it != streamQueues_.end()) {
  466. return it->second->getStats();
  467. }
  468. return FrameQueueStats();
  469. }
  470. void MultiStreamFrameQueue::resetStats() {
  471. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  472. for (auto& pair : streamQueues_) {
  473. pair.second->resetStats();
  474. }
  475. Logger::debug("All stream queue stats reset");
  476. }
  477. void MultiStreamFrameQueue::resetStats(int streamIndex) {
  478. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  479. auto it = streamQueues_.find(streamIndex);
  480. if (it != streamQueues_.end()) {
  481. it->second->resetStats();
  482. Logger::debug("Stream {} queue stats reset", streamIndex);
  483. }
  484. }
  485. void MultiStreamFrameQueue::setBlocking(bool blocking) {
  486. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  487. for (auto& pair : streamQueues_) {
  488. pair.second->setBlocking(blocking);
  489. }
  490. Logger::debug("All stream queues blocking mode: {}", blocking);
  491. }
  492. void MultiStreamFrameQueue::wakeupAll() {
  493. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  494. for (auto& pair : streamQueues_) {
  495. pair.second->wakeup();
  496. }
  497. }
// Factory implementation
  499. std::unique_ptr<FrameQueue> FrameQueueFactory::createStandardQueue(size_t maxSize) {
  500. FrameQueueConfig config;
  501. config.maxSize = maxSize;
  502. config.dropOnFull = true;
  503. config.dropOldest = true;
  504. config.timeoutMs = 1000;
  505. config.enableStats = true;
  506. return std::make_unique<FrameQueue>(config);
  507. }
  508. std::unique_ptr<FrameQueue> FrameQueueFactory::createLowLatencyQueue(size_t maxSize) {
  509. FrameQueueConfig config;
  510. config.maxSize = maxSize;
  511. config.dropOnFull = true;
  512. config.dropOldest = true;
  513. config.timeoutMs = 100;
  514. config.enableStats = true;
  515. config.enableFrameDrop = true;
  516. config.maxLatency = 50.0;
  517. config.dropRatio = 3;
  518. return std::make_unique<FrameQueue>(config);
  519. }
  520. std::unique_ptr<FrameQueue> FrameQueueFactory::createHighCapacityQueue(size_t maxSize) {
  521. FrameQueueConfig config;
  522. config.maxSize = maxSize;
  523. config.dropOnFull = false; // 不丢帧,等待
  524. config.dropOldest = true;
  525. config.timeoutMs = 5000;
  526. config.enableStats = true;
  527. config.enableFrameDrop = false;
  528. return std::make_unique<FrameQueue>(config);
  529. }
  530. std::unique_ptr<FrameQueue> FrameQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) {
  531. FrameQueueConfig config;
  532. config.maxSize = maxSize;
  533. config.dropOnFull = true;
  534. config.dropOldest = true;
  535. config.timeoutMs = 50;
  536. config.enableStats = true;
  537. config.enableFrameDrop = true;
  538. config.maxLatency = maxLatency;
  539. config.dropRatio = 2;
  540. return std::make_unique<FrameQueue>(config);
  541. }
  542. std::unique_ptr<MultiStreamFrameQueue> FrameQueueFactory::createMultiStreamQueue(const FrameQueueConfig& config) {
  543. return std::make_unique<MultiStreamFrameQueue>(config);
  544. }
  545. } // namespace utils
  546. } // namespace av