utils_frame_queue.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726
  1. #include "utils_frame_queue.h"
  2. #include "../base/logger.h"
  3. #include "../base/media_common.h"
  4. #include <algorithm>
  5. #include <deque>
  6. #include <shared_mutex>
  7. namespace av {
  8. namespace utils {
  9. using namespace av;
  10. FrameQueue::FrameQueue(const FrameQueueConfig& config)
  11. : config_(config) {
  12. Logger::instance().debugf("FrameQueue created with max size: {}", config_.maxSize);
  13. }
  14. FrameQueue::~FrameQueue() {
  15. shutdown_ = true;
  16. notEmpty_.notify_all();
  17. notFull_.notify_all();
  18. clear();
  19. Logger::instance().debug("FrameQueue destroyed");
  20. }
  21. ErrorCode FrameQueue::enqueue(std::unique_ptr<FrameQueueItem> item) {
  22. if (!item) {
  23. return ErrorCode::INVALID_PARAMS;
  24. }
  25. if (shutdown_) {
  26. return ErrorCode::INVALID_STATE;
  27. }
  28. std::unique_lock<std::mutex> lock(queueMutex_);
  29. // 检查是否需要丢帧
  30. if (shouldDropFrame(*item)) {
  31. if (dropCallback_) {
  32. dropCallback_(*item, "Smart frame drop");
  33. }
  34. std::lock_guard<std::mutex> statsLock(statsMutex_);
  35. stats_.totalDropped++;
  36. Logger::instance().debug("Frame dropped by smart drop policy");
  37. return ErrorCode::SUCCESS;
  38. }
  39. // 等待队列有空间
  40. if (blocking_ && queue_.size() >= config_.maxSize) {
  41. if (config_.dropOnFull) {
  42. if (config_.dropOldest) {
  43. dropOldestFrame();
  44. } else {
  45. if (dropCallback_) {
  46. dropCallback_(*item, "Queue full - drop newest");
  47. }
  48. std::lock_guard<std::mutex> statsLock(statsMutex_);
  49. stats_.totalDropped++;
  50. Logger::instance().debug("Frame dropped - queue full");
  51. return ErrorCode::SUCCESS;
  52. }
  53. } else {
  54. // 等待队列有空间
  55. if (!notFull_.wait_for(lock, std::chrono::milliseconds(config_.timeoutMs),
  56. [this] { return queue_.size() < config_.maxSize || shutdown_; })) {
  57. Logger::instance().warning("Enqueue timeout");
  58. return ErrorCode::TIMEOUT;
  59. }
  60. if (shutdown_) {
  61. return ErrorCode::INVALID_STATE;
  62. }
  63. }
  64. }
  65. // 添加到队列
  66. queue_.push_back(std::move(item));
  67. // 更新统计信息
  68. if (config_.enableStats) {
  69. updateStats(*queue_.back(), true);
  70. }
  71. // 通知等待的消费者
  72. notEmpty_.notify_one();
  73. // 回调
  74. if (enqueueCallback_) {
  75. enqueueCallback_(*queue_.back());
  76. }
  77. return ErrorCode::SUCCESS;
  78. }
  79. // 预览方法实现
  80. const FrameQueueItem* FrameQueue::peek() const {
  81. std::lock_guard<std::mutex> lock(queueMutex_);
  82. if (queue_.empty()) {
  83. return nullptr;
  84. }
  85. return queue_.front().get();
  86. }
  87. const FrameQueueItem* FrameQueue::peek(int index) const {
  88. std::lock_guard<std::mutex> lock(queueMutex_);
  89. if (index < 0 || static_cast<size_t>(index) >= queue_.size()) {
  90. return nullptr;
  91. }
  92. // 使用deque的随机访问能力
  93. return queue_[index].get();
  94. }
  95. const AVFramePtr& FrameQueue::peekFramePtr() const {
  96. const FrameQueueItem* item = peek();
  97. if (item) {
  98. return item->frame;
  99. }
  100. static AVFramePtr nullFrame;
  101. return nullFrame;
  102. }
  103. std::unique_ptr<FrameQueueItem> FrameQueue::dequeue() {
  104. return dequeue(config_.timeoutMs);
  105. }
  106. std::unique_ptr<FrameQueueItem> FrameQueue::dequeue(int timeoutMs) {
  107. std::unique_lock<std::mutex> lock(queueMutex_);
  108. // 等待队列有数据
  109. if (blocking_ && queue_.empty()) {
  110. if (!notEmpty_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
  111. [this] { return !queue_.empty() || shutdown_; })) {
  112. return nullptr; // 超时
  113. }
  114. if (shutdown_ || queue_.empty()) {
  115. return nullptr;
  116. }
  117. }
  118. if (queue_.empty()) {
  119. return nullptr;
  120. }
  121. // 取出队列头部元素
  122. auto item = std::move(queue_.front());
  123. queue_.pop_front();
  124. // 更新统计信息
  125. if (config_.enableStats) {
  126. updateStats(*item, false);
  127. }
  128. // 通知等待的生产者
  129. notFull_.notify_one();
  130. // 回调
  131. if (dequeueCallback_) {
  132. dequeueCallback_(*item);
  133. }
  134. return item;
  135. }
  136. ErrorCode FrameQueue::enqueue(AVFramePtr frame, int streamIndex) {
  137. if (!frame) {
  138. return ErrorCode::INVALID_PARAMS;
  139. }
  140. auto item = std::make_unique<FrameQueueItem>(std::move(frame), streamIndex);
  141. return enqueue(std::move(item));
  142. }
  143. AVFramePtr FrameQueue::dequeueFrame() {
  144. auto item = dequeue();
  145. if (item && item->frame) {
  146. return std::move(item->frame);
  147. }
  148. return nullptr;
  149. }
  150. AVFramePtr FrameQueue::dequeueFrame(int timeoutMs) {
  151. auto item = dequeue(timeoutMs);
  152. if (item && item->frame) {
  153. return std::move(item->frame);
  154. }
  155. return nullptr;
  156. }
  157. void FrameQueue::clear() {
  158. std::lock_guard<std::mutex> lock(queueMutex_);
  159. while (!queue_.empty()) {
  160. queue_.pop_front();
  161. }
  162. Logger::instance().debug("Frame queue cleared");
  163. }
  164. void FrameQueue::flush() {
  165. clear();
  166. notEmpty_.notify_all();
  167. notFull_.notify_all();
  168. }
  169. void FrameQueue::setMaxSize(size_t maxSize) {
  170. std::lock_guard<std::mutex> lock(queueMutex_);
  171. config_.maxSize = maxSize;
  172. // 如果当前队列大小超过新的最大值,丢弃多余的帧
  173. while (queue_.size() > maxSize) {
  174. if (config_.dropOldest) {
  175. dropOldestFrame();
  176. } else {
  177. break; // 不丢弃,保持当前状态
  178. }
  179. }
  180. Logger::instance().debugf("Frame queue max size set to: {}", maxSize);
  181. }
  182. void FrameQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
  183. config_.dropOnFull = dropOnFull;
  184. config_.dropOldest = dropOldest;
  185. Logger::instance().debugf("Drop policy set: dropOnFull={}, dropOldest={}", dropOnFull, dropOldest);
  186. }
  187. size_t FrameQueue::size() const {
  188. std::lock_guard<std::mutex> lock(queueMutex_);
  189. return queue_.size();
  190. }
  191. bool FrameQueue::empty() const {
  192. std::lock_guard<std::mutex> lock(queueMutex_);
  193. return queue_.empty();
  194. }
  195. bool FrameQueue::full() const {
  196. std::lock_guard<std::mutex> lock(queueMutex_);
  197. return queue_.size() >= config_.maxSize;
  198. }
  199. size_t FrameQueue::capacity() const {
  200. return config_.maxSize;
  201. }
  202. FrameQueueStats FrameQueue::getStats() const {
  203. std::lock_guard<std::mutex> lock(statsMutex_);
  204. FrameQueueStats stats = stats_;
  205. stats.currentSize = size();
  206. return stats;
  207. }
  208. void FrameQueue::resetStats() {
  209. std::lock_guard<std::mutex> lock(statsMutex_);
  210. stats_ = FrameQueueStats();
  211. Logger::instance().debug("Frame queue stats reset");
  212. }
  213. void FrameQueue::enableFrameDrop(bool enable, double maxLatency, int dropRatio) {
  214. config_.enableFrameDrop = enable;
  215. config_.maxLatency = maxLatency;
  216. config_.dropRatio = dropRatio;
  217. Logger::instance().debugf("Frame drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
  218. enable, maxLatency, dropRatio);
  219. }
  220. void FrameQueue::setFrameDropCallback(std::function<bool(const FrameQueueItem&)> callback) {
  221. frameDropCallback_ = callback;
  222. }
  223. void FrameQueue::setBlocking(bool blocking) {
  224. blocking_ = blocking;
  225. if (!blocking) {
  226. notEmpty_.notify_all();
  227. notFull_.notify_all();
  228. }
  229. Logger::instance().debugf("Frame queue blocking mode: {}", blocking);
  230. }
  231. void FrameQueue::wakeup() {
  232. notEmpty_.notify_all();
  233. notFull_.notify_all();
  234. }
  235. // 内部方法实现
  236. bool FrameQueue::shouldDropFrame(const FrameQueueItem& item) const {
  237. if (!config_.enableFrameDrop) {
  238. return false;
  239. }
  240. // 自定义丢帧回调
  241. if (frameDropCallback_ && frameDropCallback_(item)) {
  242. return true;
  243. }
  244. // 基于延迟的丢帧
  245. if (shouldDropByLatency(item)) {
  246. return true;
  247. }
  248. // 基于比例的丢帧
  249. if (shouldDropByRatio()) {
  250. return true;
  251. }
  252. return false;
  253. }
  254. void FrameQueue::dropOldestFrame() {
  255. if (!queue_.empty()) {
  256. auto& oldestItem = queue_.front();
  257. if (dropCallback_) {
  258. dropCallback_(*oldestItem, "Drop oldest frame");
  259. }
  260. queue_.pop_front();
  261. std::lock_guard<std::mutex> statsLock(statsMutex_);
  262. stats_.totalDropped++;
  263. Logger::instance().debug("Dropped oldest frame");
  264. }
  265. }
  266. void FrameQueue::dropNewestFrame() {
  267. // 这个方法在enqueue时调用,不需要实际操作队列
  268. // 只需要更新统计信息
  269. }
  270. void FrameQueue::updateStats(const FrameQueueItem& item, bool isEnqueue) {
  271. std::lock_guard<std::mutex> lock(statsMutex_);
  272. if (isEnqueue) {
  273. stats_.totalEnqueued++;
  274. stats_.maxSize = std::max(stats_.maxSize, queue_.size());
  275. } else {
  276. stats_.totalDequeued++;
  277. // 计算等待时间
  278. double waitTime = calculateWaitTime(item);
  279. if (waitTime > 0) {
  280. // 更新平均等待时间
  281. if (stats_.totalDequeued == 1) {
  282. stats_.averageWaitTime = waitTime;
  283. } else {
  284. stats_.averageWaitTime = (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) / stats_.totalDequeued;
  285. }
  286. stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
  287. }
  288. }
  289. stats_.currentSize = queue_.size();
  290. stats_.lastUpdateTime = std::chrono::steady_clock::now();
  291. }
  292. double FrameQueue::calculateWaitTime(const FrameQueueItem& item) const {
  293. auto now = std::chrono::steady_clock::now();
  294. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
  295. return duration.count() / 1000.0; // 转换为毫秒
  296. }
  297. bool FrameQueue::shouldDropByLatency(const FrameQueueItem& item) const {
  298. double waitTime = calculateWaitTime(item);
  299. return waitTime > config_.maxLatency;
  300. }
  301. bool FrameQueue::shouldDropByRatio() const {
  302. if (config_.dropRatio <= 1) {
  303. return false;
  304. }
  305. uint64_t count = frameCounter_.fetch_add(1);
  306. return (count % config_.dropRatio) == 0;
  307. }
  308. // 多流帧队列实现
  309. MultiStreamFrameQueue::MultiStreamFrameQueue(const FrameQueueConfig& config)
  310. : defaultConfig_(config) {
  311. Logger::instance().debug("MultiStreamFrameQueue created");
  312. }
  313. MultiStreamFrameQueue::~MultiStreamFrameQueue() {
  314. clear();
  315. Logger::instance().debug("MultiStreamFrameQueue destroyed");
  316. }
  317. ErrorCode MultiStreamFrameQueue::addStream(int streamIndex, const FrameQueueConfig& config) {
  318. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  319. if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
  320. Logger::instance().warningf("Stream {} already exists", streamIndex);
  321. return ErrorCode::ALREADY_EXISTS;
  322. }
  323. streamQueues_[streamIndex] = std::make_unique<FrameQueue>(config);
  324. Logger::instance().debugf("Added stream: {}", streamIndex);
  325. return ErrorCode::SUCCESS;
  326. }
  327. ErrorCode MultiStreamFrameQueue::removeStream(int streamIndex) {
  328. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  329. auto it = streamQueues_.find(streamIndex);
  330. if (it == streamQueues_.end()) {
  331. Logger::instance().warningf("Stream {} not found", streamIndex);
  332. return ErrorCode::NOT_FOUND;
  333. }
  334. streamQueues_.erase(it);
  335. Logger::instance().debugf("Removed stream: {}", streamIndex);
  336. return ErrorCode::SUCCESS;
  337. }
  338. bool MultiStreamFrameQueue::hasStream(int streamIndex) const {
  339. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  340. return streamQueues_.find(streamIndex) != streamQueues_.end();
  341. }
  342. std::vector<int> MultiStreamFrameQueue::getStreamIndices() const {
  343. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  344. std::vector<int> indices;
  345. for (const auto& pair : streamQueues_) {
  346. indices.push_back(pair.first);
  347. }
  348. return indices;
  349. }
  350. ErrorCode MultiStreamFrameQueue::enqueue(AVFramePtr frame, int streamIndex) {
  351. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  352. auto it = streamQueues_.find(streamIndex);
  353. if (it == streamQueues_.end()) {
  354. // 自动创建流队列
  355. lock.unlock();
  356. addStream(streamIndex, defaultConfig_);
  357. lock.lock();
  358. it = streamQueues_.find(streamIndex);
  359. if (it == streamQueues_.end()) {
  360. return ErrorCode::NOT_FOUND;
  361. }
  362. }
  363. return it->second->enqueue(std::move(frame), streamIndex);
  364. }
  365. AVFramePtr MultiStreamFrameQueue::dequeue(int streamIndex) {
  366. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  367. auto it = streamQueues_.find(streamIndex);
  368. if (it == streamQueues_.end()) {
  369. return nullptr;
  370. }
  371. return it->second->dequeueFrame();
  372. }
  373. AVFramePtr MultiStreamFrameQueue::dequeue(int streamIndex, int timeoutMs) {
  374. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  375. auto it = streamQueues_.find(streamIndex);
  376. if (it == streamQueues_.end()) {
  377. return nullptr;
  378. }
  379. return it->second->dequeueFrame(timeoutMs);
  380. }
  381. ErrorCode MultiStreamFrameQueue::enqueueToAll(AVFramePtr frame) {
  382. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  383. if (!frame) {
  384. return ErrorCode::INVALID_PARAMS;
  385. }
  386. ErrorCode result = ErrorCode::SUCCESS;
  387. for (const auto& pair : streamQueues_) {
  388. // 为每个流创建帧的副本
  389. AVFramePtr frameCopy = makeAVFrame();
  390. if (!frameCopy) {
  391. result = ErrorCode::MEMORY_ALLOC_FAILED;
  392. continue;
  393. }
  394. if (av_frame_ref(frameCopy.get(), frame.get()) < 0) {
  395. result = ErrorCode::COPY_FAILED;
  396. continue;
  397. }
  398. ErrorCode enqueueResult = pair.second->enqueue(std::move(frameCopy), pair.first);
  399. if (enqueueResult != ErrorCode::SUCCESS) {
  400. result = enqueueResult;
  401. }
  402. }
  403. return result;
  404. }
  405. std::vector<AVFramePtr> MultiStreamFrameQueue::dequeueFromAll() {
  406. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  407. std::vector<AVFramePtr> frames;
  408. for (const auto& pair : streamQueues_) {
  409. AVFramePtr frame = pair.second->dequeueFrame();
  410. frames.push_back(std::move(frame)); // 可能为nullptr
  411. }
  412. return frames;
  413. }
  414. void MultiStreamFrameQueue::clear() {
  415. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  416. for (auto& pair : streamQueues_) {
  417. pair.second->clear();
  418. }
  419. Logger::instance().debug("All stream queues cleared");
  420. }
  421. void MultiStreamFrameQueue::clearStream(int streamIndex) {
  422. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  423. auto it = streamQueues_.find(streamIndex);
  424. if (it != streamQueues_.end()) {
  425. it->second->clear();
  426. Logger::instance().debugf("Stream {} queue cleared", streamIndex);
  427. }
  428. }
  429. void MultiStreamFrameQueue::flush() {
  430. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  431. for (auto& pair : streamQueues_) {
  432. pair.second->flush();
  433. }
  434. Logger::instance().debug("All stream queues flushed");
  435. }
  436. void MultiStreamFrameQueue::flushStream(int streamIndex) {
  437. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  438. auto it = streamQueues_.find(streamIndex);
  439. if (it != streamQueues_.end()) {
  440. it->second->flush();
  441. Logger::instance().debugf("Stream {} queue flushed", streamIndex);
  442. }
  443. }
  444. size_t MultiStreamFrameQueue::size() const {
  445. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  446. size_t totalSize = 0;
  447. for (const auto& pair : streamQueues_) {
  448. totalSize += pair.second->size();
  449. }
  450. return totalSize;
  451. }
  452. size_t MultiStreamFrameQueue::size(int streamIndex) const {
  453. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  454. auto it = streamQueues_.find(streamIndex);
  455. if (it != streamQueues_.end()) {
  456. return it->second->size();
  457. }
  458. return 0;
  459. }
  460. bool MultiStreamFrameQueue::empty() const {
  461. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  462. for (const auto& pair : streamQueues_) {
  463. if (!pair.second->empty()) {
  464. return false;
  465. }
  466. }
  467. return true;
  468. }
  469. bool MultiStreamFrameQueue::empty(int streamIndex) const {
  470. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  471. auto it = streamQueues_.find(streamIndex);
  472. if (it != streamQueues_.end()) {
  473. return it->second->empty();
  474. }
  475. return true;
  476. }
  477. std::map<int, FrameQueueStats> MultiStreamFrameQueue::getAllStats() const {
  478. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  479. std::map<int, FrameQueueStats> allStats;
  480. for (const auto& pair : streamQueues_) {
  481. allStats[pair.first] = pair.second->getStats();
  482. }
  483. return allStats;
  484. }
  485. FrameQueueStats MultiStreamFrameQueue::getStats(int streamIndex) const {
  486. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  487. auto it = streamQueues_.find(streamIndex);
  488. if (it != streamQueues_.end()) {
  489. return it->second->getStats();
  490. }
  491. return FrameQueueStats();
  492. }
  493. void MultiStreamFrameQueue::resetStats() {
  494. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  495. for (auto& pair : streamQueues_) {
  496. pair.second->resetStats();
  497. }
  498. Logger::instance().debug("All stream queue stats reset");
  499. }
  500. void MultiStreamFrameQueue::resetStats(int streamIndex) {
  501. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  502. auto it = streamQueues_.find(streamIndex);
  503. if (it != streamQueues_.end()) {
  504. it->second->resetStats();
  505. Logger::instance().debugf("Stream {} queue stats reset", streamIndex);
  506. }
  507. }
  508. void MultiStreamFrameQueue::setBlocking(bool blocking) {
  509. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  510. for (auto& pair : streamQueues_) {
  511. pair.second->setBlocking(blocking);
  512. }
  513. Logger::instance().debugf("All stream queues blocking mode: {}", blocking);
  514. }
  515. void MultiStreamFrameQueue::wakeupAll() {
  516. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  517. for (auto& pair : streamQueues_) {
  518. pair.second->wakeup();
  519. }
  520. }
  521. // 工厂类实现
  522. std::unique_ptr<FrameQueue> FrameQueueFactory::createStandardQueue(size_t maxSize) {
  523. FrameQueueConfig config;
  524. config.maxSize = maxSize;
  525. config.dropOnFull = true;
  526. config.dropOldest = true;
  527. config.timeoutMs = 1000;
  528. config.enableStats = true;
  529. return std::make_unique<FrameQueue>(config);
  530. }
  531. std::unique_ptr<FrameQueue> FrameQueueFactory::createLowLatencyQueue(size_t maxSize) {
  532. FrameQueueConfig config;
  533. config.maxSize = maxSize;
  534. config.dropOnFull = true;
  535. config.dropOldest = true;
  536. config.timeoutMs = 100;
  537. config.enableStats = true;
  538. config.enableFrameDrop = true;
  539. config.maxLatency = 50.0;
  540. config.dropRatio = 3;
  541. return std::make_unique<FrameQueue>(config);
  542. }
  543. std::unique_ptr<FrameQueue> FrameQueueFactory::createHighCapacityQueue(size_t maxSize) {
  544. FrameQueueConfig config;
  545. config.maxSize = maxSize;
  546. config.dropOnFull = false; // 不丢帧,等待
  547. config.dropOldest = true;
  548. config.timeoutMs = 5000;
  549. config.enableStats = true;
  550. config.enableFrameDrop = false;
  551. return std::make_unique<FrameQueue>(config);
  552. }
  553. std::unique_ptr<FrameQueue> FrameQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) {
  554. FrameQueueConfig config;
  555. config.maxSize = maxSize;
  556. config.dropOnFull = true;
  557. config.dropOldest = true;
  558. config.timeoutMs = 50;
  559. config.enableStats = true;
  560. config.enableFrameDrop = true;
  561. config.maxLatency = maxLatency;
  562. config.dropRatio = 2;
  563. return std::make_unique<FrameQueue>(config);
  564. }
  565. std::unique_ptr<MultiStreamFrameQueue> FrameQueueFactory::createMultiStreamQueue(const FrameQueueConfig& config) {
  566. return std::make_unique<MultiStreamFrameQueue>(config);
  567. }
  568. } // namespace utils
  569. } // namespace av