// utils_packet_queue.cpp
  1. #include "utils_packet_queue.h"
  2. #include "../base/base_logger.h"
  3. #include <algorithm>
  4. #include <shared_mutex>
  5. namespace av {
  6. namespace utils {
  7. using namespace av::base;
  8. PacketQueue::PacketQueue(const PacketQueueConfig& config)
  9. : config_(config) {
  10. Logger::debug("PacketQueue created with max size: {}, max bytes: {}",
  11. config_.maxSize, config_.maxBytes);
  12. }
  13. PacketQueue::~PacketQueue() {
  14. shutdown_ = true;
  15. notEmpty_.notify_all();
  16. notFull_.notify_all();
  17. clear();
  18. Logger::debug("PacketQueue destroyed");
  19. }
  20. ErrorCode PacketQueue::enqueue(std::unique_ptr<PacketQueueItem> item) {
  21. if (!item) {
  22. return ErrorCode::INVALID_PARAMETER;
  23. }
  24. if (shutdown_) {
  25. return ErrorCode::INVALID_STATE;
  26. }
  27. // 检查流过滤
  28. if (!isStreamAllowed(item->streamIndex)) {
  29. Logger::debug("Packet dropped - stream {} not allowed", item->streamIndex);
  30. return ErrorCode::SUCCESS;
  31. }
  32. std::unique_lock<std::mutex> lock(queueMutex_);
  33. // 检查是否需要丢包
  34. if (shouldDropPacket(*item)) {
  35. if (dropCallback_) {
  36. dropCallback_(*item, "Smart packet drop");
  37. }
  38. std::lock_guard<std::mutex> statsLock(statsMutex_);
  39. stats_.totalDropped++;
  40. Logger::debug("Packet dropped by smart drop policy");
  41. return ErrorCode::SUCCESS;
  42. }
  43. // 检查队列是否已满
  44. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  45. size_t currentBytes = totalBytes_.load();
  46. bool queueFull = (currentSize >= config_.maxSize) || (currentBytes >= config_.maxBytes);
  47. if (blocking_ && queueFull) {
  48. if (config_.dropOnFull) {
  49. // 根据策略丢弃包
  50. if (config_.dropOldest) {
  51. dropOldestPacket();
  52. } else if (config_.dropNonKeyPackets && !item->isKeyPacket) {
  53. dropNonKeyPackets();
  54. } else {
  55. if (dropCallback_) {
  56. dropCallback_(*item, "Queue full - drop newest");
  57. }
  58. std::lock_guard<std::mutex> statsLock(statsMutex_);
  59. stats_.totalDropped++;
  60. Logger::debug("Packet dropped - queue full");
  61. return ErrorCode::SUCCESS;
  62. }
  63. } else {
  64. // 等待队列有空间
  65. if (!notFull_.wait_for(lock, std::chrono::milliseconds(config_.timeoutMs),
  66. [this, currentSize, currentBytes] {
  67. size_t newSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  68. size_t newBytes = totalBytes_.load();
  69. return (newSize < config_.maxSize && newBytes < config_.maxBytes) || shutdown_;
  70. })) {
  71. Logger::warning("Enqueue timeout");
  72. return ErrorCode::TIMEOUT;
  73. }
  74. if (shutdown_) {
  75. return ErrorCode::INVALID_STATE;
  76. }
  77. }
  78. }
  79. // 更新字节计数
  80. if (item->packet) {
  81. totalBytes_ += item->packet->size;
  82. }
  83. // 添加到队列
  84. if (config_.priorityQueue) {
  85. enqueueToPriorityQueue(std::move(item));
  86. } else {
  87. enqueueToNormalQueue(std::move(item));
  88. }
  89. // 更新统计信息
  90. if (config_.enableStats) {
  91. updateStats(*item, true);
  92. }
  93. // 通知等待的消费者
  94. notEmpty_.notify_one();
  95. // 回调
  96. if (enqueueCallback_) {
  97. enqueueCallback_(*item);
  98. }
  99. return ErrorCode::SUCCESS;
  100. }
  101. std::unique_ptr<PacketQueueItem> PacketQueue::dequeue() {
  102. return dequeue(config_.timeoutMs);
  103. }
  104. std::unique_ptr<PacketQueueItem> PacketQueue::dequeue(int timeoutMs) {
  105. std::unique_lock<std::mutex> lock(queueMutex_);
  106. // 等待队列有数据
  107. if (blocking_) {
  108. bool hasData = config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
  109. if (!hasData) {
  110. if (!notEmpty_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
  111. [this] {
  112. bool hasData = config_.priorityQueue ? !priorityQueue_.empty() : !normalQueue_.empty();
  113. return hasData || shutdown_;
  114. })) {
  115. return nullptr; // 超时
  116. }
  117. if (shutdown_) {
  118. return nullptr;
  119. }
  120. }
  121. }
  122. // 检查队列是否为空
  123. bool isEmpty = config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
  124. if (isEmpty) {
  125. return nullptr;
  126. }
  127. // 取出包
  128. std::unique_ptr<PacketQueueItem> item;
  129. if (config_.priorityQueue) {
  130. item = dequeueFromPriorityQueue();
  131. } else {
  132. item = dequeueFromNormalQueue();
  133. }
  134. if (!item) {
  135. return nullptr;
  136. }
  137. // 更新字节计数
  138. if (item->packet) {
  139. totalBytes_ -= item->packet->size;
  140. }
  141. // 更新统计信息
  142. if (config_.enableStats) {
  143. updateStats(*item, false);
  144. }
  145. // 通知等待的生产者
  146. notFull_.notify_one();
  147. // 回调
  148. if (dequeueCallback_) {
  149. dequeueCallback_(*item);
  150. }
  151. return item;
  152. }
  153. ErrorCode PacketQueue::enqueue(AVPacket* packet, int streamIndex, int priority) {
  154. if (!packet) {
  155. return ErrorCode::INVALID_PARAMETER;
  156. }
  157. auto item = std::make_unique<PacketQueueItem>(packet, streamIndex, priority);
  158. return enqueue(std::move(item));
  159. }
  160. AVPacket* PacketQueue::dequeuePacket() {
  161. auto item = dequeue();
  162. if (item) {
  163. AVPacket* packet = item->packet;
  164. item->packet = nullptr; // 转移所有权
  165. return packet;
  166. }
  167. return nullptr;
  168. }
  169. AVPacket* PacketQueue::dequeuePacket(int timeoutMs) {
  170. auto item = dequeue(timeoutMs);
  171. if (item) {
  172. AVPacket* packet = item->packet;
  173. item->packet = nullptr; // 转移所有权
  174. return packet;
  175. }
  176. return nullptr;
  177. }
  178. ErrorCode PacketQueue::enqueueWithPriority(AVPacket* packet, int priority, int streamIndex) {
  179. if (!packet) {
  180. return ErrorCode::INVALID_PARAMETER;
  181. }
  182. auto item = std::make_unique<PacketQueueItem>(packet, streamIndex, priority);
  183. return enqueue(std::move(item));
  184. }
  185. std::unique_ptr<PacketQueueItem> PacketQueue::dequeueHighestPriority() {
  186. if (!config_.priorityQueue) {
  187. Logger::warning("Priority queue not enabled");
  188. return dequeue();
  189. }
  190. return dequeue();
  191. }
  192. void PacketQueue::clear() {
  193. std::lock_guard<std::mutex> lock(queueMutex_);
  194. // 清空普通队列
  195. while (!normalQueue_.empty()) {
  196. normalQueue_.pop();
  197. }
  198. // 清空优先级队列
  199. while (!priorityQueue_.empty()) {
  200. priorityQueue_.pop();
  201. }
  202. totalBytes_ = 0;
  203. Logger::debug("Packet queue cleared");
  204. }
  205. void PacketQueue::flush() {
  206. clear();
  207. notEmpty_.notify_all();
  208. notFull_.notify_all();
  209. }
  210. void PacketQueue::setMaxSize(size_t maxSize) {
  211. std::lock_guard<std::mutex> lock(queueMutex_);
  212. config_.maxSize = maxSize;
  213. // 如果当前队列大小超过新的最大值,丢弃多余的包
  214. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  215. while (currentSize > maxSize) {
  216. if (config_.dropOldest) {
  217. dropOldestPacket();
  218. } else {
  219. break;
  220. }
  221. currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  222. }
  223. Logger::debug("Packet queue max size set to: {}", maxSize);
  224. }
  225. void PacketQueue::setMaxBytes(size_t maxBytes) {
  226. config_.maxBytes = maxBytes;
  227. Logger::debug("Packet queue max bytes set to: {}", maxBytes);
  228. }
  229. void PacketQueue::setDropPolicy(bool dropOnFull, bool dropOldest) {
  230. config_.dropOnFull = dropOnFull;
  231. config_.dropOldest = dropOldest;
  232. Logger::debug("Drop policy set: dropOnFull={}, dropOldest={}", dropOnFull, dropOldest);
  233. }
  234. void PacketQueue::enablePriorityQueue(bool enable) {
  235. std::lock_guard<std::mutex> lock(queueMutex_);
  236. if (config_.priorityQueue == enable) {
  237. return;
  238. }
  239. // 如果要切换队列类型,需要迁移数据
  240. if (enable) {
  241. // 从普通队列迁移到优先级队列
  242. while (!normalQueue_.empty()) {
  243. auto item = std::move(normalQueue_.front());
  244. normalQueue_.pop();
  245. priorityQueue_.push(std::move(item));
  246. }
  247. } else {
  248. // 从优先级队列迁移到普通队列
  249. std::vector<std::unique_ptr<PacketQueueItem>> tempItems;
  250. while (!priorityQueue_.empty()) {
  251. tempItems.push_back(std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top())));
  252. priorityQueue_.pop();
  253. }
  254. // 按时间戳排序后加入普通队列
  255. std::sort(tempItems.begin(), tempItems.end(),
  256. [](const std::unique_ptr<PacketQueueItem>& a, const std::unique_ptr<PacketQueueItem>& b) {
  257. return a->dts < b->dts;
  258. });
  259. for (auto& item : tempItems) {
  260. normalQueue_.push(std::move(item));
  261. }
  262. }
  263. config_.priorityQueue = enable;
  264. Logger::debug("Priority queue {}", enable ? "enabled" : "disabled");
  265. }
  266. size_t PacketQueue::size() const {
  267. std::lock_guard<std::mutex> lock(queueMutex_);
  268. return config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  269. }
  270. size_t PacketQueue::bytes() const {
  271. return totalBytes_.load();
  272. }
  273. bool PacketQueue::empty() const {
  274. std::lock_guard<std::mutex> lock(queueMutex_);
  275. return config_.priorityQueue ? priorityQueue_.empty() : normalQueue_.empty();
  276. }
  277. bool PacketQueue::full() const {
  278. std::lock_guard<std::mutex> lock(queueMutex_);
  279. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  280. return currentSize >= config_.maxSize;
  281. }
  282. bool PacketQueue::fullByBytes() const {
  283. return totalBytes_.load() >= config_.maxBytes;
  284. }
  285. size_t PacketQueue::capacity() const {
  286. return config_.maxSize;
  287. }
  288. size_t PacketQueue::capacityBytes() const {
  289. return config_.maxBytes;
  290. }
  291. PacketQueueStats PacketQueue::getStats() const {
  292. std::lock_guard<std::mutex> lock(statsMutex_);
  293. PacketQueueStats stats = stats_;
  294. stats.currentSize = size();
  295. stats.totalBytes = bytes();
  296. return stats;
  297. }
  298. void PacketQueue::resetStats() {
  299. std::lock_guard<std::mutex> lock(statsMutex_);
  300. stats_ = PacketQueueStats();
  301. Logger::debug("Packet queue stats reset");
  302. }
  303. void PacketQueue::enablePacketDrop(bool enable, double maxLatency, int dropRatio) {
  304. config_.enablePacketDrop = enable;
  305. config_.maxLatency = maxLatency;
  306. config_.dropRatio = dropRatio;
  307. Logger::debug("Packet drop enabled: {}, maxLatency: {}ms, dropRatio: {}",
  308. enable, maxLatency, dropRatio);
  309. }
  310. void PacketQueue::setPacketDropCallback(std::function<bool(const PacketQueueItem&)> callback) {
  311. packetDropCallback_ = callback;
  312. }
  313. void PacketQueue::setBlocking(bool blocking) {
  314. blocking_ = blocking;
  315. if (!blocking) {
  316. notEmpty_.notify_all();
  317. notFull_.notify_all();
  318. }
  319. Logger::debug("Packet queue blocking mode: {}", blocking);
  320. }
  321. void PacketQueue::wakeup() {
  322. notEmpty_.notify_all();
  323. notFull_.notify_all();
  324. }
  325. void PacketQueue::setStreamFilter(const std::vector<int>& allowedStreams) {
  326. std::lock_guard<std::mutex> lock(streamFilterMutex_);
  327. allowedStreams_ = allowedStreams;
  328. hasStreamFilter_ = !allowedStreams.empty();
  329. Logger::debug("Stream filter set with {} allowed streams", allowedStreams.size());
  330. }
  331. void PacketQueue::clearStreamFilter() {
  332. std::lock_guard<std::mutex> lock(streamFilterMutex_);
  333. allowedStreams_.clear();
  334. hasStreamFilter_ = false;
  335. Logger::debug("Stream filter cleared");
  336. }
