muxer_stream_muxer.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862
  1. #include "muxer_stream_muxer.h"
  2. #include "../base/logger.h"
  3. #include <algorithm>
  4. #include <sstream>
  5. #include <regex>
  6. extern "C" {
  7. #include <libavutil/opt.h>
  8. #include <libavutil/time.h>
  9. #include <libavformat/avio.h>
  10. }
  11. namespace av {
  12. namespace muxer {
  13. StreamMuxer::StreamMuxer() {
  14. Logger::instance().info("StreamMuxer created");
  15. }
  16. StreamMuxer::~StreamMuxer() {
  17. close();
  18. Logger::instance().info("StreamMuxer destroyed");
  19. }
  20. ErrorCode StreamMuxer::initialize(const MuxerParams& params) {
  21. if (!validateParams(params)) {
  22. Logger::instance().error("Invalid stream muxer parameters");
  23. return ErrorCode::INVALID_PARAMS;
  24. }
  25. streamParams_ = static_cast<const StreamMuxerParams&>(params);
  26. // 重置统计信息
  27. connectionStats_ = ConnectionStats();
  28. // 设置初始状态
  29. setConnectionState(ConnectionState::DISCONNECTED);
  30. Logger::instance().infof("StreamMuxer initialized with URL: {}", streamParams_.url);
  31. return AbstractMuxer::initialize(params);
  32. }
  33. ErrorCode StreamMuxer::start() {
  34. if (state_ != MuxerState::INITIALIZED) {
  35. Logger::instance().error("StreamMuxer not initialized");
  36. return ErrorCode::INVALID_STATE;
  37. }
  38. // 建立连接
  39. ErrorCode result = connect();
  40. if (result != ErrorCode::SUCCESS) {
  41. Logger::instance().error("Failed to connect to stream");
  42. return result;
  43. }
  44. // 启动重连线程
  45. if (streamParams_.enableAutoReconnect) {
  46. shouldStopReconnect_ = false;
  47. reconnectThread_ = std::thread(&StreamMuxer::reconnectThreadFunc, this);
  48. }
  49. // 启动自适应码率监控
  50. if (streamParams_.enableAdaptiveBitrate) {
  51. shouldStopBitrateMonitor_ = false;
  52. bitrateMonitorThread_ = std::thread(&StreamMuxer::monitorNetworkCondition, this);
  53. }
  54. return AbstractMuxer::start();
  55. }
  56. ErrorCode StreamMuxer::stop() {
  57. if (state_ != MuxerState::STARTED) {
  58. return ErrorCode::SUCCESS;
  59. }
  60. // 停止监控线程
  61. shouldStopBitrateMonitor_ = true;
  62. if (bitrateMonitorThread_.joinable()) {
  63. bitrateMonitorThread_.join();
  64. }
  65. // 停止重连线程
  66. shouldStopReconnect_ = true;
  67. reconnectCondition_.notify_all();
  68. if (reconnectThread_.joinable()) {
  69. reconnectThread_.join();
  70. }
  71. // 断开连接
  72. disconnect();
  73. return AbstractMuxer::stop();
  74. }
  75. ErrorCode StreamMuxer::close() {
  76. stop();
  77. // 清理资源
  78. connectionStats_ = ConnectionStats();
  79. setConnectionState(ConnectionState::DISCONNECTED);
  80. return AbstractMuxer::close();
  81. }
  82. ErrorCode StreamMuxer::writePacket(AVPacket* packet) {
  83. if (!packet) {
  84. return ErrorCode::INVALID_PARAMS;
  85. }
  86. if (!isConnected()) {
  87. Logger::instance().warning("Not connected, dropping packet");
  88. return ErrorCode::INVALID_STATE;
  89. }
  90. return processPacket(packet);
  91. }
  92. ErrorCode StreamMuxer::writeFrame(AVFrame* frame, int streamIndex) {
  93. if (!frame || streamIndex < 0) {
  94. return ErrorCode::INVALID_PARAMS;
  95. }
  96. if (!isConnected()) {
  97. Logger::instance().warning("Not connected, dropping frame");
  98. return ErrorCode::INVALID_STATE;
  99. }
  100. // 将帧编码为包
  101. AVPacket* packet = av_packet_alloc();
  102. if (!packet) {
  103. return ErrorCode::MEMORY_ALLOC_FAILED;
  104. }
  105. // 这里需要编码器支持,暂时返回成功
  106. av_packet_free(&packet);
  107. return ErrorCode::SUCCESS;
  108. }
  109. ErrorCode StreamMuxer::flush() {
  110. if (!formatCtx_) {
  111. return ErrorCode::INVALID_STATE;
  112. }
  113. int ret = av_write_trailer(formatCtx_);
  114. if (ret < 0) {
  115. Logger::instance().errorf("Failed to write trailer: {}", ffmpeg_utils::errorToString(ret));
  116. return ErrorCode::OPERATION_FAILED;
  117. }
  118. return ErrorCode::SUCCESS;
  119. }
  120. ErrorCode StreamMuxer::addStream(const StreamInfo& streamInfo) {
  121. ErrorCode result = AbstractMuxer::addStream(streamInfo);
  122. if (result != ErrorCode::SUCCESS) {
  123. return result;
  124. }
  125. Logger::instance().infof("Added stream: type={}, codec={}",
  126. static_cast<int>(streamInfo.type), streamInfo.codecName);
  127. return ErrorCode::SUCCESS;
  128. }
  129. ErrorCode StreamMuxer::connect() {
  130. if (connectionState_ == ConnectionState::CONNECTED) {
  131. return ErrorCode::SUCCESS;
  132. }
  133. setConnectionState(ConnectionState::CONNECTING);
  134. ErrorCode result = establishConnection();
  135. if (result == ErrorCode::SUCCESS) {
  136. setConnectionState(ConnectionState::CONNECTED);
  137. connectionStats_.connectTime = std::chrono::steady_clock::now();
  138. connectionStats_.lastDataTime = connectionStats_.connectTime;
  139. Logger::instance().infof("Connected to stream: {}", streamParams_.url);
  140. } else {
  141. setConnectionState(ConnectionState::FAILED);
  142. Logger::instance().errorf("Failed to connect to stream: {}", streamParams_.url);
  143. }
  144. return result;
  145. }
  146. ErrorCode StreamMuxer::disconnect() {
  147. if (connectionState_ == ConnectionState::DISCONNECTED) {
  148. return ErrorCode::SUCCESS;
  149. }
  150. ErrorCode result = closeConnection();
  151. setConnectionState(ConnectionState::DISCONNECTED);
  152. Logger::instance().info("Disconnected from stream");
  153. return result;
  154. }
  155. ErrorCode StreamMuxer::reconnect() {
  156. Logger::instance().info("Attempting to reconnect...");
  157. setConnectionState(ConnectionState::RECONNECTING);
  158. // 先断开现有连接
  159. closeConnection();
  160. // 等待一段时间
  161. std::this_thread::sleep_for(std::chrono::milliseconds(streamParams_.reconnectDelay));
  162. // 重新连接
  163. ErrorCode result = establishConnection();
  164. if (result == ErrorCode::SUCCESS) {
  165. setConnectionState(ConnectionState::CONNECTED);
  166. connectionStats_.reconnectCount++;
  167. connectionStats_.lastReconnectTime = std::chrono::steady_clock::now();
  168. Logger::instance().info("Reconnected successfully");
  169. } else {
  170. setConnectionState(ConnectionState::FAILED);
  171. Logger::instance().error("Reconnection failed");
  172. }
  173. return result;
  174. }
  175. ConnectionState StreamMuxer::getConnectionState() const {
  176. return connectionState_.load();
  177. }
  178. ConnectionStats StreamMuxer::getConnectionStats() const {
  179. std::lock_guard<std::mutex> lock(connectionMutex_);
  180. return connectionStats_;
  181. }
  182. bool StreamMuxer::isConnected() const {
  183. return connectionState_ == ConnectionState::CONNECTED;
  184. }
  185. ErrorCode StreamMuxer::setUrl(const std::string& url) {
  186. if (state_ != MuxerState::IDLE) {
  187. Logger::instance().error("Cannot change URL while muxer is active");
  188. return ErrorCode::INVALID_STATE;
  189. }
  190. if (!validateUrl(url)) {
  191. Logger::instance().errorf("Invalid URL: {}", url);
  192. return ErrorCode::INVALID_PARAMS;
  193. }
  194. streamParams_.url = url;
  195. Logger::instance().infof("URL set to: {}", url);
  196. return ErrorCode::SUCCESS;
  197. }
  198. std::string StreamMuxer::getUrl() const {
  199. return streamParams_.url;
  200. }
  201. ErrorCode StreamMuxer::enableAdaptiveBitrate(bool enable) {
  202. adaptiveBitrateEnabled_ = enable;
  203. if (enable && state_ == MuxerState::STARTED && !bitrateMonitorThread_.joinable()) {
  204. shouldStopBitrateMonitor_ = false;
  205. bitrateMonitorThread_ = std::thread(&StreamMuxer::monitorNetworkCondition, this);
  206. }
  207. Logger::instance().infof("Adaptive bitrate {}", enable ? "enabled" : "disabled");
  208. return ErrorCode::SUCCESS;
  209. }
  210. ErrorCode StreamMuxer::setBitrateRange(int minBitrate, int maxBitrate) {
  211. if (minBitrate <= 0 || maxBitrate <= minBitrate) {
  212. return ErrorCode::INVALID_PARAMS;
  213. }
  214. streamParams_.minBitrate = minBitrate;
  215. streamParams_.maxBitrate = maxBitrate;
  216. Logger::instance().infof("Bitrate range set: {} - {} bps", minBitrate, maxBitrate);
  217. return ErrorCode::SUCCESS;
  218. }
  219. ErrorCode StreamMuxer::adjustBitrate(double factor) {
  220. if (factor <= 0.0 || factor > 2.0) {
  221. return ErrorCode::INVALID_PARAMS;
  222. }
  223. int newBitrate = static_cast<int>(currentBitrate_ * factor);
  224. newBitrate = std::clamp(newBitrate, streamParams_.minBitrate, streamParams_.maxBitrate);
  225. if (newBitrate != currentBitrate_) {
  226. currentBitrate_ = newBitrate;
  227. if (bitrateCallback_) {
  228. bitrateCallback_(newBitrate);
  229. }
  230. Logger::instance().infof("Bitrate adjusted to: {} bps", newBitrate);
  231. }
  232. return ErrorCode::SUCCESS;
  233. }
  234. ErrorCode StreamMuxer::setProtocolOption(const std::string& key, const std::string& value) {
  235. streamParams_.protocolOptions[key] = value;
  236. Logger::instance().debugf("Protocol option set: {}={}", key, value);
  237. return ErrorCode::SUCCESS;
  238. }
  239. std::string StreamMuxer::getProtocolOption(const std::string& key) const {
  240. auto it = streamParams_.protocolOptions.find(key);
  241. return (it != streamParams_.protocolOptions.end()) ? it->second : "";
  242. }
  243. // 内部实现方法
  244. ErrorCode StreamMuxer::setupOutput() {
  245. // 根据协议设置输出格式
  246. const char* formatName = nullptr;
  247. switch (streamParams_.protocol) {
  248. case StreamProtocol::RTMP:
  249. case StreamProtocol::RTMPS:
  250. formatName = "flv";
  251. break;
  252. case StreamProtocol::UDP:
  253. case StreamProtocol::TCP:
  254. formatName = "mpegts";
  255. break;
  256. case StreamProtocol::HTTP:
  257. case StreamProtocol::HTTPS:
  258. formatName = "hls";
  259. break;
  260. case StreamProtocol::SRT:
  261. case StreamProtocol::RIST:
  262. formatName = "mpegts";
  263. break;
  264. default:
  265. formatName = "flv";
  266. break;
  267. }
  268. // 分配格式上下文
  269. int ret = avformat_alloc_output_context2(&formatCtx_, NULL, formatName, streamParams_.url.c_str());
  270. if (ret < 0) {
  271. Logger::instance().errorf("Failed to allocate output context: {}", ffmpeg_utils::errorToString(ret));
  272. return ErrorCode::INITIALIZATION_FAILED;
  273. }
  274. // 设置协议特定选项
  275. switch (streamParams_.protocol) {
  276. case StreamProtocol::RTMP:
  277. case StreamProtocol::RTMPS:
  278. return setupRTMP();
  279. case StreamProtocol::UDP:
  280. return setupUDP();
  281. case StreamProtocol::TCP:
  282. return setupTCP();
  283. case StreamProtocol::SRT:
  284. return setupSRT();
  285. default:
  286. break;
  287. }
  288. return ErrorCode::SUCCESS;
  289. }
  290. ErrorCode StreamMuxer::writeHeader() {
  291. if (!formatCtx_) {
  292. return ErrorCode::INVALID_STATE;
  293. }
  294. // 打开输出
  295. int ret = avio_open(&formatCtx_->pb, streamParams_.url.c_str(), AVIO_FLAG_WRITE);
  296. if (ret < 0) {
  297. Logger::instance().errorf("Failed to open output: {}", ffmpeg_utils::errorToString(ret));
  298. return ErrorCode::OPERATION_FAILED;
  299. }
  300. // 写入头部
  301. ret = avformat_write_header(formatCtx_, NULL);
  302. if (ret < 0) {
  303. Logger::instance().errorf("Failed to write header: {}", ffmpeg_utils::errorToString(ret));
  304. return ErrorCode::OPERATION_FAILED;
  305. }
  306. return ErrorCode::SUCCESS;
  307. }
  308. ErrorCode StreamMuxer::writeTrailer() {
  309. if (!formatCtx_) {
  310. return ErrorCode::INVALID_STATE;
  311. }
  312. int ret = av_write_trailer(formatCtx_);
  313. if (ret < 0) {
  314. Logger::instance().errorf("Failed to write trailer: {}", ffmpeg_utils::errorToString(ret));
  315. return ErrorCode::OPERATION_FAILED;
  316. }
  317. return ErrorCode::SUCCESS;
  318. }
  319. ErrorCode StreamMuxer::establishConnection() {
  320. // 设置输出
  321. ErrorCode result = setupOutput();
  322. if (result != ErrorCode::SUCCESS) {
  323. return result;
  324. }
  325. // 写入头部
  326. result = writeHeader();
  327. if (result != ErrorCode::SUCCESS) {
  328. return result;
  329. }
  330. return ErrorCode::SUCCESS;
  331. }
  332. ErrorCode StreamMuxer::closeConnection() {
  333. if (formatCtx_ && formatCtx_->pb) {
  334. // 写入尾部
  335. writeTrailer();
  336. // 关闭输出
  337. avio_closep(&formatCtx_->pb);
  338. }
  339. return ErrorCode::SUCCESS;
  340. }
  341. ErrorCode StreamMuxer::setupRTMP() {
  342. if (!formatCtx_) {
  343. return ErrorCode::INVALID_STATE;
  344. }
  345. // 设置RTMP选项
  346. AVDictionary* options = NULL;
  347. // 连接超时
  348. av_dict_set_int(&options, "timeout", streamParams_.connectTimeout * 1000, 0);
  349. // 直播模式
  350. if (streamParams_.rtmpLive) {
  351. av_dict_set(&options, "live", "1", 0);
  352. }
  353. // 缓冲区大小
  354. av_dict_set_int(&options, "buffer_size", streamParams_.sendBufferSize, 0);
  355. // 应用协议选项
  356. for (const auto& option : streamParams_.protocolOptions) {
  357. av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0);
  358. }
  359. formatCtx_->metadata = options;
  360. Logger::instance().debug("RTMP setup completed");
  361. return ErrorCode::SUCCESS;
  362. }
  363. ErrorCode StreamMuxer::setupUDP() {
  364. if (!formatCtx_) {
  365. return ErrorCode::INVALID_STATE;
  366. }
  367. // 设置UDP选项
  368. AVDictionary* options = NULL;
  369. // 缓冲区大小
  370. av_dict_set_int(&options, "buffer_size", streamParams_.sendBufferSize, 0);
  371. // 重用地址
  372. if (streamParams_.reuseAddress) {
  373. av_dict_set(&options, "reuse", "1", 0);
  374. }
  375. // 本地地址和端口
  376. if (!streamParams_.localAddress.empty()) {
  377. av_dict_set(&options, "localaddr", streamParams_.localAddress.c_str(), 0);
  378. }
  379. if (streamParams_.localPort > 0) {
  380. av_dict_set_int(&options, "localport", streamParams_.localPort, 0);
  381. }
  382. // 应用协议选项
  383. for (const auto& option : streamParams_.protocolOptions) {
  384. av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0);
  385. }
  386. formatCtx_->metadata = options;
  387. Logger::instance().debug("UDP setup completed");
  388. return ErrorCode::SUCCESS;
  389. }
  390. ErrorCode StreamMuxer::setupTCP() {
  391. if (!formatCtx_) {
  392. return ErrorCode::INVALID_STATE;
  393. }
  394. // 设置TCP选项
  395. AVDictionary* options = NULL;
  396. // 连接超时
  397. av_dict_set_int(&options, "timeout", streamParams_.connectTimeout * 1000, 0);
  398. // 缓冲区大小
  399. av_dict_set_int(&options, "buffer_size", streamParams_.sendBufferSize, 0);
  400. // 应用协议选项
  401. for (const auto& option : streamParams_.protocolOptions) {
  402. av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0);
  403. }
  404. formatCtx_->metadata = options;
  405. Logger::instance().debug("TCP setup completed");
  406. return ErrorCode::SUCCESS;
  407. }
  408. ErrorCode StreamMuxer::setupSRT() {
  409. if (!formatCtx_) {
  410. return ErrorCode::INVALID_STATE;
  411. }
  412. // 设置SRT选项
  413. AVDictionary* options = NULL;
  414. // 延迟
  415. av_dict_set_int(&options, "latency", streamParams_.srtLatency * 1000, 0);
  416. // 密码
  417. if (!streamParams_.srtPassphrase.empty()) {
  418. av_dict_set(&options, "passphrase", streamParams_.srtPassphrase.c_str(), 0);
  419. }
  420. // 应用协议选项
  421. for (const auto& option : streamParams_.protocolOptions) {
  422. av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0);
  423. }
  424. formatCtx_->metadata = options;
  425. Logger::instance().debug("SRT setup completed");
  426. return ErrorCode::SUCCESS;
  427. }
  428. ErrorCode StreamMuxer::processPacket(AVPacket* packet) {
  429. if (!packet || !formatCtx_) {
  430. return ErrorCode::INVALID_PARAMS;
  431. }
  432. // 发送包
  433. ErrorCode result = sendPacket(packet);
  434. if (result == ErrorCode::SUCCESS) {
  435. // 更新统计信息
  436. updateConnectionStats();
  437. updateBandwidthStats(packet->size);
  438. // 更新包统计
  439. stats_.totalPackets++;
  440. stats_.totalBytes += packet->size;
  441. connectionStats_.packetsSent++;
  442. connectionStats_.bytesSent += packet->size;
  443. connectionStats_.lastDataTime = std::chrono::steady_clock::now();
  444. } else {
  445. stats_.errorCount++;
  446. Logger::instance().warning("Failed to send packet");
  447. }
  448. return result;
  449. }
  450. ErrorCode StreamMuxer::sendPacket(AVPacket* packet) {
  451. if (!packet || !formatCtx_) {
  452. return ErrorCode::INVALID_PARAMS;
  453. }
  454. int ret = av_interleaved_write_frame(formatCtx_, packet);
  455. if (ret < 0) {
  456. Logger::instance().errorf("Failed to write packet: {}", ffmpeg_utils::errorToString(ret));
  457. return ErrorCode::OPERATION_FAILED;
  458. }
  459. return ErrorCode::SUCCESS;
  460. }
  461. void StreamMuxer::monitorNetworkCondition() {
  462. Logger::instance().debug("Network condition monitoring started");
  463. while (!shouldStopBitrateMonitor_) {
  464. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  465. if (adaptiveBitrateEnabled_ && isConnected()) {
  466. adjustBitrateBasedOnCondition();
  467. }
  468. }
  469. Logger::instance().debug("Network condition monitoring stopped");
  470. }
  471. void StreamMuxer::adjustBitrateBasedOnCondition() {
  472. // 获取当前网络状态
  473. double currentBandwidth = connectionStats_.currentBandwidth;
  474. double rtt = connectionStats_.rtt;
  475. // 简单的自适应算法
  476. double targetUtilization = 0.8; // 目标带宽利用率
  477. double currentUtilization = (currentBitrate_ > 0) ? (currentBandwidth / currentBitrate_) : 0.0;
  478. if (currentUtilization < targetUtilization * 0.7) {
  479. // 网络状况良好,可以提高码率
  480. adjustBitrate(1.1);
  481. } else if (currentUtilization > targetUtilization * 1.2 || rtt > 200.0) {
  482. // 网络拥塞,降低码率
  483. adjustBitrate(streamParams_.bitrateAdjustFactor);
  484. }
  485. }
  486. void StreamMuxer::reconnectThreadFunc() {
  487. Logger::instance().debug("Reconnect thread started");
  488. int attemptCount = 0;
  489. while (!shouldStopReconnect_) {
  490. std::unique_lock<std::mutex> lock(reconnectMutex_);
  491. // 等待重连请求或停止信号
  492. reconnectCondition_.wait(lock, [this] {
  493. return shouldStopReconnect_ || reconnectRequested_ || shouldReconnect();
  494. });
  495. if (shouldStopReconnect_) {
  496. break;
  497. }
  498. if (shouldReconnect() && attemptCount < streamParams_.maxReconnectAttempts) {
  499. attemptCount++;
  500. Logger::instance().infof("Reconnect attempt {}/{}", attemptCount, streamParams_.maxReconnectAttempts);
  501. lock.unlock();
  502. // 计算重连延迟
  503. int delay = calculateReconnectDelay(attemptCount);
  504. std::this_thread::sleep_for(std::chrono::milliseconds(delay));
  505. // 尝试重连
  506. if (reconnect() == ErrorCode::SUCCESS) {
  507. attemptCount = 0; // 重连成功,重置计数
  508. }
  509. lock.lock();
  510. }
  511. reconnectRequested_ = false;
  512. }
  513. Logger::instance().debug("Reconnect thread stopped");
  514. }
  515. bool StreamMuxer::shouldReconnect() const {
  516. ConnectionState state = connectionState_.load();
  517. return (state == ConnectionState::FAILED || state == ConnectionState::DISCONNECTED) &&
  518. streamParams_.enableAutoReconnect;
  519. }
  520. int StreamMuxer::calculateReconnectDelay(int attempt) const {
  521. double delay = streamParams_.reconnectDelay;
  522. for (int i = 1; i < attempt; ++i) {
  523. delay *= streamParams_.reconnectBackoffFactor;
  524. }
  525. return static_cast<int>(std::min(delay, 30000.0)); // 最大30秒
  526. }
  527. void StreamMuxer::updateConnectionStats() {
  528. auto now = std::chrono::steady_clock::now();
  529. // 更新统计信息的时间间隔检查
  530. if (std::chrono::duration<double>(now - lastStatsUpdate_).count() < STATS_UPDATE_INTERVAL) {
  531. return;
  532. }
  533. std::lock_guard<std::mutex> lock(connectionMutex_);
  534. // 计算平均带宽
  535. double totalBandwidth = 0.0;
  536. int validSamples = 0;
  537. for (int i = 0; i < 10; ++i) {
  538. if (bandwidthSamples_[i] > 0) {
  539. totalBandwidth += bandwidthSamples_[i];
  540. validSamples++;
  541. }
  542. }
  543. if (validSamples > 0) {
  544. connectionStats_.averageBandwidth = totalBandwidth / validSamples;
  545. }
  546. lastStatsUpdate_ = now;
  547. }
  548. void StreamMuxer::updateBandwidthStats(size_t bytes) {
  549. auto now = std::chrono::steady_clock::now();
  550. // 更新带宽统计的时间间隔检查
  551. if (std::chrono::duration<double>(now - lastBandwidthUpdate_).count() < BANDWIDTH_UPDATE_INTERVAL) {
  552. return;
  553. }
  554. std::lock_guard<std::mutex> lock(connectionMutex_);
  555. // 计算当前带宽
  556. double timeDiff = std::chrono::duration<double>(now - lastBandwidthUpdate_).count();
  557. if (timeDiff > 0) {
  558. uint64_t bytesDiff = connectionStats_.bytesSent - lastBytesSent_;
  559. double bandwidth = (bytesDiff * 8.0) / timeDiff; // bps
  560. // 更新带宽采样
  561. bandwidthSamples_[bandwidthSampleIndex_] = bandwidth;
  562. bandwidthSampleIndex_ = (bandwidthSampleIndex_ + 1) % 10;
  563. connectionStats_.currentBandwidth = bandwidth;
  564. lastBytesSent_ = connectionStats_.bytesSent;
  565. }
  566. lastBandwidthUpdate_ = now;
  567. }
  568. void StreamMuxer::setConnectionState(ConnectionState state) {
  569. ConnectionState oldState = connectionState_.exchange(state);
  570. if (oldState != state) {
  571. onConnectionStateChanged(state);
  572. }
  573. }
  574. void StreamMuxer::onConnectionStateChanged(ConnectionState state) {
  575. Logger::instance().infof("Connection state changed to: {}", static_cast<int>(state));
  576. if (connectionCallback_) {
  577. connectionCallback_(state);
  578. }
  579. // 根据状态变化触发相应操作
  580. if (state == ConnectionState::FAILED && streamParams_.enableAutoReconnect) {
  581. reconnectRequested_ = true;
  582. reconnectCondition_.notify_one();
  583. }
  584. }
  585. bool StreamMuxer::validateParams(const MuxerParams& params) {
  586. if (!AbstractMuxer::validateParams(params)) {
  587. return false;
  588. }
  589. const StreamMuxerParams& streamParams = static_cast<const StreamMuxerParams&>(params);
  590. // 验证URL
  591. if (!validateUrl(streamParams.url)) {
  592. Logger::instance().errorf("Invalid URL: {}", streamParams.url);
  593. return false;
  594. }
  595. // 验证超时参数
  596. if (streamParams.connectTimeout <= 0 || streamParams.sendTimeout <= 0) {
  597. Logger::instance().error("Invalid timeout parameters");
  598. return false;
  599. }
  600. // 验证重连参数
  601. if (streamParams.maxReconnectAttempts < 0 || streamParams.reconnectDelay < 0) {
  602. Logger::instance().error("Invalid reconnect parameters");
  603. return false;
  604. }
  605. // 验证码率参数
  606. if (streamParams.minBitrate <= 0 || streamParams.maxBitrate <= streamParams.minBitrate) {
  607. Logger::instance().error("Invalid bitrate parameters");
  608. return false;
  609. }
  610. return true;
  611. }
  612. bool StreamMuxer::validateUrl(const std::string& url) const {
  613. if (url.empty()) {
  614. return false;
  615. }
  616. // 简单的URL格式验证
  617. std::regex urlPattern(R"(^(rtmp|rtmps|udp|tcp|http|https|srt|rist)://.*)");
  618. return std::regex_match(url, urlPattern);
  619. }
  620. // 工厂类实现
  621. std::unique_ptr<AbstractMuxer> StreamMuxer::StreamMuxerFactory::createMuxer(MuxerType type) {
  622. if (type == MuxerType::STREAM_MUXER) {
  623. return std::make_unique<StreamMuxer>();
  624. }
  625. return nullptr;
  626. }
  627. bool StreamMuxer::StreamMuxerFactory::isTypeSupported(MuxerType type) const {
  628. return type == MuxerType::STREAM_MUXER;
  629. }
  630. std::vector<MuxerType> StreamMuxer::StreamMuxerFactory::getSupportedTypes() const {
  631. return {MuxerType::STREAM_MUXER};
  632. }
  633. std::unique_ptr<StreamMuxer> StreamMuxer::StreamMuxerFactory::createRTMPMuxer(const std::string& url) {
  634. auto muxer = std::make_unique<StreamMuxer>();
  635. StreamMuxerParams params;
  636. params.protocol = StreamProtocol::RTMP;
  637. params.url = url;
  638. params.rtmpLive = true;
  639. if (muxer->initialize(params) == ErrorCode::SUCCESS) {
  640. return muxer;
  641. }
  642. return nullptr;
  643. }
  644. std::unique_ptr<StreamMuxer> StreamMuxer::StreamMuxerFactory::createUDPMuxer(const std::string& address, int port) {
  645. auto muxer = std::make_unique<StreamMuxer>();
  646. StreamMuxerParams params;
  647. params.protocol = StreamProtocol::UDP;
  648. params.url = "udp://" + address + ":" + std::to_string(port);
  649. if (muxer->initialize(params) == ErrorCode::SUCCESS) {
  650. return muxer;
  651. }
  652. return nullptr;
  653. }
  654. std::unique_ptr<StreamMuxer> StreamMuxer::StreamMuxerFactory::createTCPMuxer(const std::string& address, int port) {
  655. auto muxer = std::make_unique<StreamMuxer>();
  656. StreamMuxerParams params;
  657. params.protocol = StreamProtocol::TCP;
  658. params.url = "tcp://" + address + ":" + std::to_string(port);
  659. if (muxer->initialize(params) == ErrorCode::SUCCESS) {
  660. return muxer;
  661. }
  662. return nullptr;
  663. }
  664. std::unique_ptr<StreamMuxer> StreamMuxer::StreamMuxerFactory::createSRTMuxer(const std::string& url) {
  665. auto muxer = std::make_unique<StreamMuxer>();
  666. StreamMuxerParams params;
  667. params.protocol = StreamProtocol::SRT;
  668. params.url = url;
  669. if (muxer->initialize(params) == ErrorCode::SUCCESS) {
  670. return muxer;
  671. }
  672. return nullptr;
  673. }
  674. } // namespace muxer
  675. } // namespace av