utils_packet_queue.cpp 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101
  1. #include "utils_packet_queue.h"
  2. #include "../base/logger.h"
  3. #include <algorithm>
  4. #include <shared_mutex>
  5. namespace av {
  6. namespace utils {
  7. using namespace av;
  8. PacketQueue::PacketQueue(const PacketQueueConfig& config)
  9. : config_(config) {
  10. Logger::instance().debugf("PacketQueue created with max size: {}, max bytes: {}",
  11. config_.maxSize, config_.maxBytes);
  12. }
  13. PacketQueue::~PacketQueue() {
  14. shutdown_ = true;
  15. notEmpty_.notify_all();
  16. notFull_.notify_all();
  17. clear();
  18. Logger::instance().debug("PacketQueue destroyed");
  19. }
  20. ErrorCode PacketQueue::enqueue(std::unique_ptr<PacketQueueItem> item) {
  21. if (!item) {
  22. return ErrorCode::INVALID_PARAMS;
  23. }
  24. if (shutdown_) {
  25. return ErrorCode::INVALID_STATE;
  26. }
  27. // 检查流过滤
  28. if (!isStreamAllowed(item->streamIndex)) {
  29. Logger::instance().debugf("Packet dropped - stream {} not allowed", item->streamIndex);
  30. return ErrorCode::SUCCESS;
  31. }
  32. std::unique_lock<std::mutex> lock(queueMutex_);
  33. // 检查是否需要丢包
  34. if (shouldDropPacket(*item)) {
  35. if (dropCallback_) {
  36. dropCallback_(*item, "Smart packet drop");
  37. }
  38. std::lock_guard<std::mutex> statsLock(statsMutex_);
  39. stats_.totalDropped++;
  40. Logger::instance().debug("Packet dropped by smart drop policy");
  41. return ErrorCode::SUCCESS;
  42. }
  43. // 检查队列是否已满
  44. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  45. size_t currentBytes = totalBytes_.load();
  46. bool queueFull = (currentSize >= config_.maxSize) || (currentBytes >= config_.maxBytes);
  47. if (blocking_ && queueFull) {
  48. if (config_.dropOnFull) {
  49. // 根据策略丢弃包
  50. if (config_.dropOldest) {
  51. dropOldestPacket();
  52. } else if (config_.dropNonKeyPackets && !item->isKeyPacket) {
  53. dropNonKeyPackets();
  54. } else {
  55. if (dropCallback_) {
  56. dropCallback_(*item, "Queue full - drop newest");
  57. }
  58. std::lock_guard<std::mutex> statsLock(statsMutex_);
  59. stats_.totalDropped++;
  60. Logger::instance().debug("Packet dropped - queue full");
  61. return ErrorCode::SUCCESS;
  62. }
  63. } else {
  64. // 等待队列有空间
  65. if (!notFull_.wait_for(lock, std::chrono::milliseconds(config_.timeoutMs),
  66. [this, currentSize, currentBytes] {
  67. size_t newSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  68. size_t newBytes = totalBytes_.load();
  69. return (newSize < config_.maxSize && newBytes < config_.maxBytes) || shutdown_;
  70. })) {
  71. Logger::instance().warning("Enqueue timeout");
  72. return ErrorCode::TIMEOUT;
  73. }
  74. if (shutdown_) {
  75. return ErrorCode::INVALID_STATE;
  76. }
  77. }
  78. }
  79. // 更新字节计数
  80. if (item->packet) {
  81. totalBytes_ += item->packet->size;
  82. }
  83. // 在move之前保存item的引用,用于后续的统计和回调
  84. const PacketQueueItem& itemRef = *item;
  85. // 添加到队列
  86. if (config_.priorityQueue) {
  87. enqueueToPriorityQueue(std::move(item));
  88. } else {
  89. enqueueToNormalQueue(std::move(item));
  90. }
  91. // 更新统计信息
  92. if (config_.enableStats) {
  93. updateStats(itemRef, true);
  94. }
  95. // 通知等待的消费者
  96. notEmpty_.notify_one();
  97. // 回调
  98. if (enqueueCallback_) {
  99. enqueueCallback_(itemRef);
  100. }
  101. return ErrorCode::SUCCESS;
  102. }
  103. std::unique_ptr<PacketQueueItem> PacketQueue::dequeue() {
  104. return dequeue(config_.timeoutMs);
  105. }
  106. std::unique_ptr<PacketQueueItem> PacketQueue::dequeue(int timeoutMs) {
  107. std::unique_lock<std::mutex> lock(queueMutex_);
  108. // 等待队列有数据
  109. if (blocking_) {
  110. bool hasData = config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
  111. if (!hasData) {
  112. if (!notEmpty_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
  113. [this] {
  114. bool hasData = config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
  115. return hasData || shutdown_;
  116. })) {
  117. return nullptr; // 超时
  118. }
  119. if (shutdown_) {
  120. return nullptr;
  121. }
  122. }
  123. }
  124. // 检查队列是否为空
  125. bool isEmpty = config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
  126. if (isEmpty) {
  127. return nullptr;
  128. }
  129. // 取出包
  130. std::unique_ptr<PacketQueueItem> item;
  131. if (config_.priorityQueue) {
  132. item = dequeueFromPriorityQueue();
  133. } else {
  134. item = dequeueFromNormalQueue();
  135. }
  136. if (!item) {
  137. return nullptr;
  138. }
  139. // 更新字节计数
  140. if (item->packet) {
  141. totalBytes_ -= item->packet->size;
  142. }
  143. // 更新统计信息
  144. if (config_.enableStats) {
  145. updateStats(*item, false);
  146. }
  147. // 通知等待的生产者
  148. notFull_.notify_one();
  149. // 回调
  150. if (dequeueCallback_) {
  151. dequeueCallback_(*item);
  152. }
  153. return item;
  154. }
  155. ErrorCode PacketQueue::enqueue(AVPacket* packet, int streamIndex, int priority) {
  156. if (!packet) {
  157. return ErrorCode::INVALID_PARAMS;
  158. }
  159. auto item = std::make_unique<PacketQueueItem>(packet, streamIndex, priority);
  160. return enqueue(std::move(item));
  161. }
  162. AVPacket* PacketQueue::dequeuePacket() {
  163. auto item = dequeue();
  164. if (item) {
  165. AVPacket* packet = item->packet;
  166. item->packet = nullptr; // 转移所有权
  167. return packet;
  168. }
  169. return nullptr;
  170. }
  171. AVPacket* PacketQueue::dequeuePacket(int timeoutMs) {
  172. auto item = dequeue(timeoutMs);
  173. if (item) {
  174. AVPacket* packet = item->packet;
  175. item->packet = nullptr; // 转移所有权
  176. return packet;
  177. }
  178. return nullptr;
  179. }
  180. ErrorCode PacketQueue::enqueueWithPriority(AVPacket* packet, int priority, int streamIndex) {
  181. if (!packet) {
  182. return ErrorCode::INVALID_PARAMS;
  183. }
  184. auto item = std::make_unique<PacketQueueItem>(packet, streamIndex, priority);
  185. return enqueue(std::move(item));
  186. }
  187. std::unique_ptr<PacketQueueItem> PacketQueue::dequeueHighestPriority() {
  188. if (!config_.priorityQueue) {
  189. Logger::instance().warning("Priority queue not enabled");
  190. return dequeue();
  191. }
  192. return dequeue();
  193. }
  194. void PacketQueue::clear() {
  195. std::lock_guard<std::mutex> lock(queueMutex_);
  196. // 清空普通队列
  197. while (!normalQueue_.empty()) {
  198. normalQueue_.pop();
  199. }
  200. // 清空优先级队列
  201. while (!priorityQueue_.empty()) {
  202. priorityQueue_.pop();
  203. }
  204. totalBytes_ = 0;
  205. Logger::instance().debug("Packet queue cleared");
  206. }
  207. void PacketQueue::flush() {
  208. clear();
  209. notEmpty_.notify_all();
  210. notFull_.notify_all();
  211. }
  212. void PacketQueue::setMaxSize(size_t maxSize) {
  213. std::lock_guard<std::mutex> lock(queueMutex_);
  214. config_.maxSize = maxSize;
  215. // 如果当前队列大小超过新的最大值,丢弃多余的包
  216. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  217. while (currentSize > maxSize) {
  218. if (config_.dropOldest) {
  219. dropOldestPacket();
  220. } else {
  221. break;
  222. }
  223. currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  224. }
  225. Logger::instance().debugf("Packet queue max size set to: {}", maxSize);
  226. }
  227. void PacketQueue::setMaxBytes(size_t maxBytes) {
  228. config_.maxBytes = maxBytes;
  229. Logger::instance().debugf("Packet queue max bytes set to: {}", maxBytes);
  230. }
  231. void PacketQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
  232. config_.dropOnFull = dropOnFull;
  233. config_.dropOldest = dropOldest;
  234. Logger::instance().debugf("Drop policy set: dropOnFull={}, dropOldest={}", dropOnFull, dropOldest);
  235. }
  236. void PacketQueue::enablePriorityQueue(bool enable) {
  237. std::lock_guard<std::mutex> lock(queueMutex_);
  238. if (config_.priorityQueue == enable) {
  239. return;
  240. }
  241. // 如果要切换队列类型,需要迁移数据
  242. if (enable) {
  243. // 从普通队列迁移到优先级队列
  244. while (!normalQueue_.empty()) {
  245. auto item = std::move(normalQueue_.front());
  246. normalQueue_.pop();
  247. priorityQueue_.push(std::move(item));
  248. }
  249. } else {
  250. // 从优先级队列迁移到普通队列
  251. std::vector<std::unique_ptr<PacketQueueItem>> tempItems;
  252. while (!priorityQueue_.empty()) {
  253. tempItems.push_back(std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top())));
  254. priorityQueue_.pop();
  255. }
  256. // 按时间戳排序后加入普通队列
  257. std::sort(tempItems.begin(), tempItems.end(),
  258. [](const std::unique_ptr<PacketQueueItem>& a, const std::unique_ptr<PacketQueueItem>& b) {
  259. return a->dts < b->dts;
  260. });
  261. for (auto& item : tempItems) {
  262. normalQueue_.push(std::move(item));
  263. }
  264. }
  265. config_.priorityQueue = enable;
  266. Logger::instance().debugf("Priority queue {}", enable ? "enabled" : "disabled");
  267. }
  268. size_t PacketQueue::size() const {
  269. std::lock_guard<std::mutex> lock(queueMutex_);
  270. return config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  271. }
  272. size_t PacketQueue::bytes() const {
  273. return totalBytes_.load();
  274. }
  275. bool PacketQueue::empty() const {
  276. std::lock_guard<std::mutex> lock(queueMutex_);
  277. return config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
  278. }
  279. bool PacketQueue::full() const {
  280. std::lock_guard<std::mutex> lock(queueMutex_);
  281. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  282. return currentSize >= config_.maxSize;
  283. }
  284. bool PacketQueue::fullByBytes() const {
  285. return totalBytes_.load() >= config_.maxBytes;
  286. }
  287. size_t PacketQueue::capacity() const {
  288. return config_.maxSize;
  289. }
  290. size_t PacketQueue::capacityBytes() const {
  291. return config_.maxBytes;
  292. }
  293. PacketQueueStats PacketQueue::getStats() const {
  294. std::lock_guard<std::mutex> lock(statsMutex_);
  295. PacketQueueStats stats = stats_;
  296. stats.currentSize = size();
  297. stats.totalBytes = bytes();
  298. return stats;
  299. }
  300. void PacketQueue::resetStats() {
  301. std::lock_guard<std::mutex> lock(statsMutex_);
  302. stats_ = PacketQueueStats();
  303. Logger::instance().debug("Packet queue stats reset");
  304. }
  305. void PacketQueue::enablePacketDrop(bool enable, double maxLatency, int dropRatio) {
  306. config_.enablePacketDrop = enable;
  307. config_.maxLatency = maxLatency;
  308. config_.dropRatio = dropRatio;
  309. Logger::instance().debugf("Packet drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
  310. enable, maxLatency, dropRatio);
  311. }
  312. void PacketQueue::setPacketDropCallback(std::function<bool(const PacketQueueItem&)> callback) {
  313. packetDropCallback_ = callback;
  314. }
  315. void PacketQueue::setBlocking(bool blocking) {
  316. blocking_ = blocking;
  317. if (!blocking) {
  318. notEmpty_.notify_all();
  319. notFull_.notify_all();
  320. }
  321. Logger::instance().debugf("Packet queue blocking mode: {}", blocking);
  322. }
  323. void PacketQueue::wakeup() {
  324. notEmpty_.notify_all();
  325. notFull_.notify_all();
  326. }
  327. void PacketQueue::setStreamFilter(const std::vector<int>& allowedStreams) {
  328. std::lock_guard<std::mutex> lock(streamFilterMutex_);
  329. allowedStreams_ = allowedStreams;
  330. hasStreamFilter_ = !allowedStreams.empty();
  331. Logger::instance().debugf("Stream filter set with {} allowed streams", allowedStreams.size());
  332. }
  333. void PacketQueue::clearStreamFilter() {
  334. std::lock_guard<std::mutex> lock(streamFilterMutex_);
  335. allowedStreams_.clear();
  336. hasStreamFilter_ = false;
  337. Logger::instance().debug("Stream filter cleared");
  338. }
  339. // 内部方法实现
  340. bool PacketQueue::shouldDropPacket(const PacketQueueItem& item) const {
  341. if (!config_.enablePacketDrop) {
  342. return false;
  343. }
  344. // 自定义丢包回调
  345. if (packetDropCallback_ && packetDropCallback_(item)) {
  346. return true;
  347. }
  348. // 基于延迟的丢包
  349. if (shouldDropByLatency(item)) {
  350. return true;
  351. }
  352. // 基于比例的丢包
  353. if (shouldDropByRatio()) {
  354. return true;
  355. }
  356. // 基于大小的丢包
  357. if (shouldDropBySize()) {
  358. return true;
  359. }
  360. return false;
  361. }
  362. void PacketQueue::dropOldestPacket() {
  363. if (config_.priorityQueue) {
  364. if (!priorityQueue_.empty()) {
  365. auto& oldestItem = const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top());
  366. if (dropCallback_) {
  367. dropCallback_(*oldestItem, "Drop oldest packet");
  368. }
  369. if (oldestItem->packet) {
  370. totalBytes_ -= oldestItem->packet->size;
  371. }
  372. priorityQueue_.pop();
  373. std::lock_guard<std::mutex> statsLock(statsMutex_);
  374. stats_.totalDropped++;
  375. Logger::instance().debug("Dropped oldest packet from priority queue");
  376. }
  377. } else {
  378. if (!normalQueue_.empty()) {
  379. auto& oldestItem = normalQueue_.front();
  380. if (dropCallback_) {
  381. dropCallback_(*oldestItem, "Drop oldest packet");
  382. }
  383. if (oldestItem->packet) {
  384. totalBytes_ -= oldestItem->packet->size;
  385. }
  386. normalQueue_.pop();
  387. std::lock_guard<std::mutex> statsLock(statsMutex_);
  388. stats_.totalDropped++;
  389. Logger::instance().debug("Dropped oldest packet from normal queue");
  390. }
  391. }
  392. }
  393. void PacketQueue::dropNewestPacket() {
  394. // 这个方法在enqueue时调用,不需要实际操作队列
  395. }
  396. void PacketQueue::dropLargestPacket() {
  397. // 找到最大的包并丢弃
  398. // 这里简化实现,直接丢弃最旧的包
  399. dropOldestPacket();
  400. }
  401. void PacketQueue::dropNonKeyPackets() {
  402. std::vector<std::unique_ptr<PacketQueueItem>> keyPackets;
  403. if (config_.priorityQueue) {
  404. while (!priorityQueue_.empty()) {
  405. auto item = std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
  406. priorityQueue_.pop();
  407. if (item->isKeyPacket) {
  408. keyPackets.push_back(std::move(item));
  409. } else {
  410. if (dropCallback_) {
  411. dropCallback_(*item, "Drop non-key packet");
  412. }
  413. if (item->packet) {
  414. totalBytes_ -= item->packet->size;
  415. }
  416. std::lock_guard<std::mutex> statsLock(statsMutex_);
  417. stats_.totalDropped++;
  418. }
  419. }
  420. // 重新加入关键包
  421. for (auto& item : keyPackets) {
  422. priorityQueue_.push(std::move(item));
  423. }
  424. } else {
  425. while (!normalQueue_.empty()) {
  426. auto item = std::move(normalQueue_.front());
  427. normalQueue_.pop();
  428. if (item->isKeyPacket) {
  429. keyPackets.push_back(std::move(item));
  430. } else {
  431. if (dropCallback_) {
  432. dropCallback_(*item, "Drop non-key packet");
  433. }
  434. if (item->packet) {
  435. totalBytes_ -= item->packet->size;
  436. }
  437. std::lock_guard<std::mutex> statsLock(statsMutex_);
  438. stats_.totalDropped++;
  439. }
  440. }
  441. // 重新加入关键包
  442. for (auto& item : keyPackets) {
  443. normalQueue_.push(std::move(item));
  444. }
  445. }
  446. Logger::instance().debugf("Dropped non-key packets, kept {} key packets", keyPackets.size());
  447. }
  448. void PacketQueue::updateStats(const PacketQueueItem& item, bool isEnqueue) {
  449. std::lock_guard<std::mutex> lock(statsMutex_);
  450. if (isEnqueue) {
  451. stats_.totalEnqueued++;
  452. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  453. stats_.maxSize = std::max(stats_.maxSize, currentSize);
  454. if (item.packet) {
  455. stats_.totalBytes += item.packet->size;
  456. // 更新平均包大小
  457. if (stats_.totalEnqueued == 1) {
  458. stats_.averagePacketSize = item.packet->size;
  459. } else {
  460. stats_.averagePacketSize = (stats_.averagePacketSize * (stats_.totalEnqueued - 1) + item.packet->size) / stats_.totalEnqueued;
  461. }
  462. }
  463. } else {
  464. stats_.totalDequeued++;
  465. // 计算等待时间
  466. double waitTime = calculateWaitTime(item);
  467. if (waitTime > 0) {
  468. // 更新平均等待时间
  469. if (stats_.totalDequeued == 1) {
  470. stats_.averageWaitTime = waitTime;
  471. } else {
  472. stats_.averageWaitTime = (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) / stats_.totalDequeued;
  473. }
  474. stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
  475. }
  476. }
  477. stats_.lastUpdateTime = std::chrono::steady_clock::now();
  478. }
  479. double PacketQueue::calculateWaitTime(const PacketQueueItem& item) const {
  480. auto now = std::chrono::steady_clock::now();
  481. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
  482. return duration.count() / 1000.0; // 转换为毫秒
  483. }
  484. bool PacketQueue::isStreamAllowed(int streamIndex) const {
  485. if (!hasStreamFilter_) {
  486. return true;
  487. }
  488. std::lock_guard<std::mutex> lock(streamFilterMutex_);
  489. return std::find(allowedStreams_.begin(), allowedStreams_.end(), streamIndex) != allowedStreams_.end();
  490. }
  491. bool PacketQueue::shouldDropByLatency(const PacketQueueItem& item) const {
  492. double waitTime = calculateWaitTime(item);
  493. return waitTime > config_.maxLatency;
  494. }
  495. bool PacketQueue::shouldDropByRatio() const {
  496. if (config_.dropRatio <= 1) {
  497. return false;
  498. }
  499. uint64_t count = packetCounter_.fetch_add(1);
  500. return (count % config_.dropRatio) == 0;
  501. }
  502. bool PacketQueue::shouldDropBySize() const {
  503. return totalBytes_.load() > config_.maxBytes;
  504. }
  505. void PacketQueue::enqueueToNormalQueue(std::unique_ptr<PacketQueueItem> item) {
  506. normalQueue_.push(std::move(item));
  507. }
  508. void PacketQueue::enqueueToPriorityQueue(std::unique_ptr<PacketQueueItem> item) {
  509. priorityQueue_.push(std::move(item));
  510. }
  511. std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromNormalQueue() {
  512. if (normalQueue_.empty()) {
  513. return nullptr;
  514. }
  515. auto item = std::move(normalQueue_.front());
  516. normalQueue_.pop();
  517. return item;
  518. }
  519. std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromPriorityQueue() {
  520. if (priorityQueue_.empty()) {
  521. return nullptr;
  522. }
  523. auto item = std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
  524. priorityQueue_.pop();
  525. return item;
  526. }
  527. // 多流包队列实现
  528. MultiStreamPacketQueue::MultiStreamPacketQueue(const PacketQueueConfig& config)
  529. : defaultConfig_(config) {
  530. Logger::instance().debug("MultiStreamPacketQueue created");
  531. }
  532. MultiStreamPacketQueue::~MultiStreamPacketQueue() {
  533. clear();
  534. Logger::instance().debug("MultiStreamPacketQueue destroyed");
  535. }
  536. ErrorCode MultiStreamPacketQueue::addStream(int streamIndex, const PacketQueueConfig& config) {
  537. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  538. if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
  539. Logger::instance().warningf("Stream {} already exists", streamIndex);
  540. return ErrorCode::ALREADY_EXISTS;
  541. }
  542. streamQueues_[streamIndex] = std::make_unique<PacketQueue>(config);
  543. Logger::instance().debugf("Added stream: {}", streamIndex);
  544. return ErrorCode::SUCCESS;
  545. }
  546. ErrorCode MultiStreamPacketQueue::removeStream(int streamIndex) {
  547. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  548. auto it = streamQueues_.find(streamIndex);
  549. if (it == streamQueues_.end()) {
  550. Logger::instance().warningf("Stream {} not found", streamIndex);
  551. return ErrorCode::NOT_FOUND;
  552. }
  553. streamQueues_.erase(it);
  554. Logger::instance().debugf("Removed stream: {}", streamIndex);
  555. return ErrorCode::SUCCESS;
  556. }
  557. bool MultiStreamPacketQueue::hasStream(int streamIndex) const {
  558. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  559. return streamQueues_.find(streamIndex) != streamQueues_.end();
  560. }
  561. std::vector<int> MultiStreamPacketQueue::getStreamIndices() const {
  562. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  563. std::vector<int> indices;
  564. for (const auto& pair : streamQueues_) {
  565. indices.push_back(pair.first);
  566. }
  567. return indices;
  568. }
  569. ErrorCode MultiStreamPacketQueue::enqueue(AVPacket* packet, int streamIndex) {
  570. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  571. auto it = streamQueues_.find(streamIndex);
  572. if (it == streamQueues_.end()) {
  573. // 自动创建流队列
  574. lock.unlock();
  575. addStream(streamIndex, defaultConfig_);
  576. lock.lock();
  577. it = streamQueues_.find(streamIndex);
  578. if (it == streamQueues_.end()) {
  579. return ErrorCode::NOT_FOUND;
  580. }
  581. }
  582. return it->second->enqueue(packet, streamIndex);
  583. }
  584. AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex) {
  585. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  586. auto it = streamQueues_.find(streamIndex);
  587. if (it == streamQueues_.end()) {
  588. return nullptr;
  589. }
  590. return it->second->dequeuePacket();
  591. }
  592. AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex, int timeoutMs) {
  593. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  594. auto it = streamQueues_.find(streamIndex);
  595. if (it == streamQueues_.end()) {
  596. return nullptr;
  597. }
  598. return it->second->dequeuePacket(timeoutMs);
  599. }
  600. ErrorCode MultiStreamPacketQueue::enqueueToAll(AVPacket* packet) {
  601. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  602. ErrorCode result = ErrorCode::SUCCESS;
  603. for (const auto& pair : streamQueues_) {
  604. // 为每个流创建包的副本
  605. AVPacket* packetCopy = av_packet_alloc();
  606. if (!packetCopy) {
  607. result = ErrorCode::MEMORY_ALLOC_FAILED;
  608. continue;
  609. }
  610. if (av_packet_ref(packetCopy, packet) < 0) {
  611. av_packet_free(&packetCopy);
  612. result = ErrorCode::COPY_FAILED;
  613. continue;
  614. }
  615. ErrorCode enqueueResult = pair.second->enqueue(packetCopy, pair.first);
  616. if (enqueueResult != ErrorCode::SUCCESS) {
  617. av_packet_free(&packetCopy);
  618. result = enqueueResult;
  619. }
  620. }
  621. return result;
  622. }
  623. std::vector<AVPacket*> MultiStreamPacketQueue::dequeueFromAll() {
  624. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  625. std::vector<AVPacket*> packets;
  626. for (const auto& pair : streamQueues_) {
  627. AVPacket* packet = pair.second->dequeuePacket();
  628. packets.push_back(packet); // 可能为nullptr
  629. }
  630. return packets;
  631. }
  632. AVPacket* MultiStreamPacketQueue::dequeueSynchronized() {
  633. std::lock_guard<std::mutex> lock(syncMutex_);
  634. // 从同步队列中取出时间戳最小的包
  635. if (!syncQueue_.empty()) {
  636. SyncItem item = syncQueue_.top();
  637. syncQueue_.pop();
  638. return item.packet;
  639. }
  640. // 如果同步队列为空,从各个流队列中取包并排序
  641. std::vector<SyncItem> items;
  642. {
  643. std::shared_lock<std::shared_mutex> streamLock(streamsMutex_);
  644. for (const auto& pair : streamQueues_) {
  645. AVPacket* packet = pair.second->dequeuePacket(10); // 短超时
  646. if (packet) {
  647. SyncItem item;
  648. item.packet = packet;
  649. item.streamIndex = pair.first;
  650. item.timestamp = (packet->dts != AV_NOPTS_VALUE) ? packet->dts : packet->pts;
  651. items.push_back(item);
  652. }
  653. }
  654. }
  655. if (items.empty()) {
  656. return nullptr;
  657. }
  658. // 按时间戳排序
  659. std::sort(items.begin(), items.end(), [](const SyncItem& a, const SyncItem& b) {
  660. return a.timestamp < b.timestamp;
  661. });
  662. // 返回时间戳最小的包
  663. AVPacket* result = items[0].packet;
  664. // 将其余包放入同步队列
  665. for (size_t i = 1; i < items.size(); ++i) {
  666. syncQueue_.push(items[i]);
  667. }
  668. return result;
  669. }
  670. std::vector<AVPacket*> MultiStreamPacketQueue::dequeueSynchronizedBatch(size_t maxCount) {
  671. std::vector<AVPacket*> packets;
  672. for (size_t i = 0; i < maxCount; ++i) {
  673. AVPacket* packet = dequeueSynchronized();
  674. if (!packet) {
  675. break;
  676. }
  677. packets.push_back(packet);
  678. }
  679. return packets;
  680. }
  681. void MultiStreamPacketQueue::clear() {
  682. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  683. for (auto& pair : streamQueues_) {
  684. pair.second->clear();
  685. }
  686. // 清空同步队列
  687. std::lock_guard<std::mutex> syncLock(syncMutex_);
  688. while (!syncQueue_.empty()) {
  689. SyncItem item = syncQueue_.top();
  690. syncQueue_.pop();
  691. if (item.packet) {
  692. av_packet_free(&item.packet);
  693. }
  694. }
  695. Logger::instance().debug("All stream queues cleared");
  696. }
  697. void MultiStreamPacketQueue::clearStream(int streamIndex) {
  698. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  699. auto it = streamQueues_.find(streamIndex);
  700. if (it != streamQueues_.end()) {
  701. it->second->clear();
  702. Logger::instance().debugf("Stream {} queue cleared", streamIndex);
  703. }
  704. }
  705. void MultiStreamPacketQueue::flush() {
  706. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  707. for (auto& pair : streamQueues_) {
  708. pair.second->flush();
  709. }
  710. Logger::instance().debug("All stream queues flushed");
  711. }
  712. void MultiStreamPacketQueue::flushStream(int streamIndex) {
  713. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  714. auto it = streamQueues_.find(streamIndex);
  715. if (it != streamQueues_.end()) {
  716. it->second->flush();
  717. Logger::instance().debugf("Stream {} queue flushed", streamIndex);
  718. }
  719. }
  720. size_t MultiStreamPacketQueue::size() const {
  721. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  722. size_t totalSize = 0;
  723. for (const auto& pair : streamQueues_) {
  724. totalSize += pair.second->size();
  725. }
  726. return totalSize;
  727. }
  728. size_t MultiStreamPacketQueue::size(int streamIndex) const {
  729. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  730. auto it = streamQueues_.find(streamIndex);
  731. if (it != streamQueues_.end()) {
  732. return it->second->size();
  733. }
  734. return 0;
  735. }
  736. size_t MultiStreamPacketQueue::bytes() const {
  737. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  738. size_t totalBytes = 0;
  739. for (const auto& pair : streamQueues_) {
  740. totalBytes += pair.second->bytes();
  741. }
  742. return totalBytes;
  743. }
  744. size_t MultiStreamPacketQueue::bytes(int streamIndex) const {
  745. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  746. auto it = streamQueues_.find(streamIndex);
  747. if (it != streamQueues_.end()) {
  748. return it->second->bytes();
  749. }
  750. return 0;
  751. }
  752. bool MultiStreamPacketQueue::empty() const {
  753. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  754. for (const auto& pair : streamQueues_) {
  755. if (!pair.second->empty()) {
  756. return false;
  757. }
  758. }
  759. return true;
  760. }
  761. bool MultiStreamPacketQueue::empty(int streamIndex) const {
  762. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  763. auto it = streamQueues_.find(streamIndex);
  764. if (it != streamQueues_.end()) {
  765. return it->second->empty();
  766. }
  767. return true;
  768. }
  769. std::map<int, PacketQueueStats> MultiStreamPacketQueue::getAllStats() const {
  770. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  771. std::map<int, PacketQueueStats> allStats;
  772. for (const auto& pair : streamQueues_) {
  773. allStats[pair.first] = pair.second->getStats();
  774. }
  775. return allStats;
  776. }
  777. PacketQueueStats MultiStreamPacketQueue::getStats(int streamIndex) const {
  778. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  779. auto it = streamQueues_.find(streamIndex);
  780. if (it != streamQueues_.end()) {
  781. return it->second->getStats();
  782. }
  783. return PacketQueueStats();
  784. }
  785. void MultiStreamPacketQueue::resetStats() {
  786. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  787. for (auto& pair : streamQueues_) {
  788. pair.second->resetStats();
  789. }
  790. Logger::instance().debug("All stream queue stats reset");
  791. }
  792. void MultiStreamPacketQueue::resetStats(int streamIndex) {
  793. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  794. auto it = streamQueues_.find(streamIndex);
  795. if (it != streamQueues_.end()) {
  796. it->second->resetStats();
  797. Logger::instance().debugf("Stream {} queue stats reset", streamIndex);
  798. }
  799. }
  800. void MultiStreamPacketQueue::setBlocking(bool blocking) {
  801. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  802. for (auto& pair : streamQueues_) {
  803. pair.second->setBlocking(blocking);
  804. }
  805. Logger::instance().debugf("All stream queues blocking mode: {}", blocking);
  806. }
  807. void MultiStreamPacketQueue::wakeupAll() {
  808. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  809. for (auto& pair : streamQueues_) {
  810. pair.second->wakeup();
  811. }
  812. }
  813. // 工厂类实现
  814. std::unique_ptr<PacketQueue> PacketQueueFactory::createStandardQueue(size_t maxSize) {
  815. PacketQueueConfig config;
  816. config.maxSize = maxSize;
  817. config.dropOnFull = true;
  818. config.dropOldest = true;
  819. config.timeoutMs = 1000;
  820. config.enableStats = true;
  821. return std::make_unique<PacketQueue>(config);
  822. }
  823. std::unique_ptr<PacketQueue> PacketQueueFactory::createLowLatencyQueue(size_t maxSize) {
  824. PacketQueueConfig config;
  825. config.maxSize = maxSize;
  826. config.dropOnFull = true;
  827. config.dropOldest = true;
  828. config.timeoutMs = 100;
  829. config.enableStats = true;
  830. config.enablePacketDrop = true;
  831. config.maxLatency = 100.0;
  832. config.dropRatio = 3;
  833. return std::make_unique<PacketQueue>(config);
  834. }
  835. std::unique_ptr<PacketQueue> PacketQueueFactory::createHighCapacityQueue(size_t maxSize) {
  836. PacketQueueConfig config;
  837. config.maxSize = maxSize;
  838. config.maxBytes = 200 * 1024 * 1024; // 200MB
  839. config.dropOnFull = false; // 不丢包,等待
  840. config.dropOldest = true;
  841. config.timeoutMs = 5000;
  842. config.enableStats = true;
  843. config.enablePacketDrop = false;
  844. return std::make_unique<PacketQueue>(config);
  845. }
  846. std::unique_ptr<PacketQueue> PacketQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) {
  847. PacketQueueConfig config;
  848. config.maxSize = maxSize;
  849. config.dropOnFull = true;
  850. config.dropOldest = true;
  851. config.timeoutMs = 50;
  852. config.enableStats = true;
  853. config.enablePacketDrop = true;
  854. config.maxLatency = maxLatency;
  855. config.dropRatio = 2;
  856. config.dropNonKeyPackets = true;
  857. return std::make_unique<PacketQueue>(config);
  858. }
  859. std::unique_ptr<PacketQueue> PacketQueueFactory::createPriorityQueue(size_t maxSize) {
  860. PacketQueueConfig config;
  861. config.maxSize = maxSize;
  862. config.priorityQueue = true;
  863. config.dropOnFull = true;
  864. config.dropOldest = true;
  865. config.timeoutMs = 1000;
  866. config.enableStats = true;
  867. return std::make_unique<PacketQueue>(config);
  868. }
  869. std::unique_ptr<MultiStreamPacketQueue> PacketQueueFactory::createMultiStreamQueue(const PacketQueueConfig& config) {
  870. return std::make_unique<MultiStreamPacketQueue>(config);
  871. }
  872. } // namespace utils
  873. } // namespace av