// Internal method implementations
  338. bool PacketQueue::shouldDropPacket(const PacketQueueItem& item) const {
  339. if (!config_.enablePacketDrop) {
  340. return false;
  341. }
  342. // 自定义丢包回调
  343. if (packetDropCallback_ && packetDropCallback_(item)) {
  344. return true;
  345. }
  346. // 基于延迟的丢包
  347. if (shouldDropByLatency(item)) {
  348. return true;
  349. }
  350. // 基于比例的丢包
  351. if (shouldDropByRatio()) {
  352. return true;
  353. }
  354. // 基于大小的丢包
  355. if (shouldDropBySize()) {
  356. return true;
  357. }
  358. return false;
  359. }
  360. void PacketQueue::dropOldestPacket() {
  361. if (config_.priorityQueue) {
  362. if (!priorityQueue_.empty()) {
  363. auto& oldestItem = const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top());
  364. if (dropCallback_) {
  365. dropCallback_(*oldestItem, "Drop oldest packet");
  366. }
  367. if (oldestItem->packet) {
  368. totalBytes_ -= oldestItem->packet->size;
  369. }
  370. priorityQueue_.pop();
  371. std::lock_guard<std::mutex> statsLock(statsMutex_);
  372. stats_.totalDropped++;
  373. Logger::debug("Dropped oldest packet from priority queue");
  374. }
  375. } else {
  376. if (!normalQueue_.empty()) {
  377. auto& oldestItem = normalQueue_.front();
  378. if (dropCallback_) {
  379. dropCallback_(*oldestItem, "Drop oldest packet");
  380. }
  381. if (oldestItem->packet) {
  382. totalBytes_ -= oldestItem->packet->size;
  383. }
  384. normalQueue_.pop();
  385. std::lock_guard<std::mutex> statsLock(statsMutex_);
  386. stats_.totalDropped++;
  387. Logger::debug("Dropped oldest packet from normal queue");
  388. }
  389. }
  390. }
  391. void PacketQueue::dropNewestPacket() {
  392. // 这个方法在enqueue时调用,不需要实际操作队列
  393. }
  394. void PacketQueue::dropLargestPacket() {
  395. // 找到最大的包并丢弃
  396. // 这里简化实现,直接丢弃最旧的包
  397. dropOldestPacket();
  398. }
  399. void PacketQueue::dropNonKeyPackets() {
  400. std::vector<std::unique_ptr<PacketQueueItem>> keyPackets;
  401. if (config_.priorityQueue) {
  402. while (!priorityQueue_.empty()) {
  403. auto item = std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
  404. priorityQueue_.pop();
  405. if (item->isKeyPacket) {
  406. keyPackets.push_back(std::move(item));
  407. } else {
  408. if (dropCallback_) {
  409. dropCallback_(*item, "Drop non-key packet");
  410. }
  411. if (item->packet) {
  412. totalBytes_ -= item->packet->size;
  413. }
  414. std::lock_guard<std::mutex> statsLock(statsMutex_);
  415. stats_.totalDropped++;
  416. }
  417. }
  418. // 重新加入关键包
  419. for (auto& item : keyPackets) {
  420. priorityQueue_.push(std::move(item));
  421. }
  422. } else {
  423. while (!normalQueue_.empty()) {
  424. auto item = std::move(normalQueue_.front());
  425. normalQueue_.pop();
  426. if (item->isKeyPacket) {
  427. keyPackets.push_back(std::move(item));
  428. } else {
  429. if (dropCallback_) {
  430. dropCallback_(*item, "Drop non-key packet");
  431. }
  432. if (item->packet) {
  433. totalBytes_ -= item->packet->size;
  434. }
  435. std::lock_guard<std::mutex> statsLock(statsMutex_);
  436. stats_.totalDropped++;
  437. }
  438. }
  439. // 重新加入关键包
  440. for (auto& item : keyPackets) {
  441. normalQueue_.push(std::move(item));
  442. }
  443. }
  444. Logger::debug("Dropped non-key packets, kept {} key packets", keyPackets.size());
  445. }
  446. void PacketQueue::updateStats(const PacketQueueItem& item, bool isEnqueue) {
  447. std::lock_guard<std::mutex> lock(statsMutex_);
  448. if (isEnqueue) {
  449. stats_.totalEnqueued++;
  450. size_t currentSize = config_.priorityQueue ? priorityQueue_.size() : normalQueue_.size();
  451. stats_.maxSize = std::max(stats_.maxSize, currentSize);
  452. if (item.packet) {
  453. stats_.totalBytes += item.packet->size;
  454. // 更新平均包大小
  455. if (stats_.totalEnqueued == 1) {
  456. stats_.averagePacketSize = item.packet->size;
  457. } else {
  458. stats_.averagePacketSize = (stats_.averagePacketSize * (stats_.totalEnqueued - 1) + item.packet->size) / stats_.totalEnqueued;
  459. }
  460. }
  461. } else {
  462. stats_.totalDequeued++;
  463. // 计算等待时间
  464. double waitTime = calculateWaitTime(item);
  465. if (waitTime > 0) {
  466. // 更新平均等待时间
  467. if (stats_.totalDequeued == 1) {
  468. stats_.averageWaitTime = waitTime;
  469. } else {
  470. stats_.averageWaitTime = (stats_.averageWaitTime * (stats_.totalDequeued - 1) + waitTime) / stats_.totalDequeued;
  471. }
  472. stats_.maxWaitTime = std::max(stats_.maxWaitTime, waitTime);
  473. }
  474. }
  475. stats_.lastUpdateTime = std::chrono::steady_clock::now();
  476. }
  477. double PacketQueue::calculateWaitTime(const PacketQueueItem& item) const {
  478. auto now = std::chrono::steady_clock::now();
  479. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(now - item.enqueueTime);
  480. return duration.count() / 1000.0; // 转换为毫秒
  481. }
  482. bool PacketQueue::isStreamAllowed(int streamIndex) const {
  483. if (!hasStreamFilter_) {
  484. return true;
  485. }
  486. std::lock_guard<std::mutex> lock(streamFilterMutex_);
  487. return std::find(allowedStreams_.begin(), allowedStreams_.end(), streamIndex) != allowedStreams_.end();
  488. }
  489. bool PacketQueue::shouldDropByLatency(const PacketQueueItem& item) const {
  490. double waitTime = calculateWaitTime(item);
  491. return waitTime > config_.maxLatency;
  492. }
  493. bool PacketQueue::shouldDropByRatio() const {
  494. if (config_.dropRatio <= 1) {
  495. return false;
  496. }
  497. uint64_t count = packetCounter_.fetch_add(1);
  498. return (count % config_.dropRatio) == 0;
  499. }
  500. bool PacketQueue::shouldDropBySize() const {
  501. return totalBytes_.load() > config_.maxBytes;
  502. }
  503. void PacketQueue::enqueueToNormalQueue(std::unique_ptr<PacketQueueItem> item) {
  504. normalQueue_.push(std::move(item));
  505. }
  506. void PacketQueue::enqueueToPriorityQueue(std::unique_ptr<PacketQueueItem> item) {
  507. priorityQueue_.push(std::move(item));
  508. }
  509. std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromNormalQueue() {
  510. if (normalQueue_.empty()) {
  511. return nullptr;
  512. }
  513. auto item = std::move(normalQueue_.front());
  514. normalQueue_.pop();
  515. return item;
  516. }
  517. std::unique_ptr<PacketQueueItem> PacketQueue::dequeueFromPriorityQueue() {
  518. if (priorityQueue_.empty()) {
  519. return nullptr;
  520. }
  521. auto item = std::move(const_cast<std::unique_ptr<PacketQueueItem>&>(priorityQueue_.top()));
  522. priorityQueue_.pop();
  523. return item;
  524. }
