utils_frame_queue.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723
  1. #include "utils_frame_queue.h"
  2. #include "../base/logger.h"
  3. #include <algorithm>
  4. #include <shared_mutex>
  5. #include <deque>
  6. namespace av {
  7. namespace utils {
  8. using namespace av;
  9. FrameQueue::FrameQueue(const FrameQueueConfig& config)
  10. : config_(config) {
  11. Logger::instance().debugf("FrameQueue created with max size: {}", config_.maxSize);
  12. }
  13. FrameQueue::~FrameQueue() {
  14. shutdown_ = true;
  15. notEmpty_.notify_all();
  16. notFull_.notify_all();
  17. clear();
  18. Logger::instance().debug("FrameQueue destroyed");
  19. }
  20. ErrorCode FrameQueue::enqueue(std::unique_ptr<FrameQueueItem> item) {
  21. if (!item) {
  22. return ErrorCode::INVALID_PARAMS;
  23. }
  24. if (shutdown_) {
  25. return ErrorCode::INVALID_STATE;
  26. }
  27. std::unique_lock<std::mutex> lock(queueMutex_);
  28. // 检查是否需要丢帧
  29. if (shouldDropFrame(*item)) {
  30. if (dropCallback_) {
  31. dropCallback_(*item, "Smart frame drop");
  32. }
  33. std::lock_guard<std::mutex> statsLock(statsMutex_);
  34. stats_.totalDropped++;
  35. Logger::instance().debug("Frame dropped by smart drop policy");
  36. return ErrorCode::SUCCESS;
  37. }
  38. // 等待队列有空间
  39. if (blocking_ && queue_.size() >= config_.maxSize) {
  40. if (config_.dropOnFull) {
  41. if (config_.dropOldest) {
  42. dropOldestFrame();
  43. } else {
  44. if (dropCallback_) {
  45. dropCallback_(*item, "Queue full - drop newest");
  46. }
  47. std::lock_guard<std::mutex> statsLock(statsMutex_);
  48. stats_.totalDropped++;
  49. Logger::instance().debug("Frame dropped - queue full");
  50. return ErrorCode::SUCCESS;
  51. }
  52. } else {
  53. // 等待队列有空间
  54. if (!notFull_.wait_for(lock, std::chrono::milliseconds(config_.timeoutMs),
  55. [this] { return queue_.size() < config_.maxSize || shutdown_; })) {
  56. Logger::instance().warning("Enqueue timeout");
  57. return ErrorCode::TIMEOUT;
  58. }
  59. if (shutdown_) {
  60. return ErrorCode::INVALID_STATE;
  61. }
  62. }
  63. }
  64. // 添加到队列
  65. queue_.push_back(std::move(item));
  66. // 更新统计信息
  67. if (config_.enableStats) {
  68. updateStats(*queue_.back(), true);
  69. }
  70. // 通知等待的消费者
  71. notEmpty_.notify_one();
  72. // 回调
  73. if (enqueueCallback_) {
  74. enqueueCallback_(*queue_.back());
  75. }
  76. return ErrorCode::SUCCESS;
  77. }
  78. // 预览方法实现
  79. const FrameQueueItem* FrameQueue::peek() const {
  80. std::lock_guard<std::mutex> lock(queueMutex_);
  81. if (queue_.empty()) {
  82. return nullptr;
  83. }
  84. return queue_.front().get();
  85. }
  86. const FrameQueueItem* FrameQueue::peek(int index) const {
  87. std::lock_guard<std::mutex> lock(queueMutex_);
  88. if (index < 0 || static_cast<size_t>(index) >= queue_.size()) {
  89. return nullptr;
  90. }
  91. // 使用deque的随机访问能力
  92. return queue_[index].get();
  93. }
  94. AVFrame* FrameQueue::peekFrame() const {
  95. const FrameQueueItem* item = peek();
  96. return item ? item->frame : nullptr;
  97. }
  98. std::unique_ptr<FrameQueueItem> FrameQueue::dequeue() {
  99. return dequeue(config_.timeoutMs);
  100. }
  101. std::unique_ptr<FrameQueueItem> FrameQueue::dequeue(int timeoutMs) {
  102. std::unique_lock<std::mutex> lock(queueMutex_);
  103. // 等待队列有数据
  104. if (blocking_ && queue_.empty()) {
  105. if (!notEmpty_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
  106. [this] { return !queue_.empty() || shutdown_; })) {
  107. return nullptr; // 超时
  108. }
  109. if (shutdown_ || queue_.empty()) {
  110. return nullptr;
  111. }
  112. }
  113. if (queue_.empty()) {
  114. return nullptr;
  115. }
  116. // 取出队列头部元素
  117. auto item = std::move(queue_.front());
  118. queue_.pop_front();
  119. // 更新统计信息
  120. if (config_.enableStats) {
  121. updateStats(*item, false);
  122. }
  123. // 通知等待的生产者
  124. notFull_.notify_one();
  125. // 回调
  126. if (dequeueCallback_) {
  127. dequeueCallback_(*item);
  128. }
  129. return item;
  130. }
  131. ErrorCode FrameQueue::enqueue(AVFrame* frame, int streamIndex) {
  132. if (!frame) {
  133. return ErrorCode::INVALID_PARAMS;
  134. }
  135. auto item = std::make_unique<FrameQueueItem>(frame, streamIndex);
  136. return enqueue(std::move(item));
  137. }
  138. AVFrame* FrameQueue::dequeueFrame() {
  139. auto item = dequeue();
  140. if (item) {
  141. AVFrame* frame = item->frame;
  142. item->frame = nullptr; // 转移所有权
  143. return frame;
  144. }
  145. return nullptr;
  146. }
  147. AVFrame* FrameQueue::dequeueFrame(int timeoutMs) {
  148. auto item = dequeue(timeoutMs);
  149. if (item) {
  150. AVFrame* frame = item->frame;
  151. item->frame = nullptr; // 转移所有权
  152. return frame;
  153. }
  154. return nullptr;
  155. }
  156. void FrameQueue::clear() {
  157. std::lock_guard<std::mutex> lock(queueMutex_);
  158. while (!queue_.empty()) {
  159. queue_.pop_front();
  160. }
  161. Logger::instance().debug("Frame queue cleared");
  162. }
  163. void FrameQueue::flush() {
  164. clear();
  165. notEmpty_.notify_all();
  166. notFull_.notify_all();
  167. }
  168. void FrameQueue::setMaxSize(size_t maxSize) {
  169. std::lock_guard<std::mutex> lock(queueMutex_);
  170. config_.maxSize = maxSize;
  171. // 如果当前队列大小超过新的最大值,丢弃多余的帧
  172. while (queue_.size() > maxSize) {
  173. if (config_.dropOldest) {
  174. dropOldestFrame();
  175. } else {
  176. break; // 不丢弃,保持当前状态
  177. }
  178. }
  179. Logger::instance().debugf("Frame queue max size set to: {}", maxSize);
  180. }
  181. void FrameQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
  182. config_.dropOnFull = dropOnFull;
  183. config_.dropOldest = dropOldest;
  184. Logger::instance().debugf("Drop policy set: dropOnFull={}, dropOldest={}", dropOnFull, dropOldest);
  185. }
  186. size_t FrameQueue::size() const {
  187. std::lock_guard<std::mutex> lock(queueMutex_);
  188. return queue_.size();
  189. }
  190. bool FrameQueue::empty() const {
  191. std::lock_guard<std::mutex> lock(queueMutex_);
  192. return queue_.empty();
  193. }
  194. bool FrameQueue::full() const {
  195. std::lock_guard<std::mutex> lock(queueMutex_);
  196. return queue_.size() >= config_.maxSize;
  197. }
  198. size_t FrameQueue::capacity() const {
  199. return config_.maxSize;
  200. }
  201. FrameQueueStats FrameQueue::getStats() const {
  202. std::lock_guard<std::mutex> lock(statsMutex_);
  203. FrameQueueStats stats = stats_;
  204. stats.currentSize = size();
  205. return stats;
  206. }
  207. void FrameQueue::resetStats() {
  208. std::lock_guard<std::mutex> lock(statsMutex_);
  209. stats_ = FrameQueueStats();
  210. Logger::instance().debug("Frame queue stats reset");
  211. }
  212. void FrameQueue::enableFrameDrop(bool enable, double maxLatency, int dropRatio) {
  213. config_.enableFrameDrop = enable;
  214. config_.maxLatency = maxLatency;
  215. config_.dropRatio = dropRatio;
  216. Logger::instance().debugf("Frame drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
  217. enable, maxLatency, dropRatio);
  218. }
  219. void FrameQueue::setFrameDropCallback(std::function<bool(const FrameQueueItem&)> callback) {
  220. frameDropCallback_ = callback;
  221. }
  222. void FrameQueue::setBlocking(bool blocking) {
  223. blocking_ = blocking;
  224. if (!blocking) {
  225. notEmpty_.notify_all();
  226. notFull_.notify_all();
  227. }
  228. Logger::instance().debugf("Frame queue blocking mode: {}", blocking);
  229. }
  230. void FrameQueue::wakeup() {
  231. notEmpty_.notify_all();
  232. notFull_.notify_all();
  233. }
  234. // 内部方法实现
  235. bool FrameQueue::shouldDropFrame(const FrameQueueItem& item) const {
  236. if (!config_.enableFrameDrop) {
  237. return false;
  238. }
  239. // 自定义丢帧回调
  240. if (frameDropCallback_ && frameDropCallback_(item)) {
  241. return true;
  242. }
  243. // 基于延迟的丢帧
  244. if (shouldDropByLatency(item)) {
  245. return true;
  246. }
  247. // 基于比例的丢帧
  248. if (shouldDropByRatio()) {
  249. return true;
  250. }
  251. return false;
  252. }
  253. void FrameQueue::dropOldestFrame() {
  254. if (!queue_.empty()) {
  255. auto& oldestItem = queue_.front();
  256. if (dropCallback_) {
  257. dropCallback_(*oldestItem, "Drop oldest frame");
  258. }
  259. queue_.pop_front();
  260. std::lock_guard<std::mutex> statsLock(statsMutex_);
  261. stats_.totalDropped++;
  262. Logger::instance().debug("Dropped oldest frame");
  263. }
  264. }
  265. void FrameQueue::dropNewestFrame() {
  266. // 这个方法在enqueue时调用,不需要实际操作队列
  267. // 只需要更新统计信息
  268. }
  269. void FrameQueue::updateStats(const FrameQueueItem& item, bool isEnqueue) {
  270. std::lock_guard<std::mutex> lock(statsMutex_);
  271. if (isEnqueue) {
  272. stats_.totalEnqueued++;
  273. stats_.maxSize = std::max(stats_.maxSize, queue_.size());
  274. } else {
  275. stats_.totalDequeued++;
  276. // 计算等待时间
  277. double waitTime = calculateWaitTime(item);
  278. if (waitTime > 0) {
  279. // 更新平均等待时间
  280. if (stats_.totalDequeued == 1) {
  281. stats_.averageWaitTime = waitTime;
  282. } else {
  283. stats_.averageWaitTime = (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) / stats_.totalDequeued;
  284. }
  285. stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
  286. }
  287. }
  288. stats_.currentSize = queue_.size();
  289. stats_.lastUpdateTime = std::chrono::steady_clock::now();
  290. }
  291. double FrameQueue::calculateWaitTime(const FrameQueueItem& item) const {
  292. auto now = std::chrono::steady_clock::now();
  293. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
  294. return duration.count() / 1000.0; // 转换为毫秒
  295. }
  296. bool FrameQueue::shouldDropByLatency(const FrameQueueItem& item) const {
  297. double waitTime = calculateWaitTime(item);
  298. return waitTime > config_.maxLatency;
  299. }
  300. bool FrameQueue::shouldDropByRatio() const {
  301. if (config_.dropRatio <= 1) {
  302. return false;
  303. }
  304. uint64_t count = frameCounter_.fetch_add(1);
  305. return (count % config_.dropRatio) == 0;
  306. }
  307. // 多流帧队列实现
  308. MultiStreamFrameQueue::MultiStreamFrameQueue(const FrameQueueConfig& config)
  309. : defaultConfig_(config) {
  310. Logger::instance().debug("MultiStreamFrameQueue created");
  311. }
  312. MultiStreamFrameQueue::~MultiStreamFrameQueue() {
  313. clear();
  314. Logger::instance().debug("MultiStreamFrameQueue destroyed");
  315. }
  316. ErrorCode MultiStreamFrameQueue::addStream(int streamIndex, const FrameQueueConfig& config) {
  317. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  318. if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
  319. Logger::instance().warningf("Stream {} already exists", streamIndex);
  320. return ErrorCode::ALREADY_EXISTS;
  321. }
  322. streamQueues_[streamIndex] = std::make_unique<FrameQueue>(config);
  323. Logger::instance().debugf("Added stream: {}", streamIndex);
  324. return ErrorCode::SUCCESS;
  325. }
  326. ErrorCode MultiStreamFrameQueue::removeStream(int streamIndex) {
  327. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  328. auto it = streamQueues_.find(streamIndex);
  329. if (it == streamQueues_.end()) {
  330. Logger::instance().warningf("Stream {} not found", streamIndex);
  331. return ErrorCode::NOT_FOUND;
  332. }
  333. streamQueues_.erase(it);
  334. Logger::instance().debugf("Removed stream: {}", streamIndex);
  335. return ErrorCode::SUCCESS;
  336. }
  337. bool MultiStreamFrameQueue::hasStream(int streamIndex) const {
  338. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  339. return streamQueues_.find(streamIndex) != streamQueues_.end();
  340. }
  341. std::vector<int> MultiStreamFrameQueue::getStreamIndices() const {
  342. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  343. std::vector<int> indices;
  344. for (const auto& pair : streamQueues_) {
  345. indices.push_back(pair.first);
  346. }
  347. return indices;
  348. }
  349. ErrorCode MultiStreamFrameQueue::enqueue(AVFrame* frame, int streamIndex) {
  350. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  351. auto it = streamQueues_.find(streamIndex);
  352. if (it == streamQueues_.end()) {
  353. // 自动创建流队列
  354. lock.unlock();
  355. addStream(streamIndex, defaultConfig_);
  356. lock.lock();
  357. it = streamQueues_.find(streamIndex);
  358. if (it == streamQueues_.end()) {
  359. return ErrorCode::NOT_FOUND;
  360. }
  361. }
  362. return it->second->enqueue(frame, streamIndex);
  363. }
  364. AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex) {
  365. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  366. auto it = streamQueues_.find(streamIndex);
  367. if (it == streamQueues_.end()) {
  368. return nullptr;
  369. }
  370. return it->second->dequeueFrame();
  371. }
  372. AVFrame* MultiStreamFrameQueue::dequeue(int streamIndex, int timeoutMs) {
  373. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  374. auto it = streamQueues_.find(streamIndex);
  375. if (it == streamQueues_.end()) {
  376. return nullptr;
  377. }
  378. return it->second->dequeueFrame(timeoutMs);
  379. }
  380. ErrorCode MultiStreamFrameQueue::enqueueToAll(AVFrame* frame) {
  381. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  382. ErrorCode result = ErrorCode::SUCCESS;
  383. for (const auto& pair : streamQueues_) {
  384. // 为每个流创建帧的副本
  385. AVFrame* frameCopy = av_frame_alloc();
  386. if (!frameCopy) {
  387. result = ErrorCode::MEMORY_ALLOC_FAILED;
  388. continue;
  389. }
  390. if (av_frame_ref(frameCopy, frame) < 0) {
  391. av_frame_free(&frameCopy);
  392. result = ErrorCode::COPY_FAILED;
  393. continue;
  394. }
  395. ErrorCode enqueueResult = pair.second->enqueue(frameCopy, pair.first);
  396. if (enqueueResult != ErrorCode::SUCCESS) {
  397. av_frame_free(&frameCopy);
  398. result = enqueueResult;
  399. }
  400. }
  401. return result;
  402. }
  403. std::vector<AVFrame*> MultiStreamFrameQueue::dequeueFromAll() {
  404. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  405. std::vector<AVFrame*> frames;
  406. for (const auto& pair : streamQueues_) {
  407. AVFrame* frame = pair.second->dequeueFrame();
  408. frames.push_back(frame); // 可能为nullptr
  409. }
  410. return frames;
  411. }
  412. void MultiStreamFrameQueue::clear() {
  413. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  414. for (auto& pair : streamQueues_) {
  415. pair.second->clear();
  416. }
  417. Logger::instance().debug("All stream queues cleared");
  418. }
  419. void MultiStreamFrameQueue::clearStream(int streamIndex) {
  420. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  421. auto it = streamQueues_.find(streamIndex);
  422. if (it != streamQueues_.end()) {
  423. it->second->clear();
  424. Logger::instance().debugf("Stream {} queue cleared", streamIndex);
  425. }
  426. }
  427. void MultiStreamFrameQueue::flush() {
  428. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  429. for (auto& pair : streamQueues_) {
  430. pair.second->flush();
  431. }
  432. Logger::instance().debug("All stream queues flushed");
  433. }
  434. void MultiStreamFrameQueue::flushStream(int streamIndex) {
  435. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  436. auto it = streamQueues_.find(streamIndex);
  437. if (it != streamQueues_.end()) {
  438. it->second->flush();
  439. Logger::instance().debugf("Stream {} queue flushed", streamIndex);
  440. }
  441. }
  442. size_t MultiStreamFrameQueue::size() const {
  443. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  444. size_t totalSize = 0;
  445. for (const auto& pair : streamQueues_) {
  446. totalSize += pair.second->size();
  447. }
  448. return totalSize;
  449. }
  450. size_t MultiStreamFrameQueue::size(int streamIndex) const {
  451. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  452. auto it = streamQueues_.find(streamIndex);
  453. if (it != streamQueues_.end()) {
  454. return it->second->size();
  455. }
  456. return 0;
  457. }
  458. bool MultiStreamFrameQueue::empty() const {
  459. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  460. for (const auto& pair : streamQueues_) {
  461. if (!pair.second->empty()) {
  462. return false;
  463. }
  464. }
  465. return true;
  466. }
  467. bool MultiStreamFrameQueue::empty(int streamIndex) const {
  468. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  469. auto it = streamQueues_.find(streamIndex);
  470. if (it != streamQueues_.end()) {
  471. return it->second->empty();
  472. }
  473. return true;
  474. }
  475. std::map<int, FrameQueueStats> MultiStreamFrameQueue::getAllStats() const {
  476. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  477. std::map<int, FrameQueueStats> allStats;
  478. for (const auto& pair : streamQueues_) {
  479. allStats[pair.first] = pair.second->getStats();
  480. }
  481. return allStats;
  482. }
  483. FrameQueueStats MultiStreamFrameQueue::getStats(int streamIndex) const {
  484. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  485. auto it = streamQueues_.find(streamIndex);
  486. if (it != streamQueues_.end()) {
  487. return it->second->getStats();
  488. }
  489. return FrameQueueStats();
  490. }
  491. void MultiStreamFrameQueue::resetStats() {
  492. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  493. for (auto& pair : streamQueues_) {
  494. pair.second->resetStats();
  495. }
  496. Logger::instance().debug("All stream queue stats reset");
  497. }
  498. void MultiStreamFrameQueue::resetStats(int streamIndex) {
  499. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  500. auto it = streamQueues_.find(streamIndex);
  501. if (it != streamQueues_.end()) {
  502. it->second->resetStats();
  503. Logger::instance().debugf("Stream {} queue stats reset", streamIndex);
  504. }
  505. }
  506. void MultiStreamFrameQueue::setBlocking(bool blocking) {
  507. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  508. for (auto& pair : streamQueues_) {
  509. pair.second->setBlocking(blocking);
  510. }
  511. Logger::instance().debugf("All stream queues blocking mode: {}", blocking);
  512. }
  513. void MultiStreamFrameQueue::wakeupAll() {
  514. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  515. for (auto& pair : streamQueues_) {
  516. pair.second->wakeup();
  517. }
  518. }
  519. // 工厂类实现
  520. std::unique_ptr<FrameQueue> FrameQueueFactory::createStandardQueue(size_t maxSize) {
  521. FrameQueueConfig config;
  522. config.maxSize = maxSize;
  523. config.dropOnFull = true;
  524. config.dropOldest = true;
  525. config.timeoutMs = 1000;
  526. config.enableStats = true;
  527. return std::make_unique<FrameQueue>(config);
  528. }
  529. std::unique_ptr<FrameQueue> FrameQueueFactory::createLowLatencyQueue(size_t maxSize) {
  530. FrameQueueConfig config;
  531. config.maxSize = maxSize;
  532. config.dropOnFull = true;
  533. config.dropOldest = true;
  534. config.timeoutMs = 100;
  535. config.enableStats = true;
  536. config.enableFrameDrop = true;
  537. config.maxLatency = 50.0;
  538. config.dropRatio = 3;
  539. return std::make_unique<FrameQueue>(config);
  540. }
  541. std::unique_ptr<FrameQueue> FrameQueueFactory::createHighCapacityQueue(size_t maxSize) {
  542. FrameQueueConfig config;
  543. config.maxSize = maxSize;
  544. config.dropOnFull = false; // 不丢帧,等待
  545. config.dropOldest = true;
  546. config.timeoutMs = 5000;
  547. config.enableStats = true;
  548. config.enableFrameDrop = false;
  549. return std::make_unique<FrameQueue>(config);
  550. }
  551. std::unique_ptr<FrameQueue> FrameQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) {
  552. FrameQueueConfig config;
  553. config.maxSize = maxSize;
  554. config.dropOnFull = true;
  555. config.dropOldest = true;
  556. config.timeoutMs = 50;
  557. config.enableStats = true;
  558. config.enableFrameDrop = true;
  559. config.maxLatency = maxLatency;
  560. config.dropRatio = 2;
  561. return std::make_unique<FrameQueue>(config);
  562. }
  563. std::unique_ptr<MultiStreamFrameQueue> FrameQueueFactory::createMultiStreamQueue(const FrameQueueConfig& config) {
  564. return std::make_unique<MultiStreamFrameQueue>(config);
  565. }
  566. } // namespace utils
  567. } // namespace av