// Multi-stream packet queue implementation
  526. MultiStreamPacketQueue::MultiStreamPacketQueue(const PacketQueueConfig& config)
  527. : defaultConfig_(config) {
  528. Logger::debug("MultiStreamPacketQueue created");
  529. }
  530. MultiStreamPacketQueue::~MultiStreamPacketQueue() {
  531. clear();
  532. Logger::debug("MultiStreamPacketQueue destroyed");
  533. }
  534. ErrorCode MultiStreamPacketQueue::addStream(int streamIndex, const PacketQueueConfig& config) {
  535. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  536. if (streamQueues_.find(streamIndex) != streamQueues_.end()) {
  537. Logger::warning("Stream {} already exists", streamIndex);
  538. return ErrorCode::ALREADY_EXISTS;
  539. }
  540. streamQueues_[streamIndex] = std::make_unique<PacketQueue>(config);
  541. Logger::debug("Added stream: {}", streamIndex);
  542. return ErrorCode::SUCCESS;
  543. }
  544. ErrorCode MultiStreamPacketQueue::removeStream(int streamIndex) {
  545. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  546. auto it = streamQueues_.find(streamIndex);
  547. if (it == streamQueues_.end()) {
  548. Logger::warning("Stream {} not found", streamIndex);
  549. return ErrorCode::NOT_FOUND;
  550. }
  551. streamQueues_.erase(it);
  552. Logger::debug("Removed stream: {}", streamIndex);
  553. return ErrorCode::SUCCESS;
  554. }
  555. bool MultiStreamPacketQueue::hasStream(int streamIndex) const {
  556. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  557. return streamQueues_.find(streamIndex) != streamQueues_.end();
  558. }
  559. std::vector<int> MultiStreamPacketQueue::getStreamIndices() const {
  560. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  561. std::vector<int> indices;
  562. for (const auto& pair : streamQueues_) {
  563. indices.push_back(pair.first);
  564. }
  565. return indices;
  566. }
  567. ErrorCode MultiStreamPacketQueue::enqueue(AVPacket* packet, int streamIndex) {
  568. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  569. auto it = streamQueues_.find(streamIndex);
  570. if (it == streamQueues_.end()) {
  571. // 自动创建流队列
  572. lock.unlock();
  573. addStream(streamIndex, defaultConfig_);
  574. lock.lock();
  575. it = streamQueues_.find(streamIndex);
  576. if (it == streamQueues_.end()) {
  577. return ErrorCode::NOT_FOUND;
  578. }
  579. }
  580. return it->second->enqueue(packet, streamIndex);
  581. }
  582. AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex) {
  583. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  584. auto it = streamQueues_.find(streamIndex);
  585. if (it == streamQueues_.end()) {
  586. return nullptr;
  587. }
  588. return it->second->dequeuePacket();
  589. }
  590. AVPacket* MultiStreamPacketQueue::dequeue(int streamIndex, int timeoutMs) {
  591. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  592. auto it = streamQueues_.find(streamIndex);
  593. if (it == streamQueues_.end()) {
  594. return nullptr;
  595. }
  596. return it->second->dequeuePacket(timeoutMs);
  597. }
  598. ErrorCode MultiStreamPacketQueue::enqueueToAll(AVPacket* packet) {
  599. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  600. ErrorCode result = ErrorCode::SUCCESS;
  601. for (const auto& pair : streamQueues_) {
  602. // 为每个流创建包的副本
  603. AVPacket* packetCopy = av_packet_alloc();
  604. if (!packetCopy) {
  605. result = ErrorCode::OUT_OF_MEMORY;
  606. continue;
  607. }
  608. if (av_packet_ref(packetCopy, packet) < 0) {
  609. av_packet_free(&packetCopy);
  610. result = ErrorCode::COPY_FAILED;
  611. continue;
  612. }
  613. ErrorCode enqueueResult = pair.second->enqueue(packetCopy, pair.first);
  614. if (enqueueResult != ErrorCode::SUCCESS) {
  615. av_packet_free(&packetCopy);
  616. result = enqueueResult;
  617. }
  618. }
  619. return result;
  620. }
  621. std::vector<AVPacket*> MultiStreamPacketQueue::dequeueFromAll() {
  622. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  623. std::vector<AVPacket*> packets;
  624. for (const auto& pair : streamQueues_) {
  625. AVPacket* packet = pair.second->dequeuePacket();
  626. packets.push_back(packet); // 可能为nullptr
  627. }
  628. return packets;
  629. }
  630. AVPacket* MultiStreamPacketQueue::dequeueSynchronized() {
  631. std::lock_guard<std::mutex> lock(syncMutex_);
  632. // 从同步队列中取出时间戳最小的包
  633. if (!syncQueue_.empty()) {
  634. SyncItem item = syncQueue_.top();
  635. syncQueue_.pop();
  636. return item.packet;
  637. }
  638. // 如果同步队列为空,从各个流队列中取包并排序
  639. std::vector<SyncItem> items;
  640. {
  641. std::shared_lock<std::shared_mutex> streamLock(streamsMutex_);
  642. for (const auto& pair : streamQueues_) {
  643. AVPacket* packet = pair.second->dequeuePacket(10); // 短超时
  644. if (packet) {
  645. SyncItem item;
  646. item.packet = packet;
  647. item.streamIndex = pair.first;
  648. item.timestamp = (packet->dts != AV_NOPTS_VALUE) ? packet->dts : packet->pts;
  649. items.push_back(item);
  650. }
  651. }
  652. }
  653. if (items.empty()) {
  654. return nullptr;
  655. }
  656. // 按时间戳排序
  657. std::sort(items.begin(), items.end(), [](const SyncItem& a, const SyncItem& b) {
  658. return a.timestamp < b.timestamp;
  659. });
  660. // 返回时间戳最小的包
  661. AVPacket* result = items[0].packet;
  662. // 将其余包放入同步队列
  663. for (size_t i = 1; i < items.size(); ++i) {
  664. syncQueue_.push(items[i]);
  665. }
  666. return result;
  667. }
  668. std::vector<AVPacket*> MultiStreamPacketQueue::dequeueSynchronizedBatch(size_t maxCount) {
  669. std::vector<AVPacket*> packets;
  670. for (size_t i = 0; i < maxCount; ++i) {
  671. AVPacket* packet = dequeueSynchronized();
  672. if (!packet) {
  673. break;
  674. }
  675. packets.push_back(packet);
  676. }
  677. return packets;
  678. }
  679. void MultiStreamPacketQueue::clear() {
  680. std::unique_lock<std::shared_mutex> lock(streamsMutex_);
  681. for (auto& pair : streamQueues_) {
  682. pair.second->clear();
  683. }
  684. // 清空同步队列
  685. std::lock_guard<std::mutex> syncLock(syncMutex_);
  686. while (!syncQueue_.empty()) {
  687. SyncItem item = syncQueue_.top();
  688. syncQueue_.pop();
  689. if (item.packet) {
  690. av_packet_free(&item.packet);
  691. }
  692. }
  693. Logger::debug("All stream queues cleared");
  694. }
  695. void MultiStreamPacketQueue::clearStream(int streamIndex) {
  696. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  697. auto it = streamQueues_.find(streamIndex);
  698. if (it != streamQueues_.end()) {
  699. it->second->clear();
  700. Logger::debug("Stream {} queue cleared", streamIndex);
  701. }
  702. }
  703. void MultiStreamPacketQueue::flush() {
  704. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  705. for (auto& pair : streamQueues_) {
  706. pair.second->flush();
  707. }
  708. Logger::debug("All stream queues flushed");
  709. }
  710. void MultiStreamPacketQueue::flushStream(int streamIndex) {
  711. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  712. auto it = streamQueues_.find(streamIndex);
  713. if (it != streamQueues_.end()) {
  714. it->second->flush();
  715. Logger::debug("Stream {} queue flushed", streamIndex);
  716. }
  717. }
  718. size_t MultiStreamPacketQueue::size() const {
  719. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  720. size_t totalSize = 0;
  721. for (const auto& pair : streamQueues_) {
  722. totalSize += pair.second->size();
  723. }
  724. return totalSize;
  725. }
  726. size_t MultiStreamPacketQueue::size(int streamIndex) const {
  727. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  728. auto it = streamQueues_.find(streamIndex);
  729. if (it != streamQueues_.end()) {
  730. return it->second->size();
  731. }
  732. return 0;
  733. }
  734. size_t MultiStreamPacketQueue::bytes() const {
  735. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  736. size_t totalBytes = 0;
  737. for (const auto& pair : streamQueues_) {
  738. totalBytes += pair.second->bytes();
  739. }
  740. return totalBytes;
  741. }
  742. size_t MultiStreamPacketQueue::bytes(int streamIndex) const {
  743. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  744. auto it = streamQueues_.find(streamIndex);
  745. if (it != streamQueues_.end()) {
  746. return it->second->bytes();
  747. }
  748. return 0;
  749. }
  750. bool MultiStreamPacketQueue::empty() const {
  751. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  752. for (const auto& pair : streamQueues_) {
  753. if (!pair.second->empty()) {
  754. return false;
  755. }
  756. }
  757. return true;
  758. }
  759. bool MultiStreamPacketQueue::empty(int streamIndex) const {
  760. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  761. auto it = streamQueues_.find(streamIndex);
  762. if (it != streamQueues_.end()) {
  763. return it->second->empty();
  764. }
  765. return true;
  766. }
  767. std::map<int, PacketQueueStats> MultiStreamPacketQueue::getAllStats() const {
  768. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  769. std::map<int, PacketQueueStats> allStats;
  770. for (const auto& pair : streamQueues_) {
  771. allStats[pair.first] = pair.second->getStats();
  772. }
  773. return allStats;
  774. }
  775. PacketQueueStats MultiStreamPacketQueue::getStats(int streamIndex) const {
  776. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  777. auto it = streamQueues_.find(streamIndex);
  778. if (it != streamQueues_.end()) {
  779. return it->second->getStats();
  780. }
  781. return PacketQueueStats();
  782. }
  783. void MultiStreamPacketQueue::resetStats() {
  784. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  785. for (auto& pair : streamQueues_) {
  786. pair.second->resetStats();
  787. }
  788. Logger::debug("All stream queue stats reset");
  789. }
  790. void MultiStreamPacketQueue::resetStats(int streamIndex) {
  791. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  792. auto it = streamQueues_.find(streamIndex);
  793. if (it != streamQueues_.end()) {
  794. it->second->resetStats();
  795. Logger::debug("Stream {} queue stats reset", streamIndex);
  796. }
  797. }
  798. void MultiStreamPacketQueue::setBlocking(bool blocking) {
  799. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  800. for (auto& pair : streamQueues_) {
  801. pair.second->setBlocking(blocking);
  802. }
  803. Logger::debug("All stream queues blocking mode: {}", blocking);
  804. }
  805. void MultiStreamPacketQueue::wakeupAll() {
  806. std::shared_lock<std::shared_mutex> lock(streamsMutex_);
  807. for (auto& pair : streamQueues_) {
  808. pair.second->wakeup();
  809. }
  810. }
// Factory class implementation
  812. std::unique_ptr<PacketQueue> PacketQueueFactory::createStandardQueue(size_t maxSize) {
  813. PacketQueueConfig config;
  814. config.maxSize = maxSize;
  815. config.dropOnFull = true;
  816. config.dropOldest = true;
  817. config.timeoutMs = 1000;
  818. config.enableStats = true;
  819. return std::make_unique<PacketQueue>(config);
  820. }
  821. std::unique_ptr<PacketQueue> PacketQueueFactory::createLowLatencyQueue(size_t maxSize) {
  822. PacketQueueConfig config;
  823. config.maxSize = maxSize;
  824. config.dropOnFull = true;
  825. config.dropOldest = true;
  826. config.timeoutMs = 100;
  827. config.enableStats = true;
  828. config.enablePacketDrop = true;
  829. config.maxLatency = 100.0;
  830. config.dropRatio = 3;
  831. return std::make_unique<PacketQueue>(config);
  832. }
  833. std::unique_ptr<PacketQueue> PacketQueueFactory::createHighCapacityQueue(size_t maxSize) {
  834. PacketQueueConfig config;
  835. config.maxSize = maxSize;
  836. config.maxBytes = 200 * 1024 * 1024; // 200MB
  837. config.dropOnFull = false; // 不丢包,等待
  838. config.dropOldest = true;
  839. config.timeoutMs = 5000;
  840. config.enableStats = true;
  841. config.enablePacketDrop = false;
  842. return std::make_unique<PacketQueue>(config);
  843. }
  844. std::unique_ptr<PacketQueue> PacketQueueFactory::createRealtimeQueue(size_t maxSize, double maxLatency) {
  845. PacketQueueConfig config;
  846. config.maxSize = maxSize;
  847. config.dropOnFull = true;
  848. config.dropOldest = true;
  849. config.timeoutMs = 50;
  850. config.enableStats = true;
  851. config.enablePacketDrop = true;
  852. config.maxLatency = maxLatency;
  853. config.dropRatio = 2;
  854. config.dropNonKeyPackets = true;
  855. return std::make_unique<PacketQueue>(config);
  856. }
  857. std::unique_ptr<PacketQueue> PacketQueueFactory::createPriorityQueue(size_t maxSize) {
  858. PacketQueueConfig config;
  859. config.maxSize = maxSize;
  860. config.priorityQueue = true;
  861. config.dropOnFull = true;
  862. config.dropOldest = true;
  863. config.timeoutMs = 1000;
  864. config.enableStats = true;
  865. return std::make_unique<PacketQueue>(config);
  866. }
  867. std::unique_ptr<MultiStreamPacketQueue> PacketQueueFactory::createMultiStreamQueue(const PacketQueueConfig& config) {
  868. return std::make_unique<MultiStreamPacketQueue>(config);
  869. }
  870. } // namespace utils
  871. } // namespace av