#include "muxer_stream_muxer.h" #include "../base/logger.h" #include #include #include extern "C" { #include #include #include } namespace av { namespace muxer { StreamMuxer::StreamMuxer() { Logger::instance().info("StreamMuxer created"); } StreamMuxer::~StreamMuxer() { close(); Logger::instance().info("StreamMuxer destroyed"); } ErrorCode StreamMuxer::initialize(const MuxerParams& params) { if (!validateParams(params)) { Logger::instance().error("Invalid stream muxer parameters"); return ErrorCode::INVALID_PARAMS; } streamParams_ = static_cast(params); // 重置统计信息 connectionStats_ = ConnectionStats(); // 设置初始状态 setConnectionState(ConnectionState::DISCONNECTED); Logger::instance().infof("StreamMuxer initialized with URL: {}", streamParams_.url); return AbstractMuxer::initialize(params); } ErrorCode StreamMuxer::start() { if (state_ != MuxerState::INITIALIZED) { Logger::instance().error("StreamMuxer not initialized"); return ErrorCode::INVALID_STATE; } // 建立连接 ErrorCode result = connect(); if (result != ErrorCode::SUCCESS) { Logger::instance().error("Failed to connect to stream"); return result; } // 启动重连线程 if (streamParams_.enableAutoReconnect) { shouldStopReconnect_ = false; reconnectThread_ = std::thread(&StreamMuxer::reconnectThreadFunc, this); } // 启动自适应码率监控 if (streamParams_.enableAdaptiveBitrate) { shouldStopBitrateMonitor_ = false; bitrateMonitorThread_ = std::thread(&StreamMuxer::monitorNetworkCondition, this); } return AbstractMuxer::start(); } ErrorCode StreamMuxer::stop() { if (state_ != MuxerState::STARTED) { return ErrorCode::SUCCESS; } // 停止监控线程 shouldStopBitrateMonitor_ = true; if (bitrateMonitorThread_.joinable()) { bitrateMonitorThread_.join(); } // 停止重连线程 shouldStopReconnect_ = true; reconnectCondition_.notify_all(); if (reconnectThread_.joinable()) { reconnectThread_.join(); } // 断开连接 disconnect(); return AbstractMuxer::stop(); } ErrorCode StreamMuxer::close() { stop(); // 清理资源 connectionStats_ = ConnectionStats(); setConnectionState(ConnectionState::DISCONNECTED); return AbstractMuxer::close(); } ErrorCode StreamMuxer::writePacket(AVPacket* packet) { if (!packet) { return ErrorCode::INVALID_PARAMS; } if (!isConnected()) { Logger::instance().warning("Not connected, dropping packet"); return ErrorCode::INVALID_STATE; } return processPacket(packet); } ErrorCode StreamMuxer::writeFrame(AVFrame* frame, int streamIndex) { if (!frame || streamIndex < 0) { return ErrorCode::INVALID_PARAMS; } if (!isConnected()) { Logger::instance().warning("Not connected, dropping frame"); return ErrorCode::INVALID_STATE; } // 将帧编码为包 AVPacket* packet = av_packet_alloc(); if (!packet) { return ErrorCode::MEMORY_ALLOC_FAILED; } // 这里需要编码器支持,暂时返回成功 av_packet_free(&packet); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::flush() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } int ret = av_write_trailer(formatCtx_); if (ret < 0) { Logger::instance().errorf("Failed to write trailer: {}", ffmpeg_utils::errorToString(ret)); return ErrorCode::OPERATION_FAILED; } return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::addStream(const StreamInfo& streamInfo) { ErrorCode result = AbstractMuxer::addStream(streamInfo); if (result != ErrorCode::SUCCESS) { return result; } Logger::instance().infof("Added stream: type={}, codec={}", static_cast(streamInfo.type), streamInfo.codecName); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::connect() { if (connectionState_ == ConnectionState::CONNECTED) { return ErrorCode::SUCCESS; } setConnectionState(ConnectionState::CONNECTING); ErrorCode result = establishConnection(); if (result == ErrorCode::SUCCESS) { setConnectionState(ConnectionState::CONNECTED); connectionStats_.connectTime = std::chrono::steady_clock::now(); connectionStats_.lastDataTime = connectionStats_.connectTime; Logger::instance().infof("Connected to stream: {}", streamParams_.url); } else { setConnectionState(ConnectionState::FAILED); Logger::instance().errorf("Failed to connect to stream: {}", streamParams_.url); } return result; } ErrorCode StreamMuxer::disconnect() { if (connectionState_ == ConnectionState::DISCONNECTED) { return ErrorCode::SUCCESS; } ErrorCode result = closeConnection(); setConnectionState(ConnectionState::DISCONNECTED); Logger::instance().info("Disconnected from stream"); return result; } ErrorCode StreamMuxer::reconnect() { Logger::instance().info("Attempting to reconnect..."); setConnectionState(ConnectionState::RECONNECTING); // 先断开现有连接 closeConnection(); // 等待一段时间 std::this_thread::sleep_for(std::chrono::milliseconds(streamParams_.reconnectDelay)); // 重新连接 ErrorCode result = establishConnection(); if (result == ErrorCode::SUCCESS) { setConnectionState(ConnectionState::CONNECTED); connectionStats_.reconnectCount++; connectionStats_.lastReconnectTime = std::chrono::steady_clock::now(); Logger::instance().info("Reconnected successfully"); } else { setConnectionState(ConnectionState::FAILED); Logger::instance().error("Reconnection failed"); } return result; } ConnectionState StreamMuxer::getConnectionState() const { return connectionState_.load(); } ConnectionStats StreamMuxer::getConnectionStats() const { std::lock_guard lock(connectionMutex_); return connectionStats_; } bool StreamMuxer::isConnected() const { return connectionState_ == ConnectionState::CONNECTED; } ErrorCode StreamMuxer::setUrl(const std::string& url) { if (state_ != MuxerState::IDLE) { Logger::instance().error("Cannot change URL while muxer is active"); return ErrorCode::INVALID_STATE; } if (!validateUrl(url)) { Logger::instance().errorf("Invalid URL: {}", url); return ErrorCode::INVALID_PARAMS; } streamParams_.url = url; Logger::instance().infof("URL set to: {}", url); return ErrorCode::SUCCESS; } std::string StreamMuxer::getUrl() const { return streamParams_.url; } ErrorCode StreamMuxer::enableAdaptiveBitrate(bool enable) { adaptiveBitrateEnabled_ = enable; if (enable && state_ == MuxerState::STARTED && !bitrateMonitorThread_.joinable()) { shouldStopBitrateMonitor_ = false; bitrateMonitorThread_ = std::thread(&StreamMuxer::monitorNetworkCondition, this); } Logger::instance().infof("Adaptive bitrate {}", enable ? "enabled" : "disabled"); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::setBitrateRange(int minBitrate, int maxBitrate) { if (minBitrate <= 0 || maxBitrate <= minBitrate) { return ErrorCode::INVALID_PARAMS; } streamParams_.minBitrate = minBitrate; streamParams_.maxBitrate = maxBitrate; Logger::instance().infof("Bitrate range set: {} - {} bps", minBitrate, maxBitrate); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::adjustBitrate(double factor) { if (factor <= 0.0 || factor > 2.0) { return ErrorCode::INVALID_PARAMS; } int newBitrate = static_cast(currentBitrate_ * factor); newBitrate = std::clamp(newBitrate, streamParams_.minBitrate, streamParams_.maxBitrate); if (newBitrate != currentBitrate_) { currentBitrate_ = newBitrate; if (bitrateCallback_) { bitrateCallback_(newBitrate); } Logger::instance().infof("Bitrate adjusted to: {} bps", newBitrate); } return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::setProtocolOption(const std::string& key, const std::string& value) { streamParams_.protocolOptions[key] = value; Logger::instance().debugf("Protocol option set: {}={}", key, value); return ErrorCode::SUCCESS; } std::string StreamMuxer::getProtocolOption(const std::string& key) const { auto it = streamParams_.protocolOptions.find(key); return (it != streamParams_.protocolOptions.end()) ? it->second : ""; } // 内部实现方法 ErrorCode StreamMuxer::setupOutput() { // 根据协议设置输出格式 const char* formatName = nullptr; switch (streamParams_.protocol) { case StreamProtocol::RTMP: case StreamProtocol::RTMPS: formatName = "flv"; break; case StreamProtocol::UDP: case StreamProtocol::TCP: formatName = "mpegts"; break; case StreamProtocol::HTTP: case StreamProtocol::HTTPS: formatName = "hls"; break; case StreamProtocol::SRT: case StreamProtocol::RIST: formatName = "mpegts"; break; default: formatName = "flv"; break; } // 分配格式上下文 int ret = avformat_alloc_output_context2(&formatCtx_, NULL, formatName, streamParams_.url.c_str()); if (ret < 0) { Logger::instance().errorf("Failed to allocate output context: {}", ffmpeg_utils::errorToString(ret)); return ErrorCode::INITIALIZATION_FAILED; } // 设置协议特定选项 switch (streamParams_.protocol) { case StreamProtocol::RTMP: case StreamProtocol::RTMPS: return setupRTMP(); case StreamProtocol::UDP: return setupUDP(); case StreamProtocol::TCP: return setupTCP(); case StreamProtocol::SRT: return setupSRT(); default: break; } return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::writeHeader() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } // 打开输出 int ret = avio_open(&formatCtx_->pb, streamParams_.url.c_str(), AVIO_FLAG_WRITE); if (ret < 0) { Logger::instance().errorf("Failed to open output: {}", ffmpeg_utils::errorToString(ret)); return ErrorCode::OPERATION_FAILED; } // 写入头部 ret = avformat_write_header(formatCtx_, NULL); if (ret < 0) { Logger::instance().errorf("Failed to write header: {}", ffmpeg_utils::errorToString(ret)); return ErrorCode::OPERATION_FAILED; } return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::writeTrailer() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } int ret = av_write_trailer(formatCtx_); if (ret < 0) { Logger::instance().errorf("Failed to write trailer: {}", ffmpeg_utils::errorToString(ret)); return ErrorCode::OPERATION_FAILED; } return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::establishConnection() { // 设置输出 ErrorCode result = setupOutput(); if (result != ErrorCode::SUCCESS) { return result; } // 写入头部 result = writeHeader(); if (result != ErrorCode::SUCCESS) { return result; } return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::closeConnection() { if (formatCtx_ && formatCtx_->pb) { // 写入尾部 writeTrailer(); // 关闭输出 avio_closep(&formatCtx_->pb); } return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::setupRTMP() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } // 设置RTMP选项 AVDictionary* options = NULL; // 连接超时 av_dict_set_int(&options, "timeout", streamParams_.connectTimeout * 1000, 0); // 直播模式 if (streamParams_.rtmpLive) { av_dict_set(&options, "live", "1", 0); } // 缓冲区大小 av_dict_set_int(&options, "buffer_size", streamParams_.sendBufferSize, 0); // 应用协议选项 for (const auto& option : streamParams_.protocolOptions) { av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0); } formatCtx_->metadata = options; Logger::instance().debug("RTMP setup completed"); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::setupUDP() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } // 设置UDP选项 AVDictionary* options = NULL; // 缓冲区大小 av_dict_set_int(&options, "buffer_size", streamParams_.sendBufferSize, 0); // 重用地址 if (streamParams_.reuseAddress) { av_dict_set(&options, "reuse", "1", 0); } // 本地地址和端口 if (!streamParams_.localAddress.empty()) { av_dict_set(&options, "localaddr", streamParams_.localAddress.c_str(), 0); } if (streamParams_.localPort > 0) { av_dict_set_int(&options, "localport", streamParams_.localPort, 0); } // 应用协议选项 for (const auto& option : streamParams_.protocolOptions) { av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0); } formatCtx_->metadata = options; Logger::instance().debug("UDP setup completed"); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::setupTCP() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } // 设置TCP选项 AVDictionary* options = NULL; // 连接超时 av_dict_set_int(&options, "timeout", streamParams_.connectTimeout * 1000, 0); // 缓冲区大小 av_dict_set_int(&options, "buffer_size", streamParams_.sendBufferSize, 0); // 应用协议选项 for (const auto& option : streamParams_.protocolOptions) { av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0); } formatCtx_->metadata = options; Logger::instance().debug("TCP setup completed"); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::setupSRT() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } // 设置SRT选项 AVDictionary* options = NULL; // 延迟 av_dict_set_int(&options, "latency", streamParams_.srtLatency * 1000, 0); // 密码 if (!streamParams_.srtPassphrase.empty()) { av_dict_set(&options, "passphrase", streamParams_.srtPassphrase.c_str(), 0); } // 应用协议选项 for (const auto& option : streamParams_.protocolOptions) { av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0); } formatCtx_->metadata = options; Logger::instance().debug("SRT setup completed"); return ErrorCode::SUCCESS; } ErrorCode StreamMuxer::processPacket(AVPacket* packet) { if (!packet || !formatCtx_) { return ErrorCode::INVALID_PARAMS; } // 发送包 ErrorCode result = sendPacket(packet); if (result == ErrorCode::SUCCESS) { // 更新统计信息 updateConnectionStats(); updateBandwidthStats(packet->size); // 更新包统计 stats_.totalPackets++; stats_.totalBytes += packet->size; connectionStats_.packetsSent++; connectionStats_.bytesSent += packet->size; connectionStats_.lastDataTime = std::chrono::steady_clock::now(); } else { stats_.errorCount++; Logger::instance().warning("Failed to send packet"); } return result; } ErrorCode StreamMuxer::sendPacket(AVPacket* packet) { if (!packet || !formatCtx_) { return ErrorCode::INVALID_PARAMS; } int ret = av_interleaved_write_frame(formatCtx_, packet); if (ret < 0) { Logger::instance().errorf("Failed to write packet: {}", ffmpeg_utils::errorToString(ret)); return ErrorCode::OPERATION_FAILED; } return ErrorCode::SUCCESS; } void StreamMuxer::monitorNetworkCondition() { Logger::instance().debug("Network condition monitoring started"); while (!shouldStopBitrateMonitor_) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (adaptiveBitrateEnabled_ && isConnected()) { adjustBitrateBasedOnCondition(); } } Logger::instance().debug("Network condition monitoring stopped"); } void StreamMuxer::adjustBitrateBasedOnCondition() { // 获取当前网络状态 double currentBandwidth = connectionStats_.currentBandwidth; double rtt = connectionStats_.rtt; // 简单的自适应算法 double targetUtilization = 0.8; // 目标带宽利用率 double currentUtilization = (currentBitrate_ > 0) ? (currentBandwidth / currentBitrate_) : 0.0; if (currentUtilization < targetUtilization * 0.7) { // 网络状况良好,可以提高码率 adjustBitrate(1.1); } else if (currentUtilization > targetUtilization * 1.2 || rtt > 200.0) { // 网络拥塞,降低码率 adjustBitrate(streamParams_.bitrateAdjustFactor); } } void StreamMuxer::reconnectThreadFunc() { Logger::instance().debug("Reconnect thread started"); int attemptCount = 0; while (!shouldStopReconnect_) { std::unique_lock lock(reconnectMutex_); // 等待重连请求或停止信号 reconnectCondition_.wait(lock, [this] { return shouldStopReconnect_ || reconnectRequested_ || shouldReconnect(); }); if (shouldStopReconnect_) { break; } if (shouldReconnect() && attemptCount < streamParams_.maxReconnectAttempts) { attemptCount++; Logger::instance().infof("Reconnect attempt {}/{}", attemptCount, streamParams_.maxReconnectAttempts); lock.unlock(); // 计算重连延迟 int delay = calculateReconnectDelay(attemptCount); std::this_thread::sleep_for(std::chrono::milliseconds(delay)); // 尝试重连 if (reconnect() == ErrorCode::SUCCESS) { attemptCount = 0; // 重连成功,重置计数 } lock.lock(); } reconnectRequested_ = false; } Logger::instance().debug("Reconnect thread stopped"); } bool StreamMuxer::shouldReconnect() const { ConnectionState state = connectionState_.load(); return (state == ConnectionState::FAILED || state == ConnectionState::DISCONNECTED) && streamParams_.enableAutoReconnect; } int StreamMuxer::calculateReconnectDelay(int attempt) const { double delay = streamParams_.reconnectDelay; for (int i = 1; i < attempt; ++i) { delay *= streamParams_.reconnectBackoffFactor; } return static_cast(std::min(delay, 30000.0)); // 最大30秒 } void StreamMuxer::updateConnectionStats() { auto now = std::chrono::steady_clock::now(); // 更新统计信息的时间间隔检查 if (std::chrono::duration(now - lastStatsUpdate_).count() < STATS_UPDATE_INTERVAL) { return; } std::lock_guard lock(connectionMutex_); // 计算平均带宽 double totalBandwidth = 0.0; int validSamples = 0; for (int i = 0; i < 10; ++i) { if (bandwidthSamples_[i] > 0) { totalBandwidth += bandwidthSamples_[i]; validSamples++; } } if (validSamples > 0) { connectionStats_.averageBandwidth = totalBandwidth / validSamples; } lastStatsUpdate_ = now; } void StreamMuxer::updateBandwidthStats(size_t bytes) { auto now = std::chrono::steady_clock::now(); // 更新带宽统计的时间间隔检查 if (std::chrono::duration(now - lastBandwidthUpdate_).count() < BANDWIDTH_UPDATE_INTERVAL) { return; } std::lock_guard lock(connectionMutex_); // 计算当前带宽 double timeDiff = std::chrono::duration(now - lastBandwidthUpdate_).count(); if (timeDiff > 0) { uint64_t bytesDiff = connectionStats_.bytesSent - lastBytesSent_; double bandwidth = (bytesDiff * 8.0) / timeDiff; // bps // 更新带宽采样 bandwidthSamples_[bandwidthSampleIndex_] = bandwidth; bandwidthSampleIndex_ = (bandwidthSampleIndex_ + 1) % 10; connectionStats_.currentBandwidth = bandwidth; lastBytesSent_ = connectionStats_.bytesSent; } lastBandwidthUpdate_ = now; } void StreamMuxer::setConnectionState(ConnectionState state) { ConnectionState oldState = connectionState_.exchange(state); if (oldState != state) { onConnectionStateChanged(state); } } void StreamMuxer::onConnectionStateChanged(ConnectionState state) { Logger::instance().infof("Connection state changed to: {}", static_cast(state)); if (connectionCallback_) { connectionCallback_(state); } // 根据状态变化触发相应操作 if (state == ConnectionState::FAILED && streamParams_.enableAutoReconnect) { reconnectRequested_ = true; reconnectCondition_.notify_one(); } } bool StreamMuxer::validateParams(const MuxerParams& params) { if (!AbstractMuxer::validateParams(params)) { return false; } const StreamMuxerParams& streamParams = static_cast(params); // 验证URL if (!validateUrl(streamParams.url)) { Logger::instance().errorf("Invalid URL: {}", streamParams.url); return false; } // 验证超时参数 if (streamParams.connectTimeout <= 0 || streamParams.sendTimeout <= 0) { Logger::instance().error("Invalid timeout parameters"); return false; } // 验证重连参数 if (streamParams.maxReconnectAttempts < 0 || streamParams.reconnectDelay < 0) { Logger::instance().error("Invalid reconnect parameters"); return false; } // 验证码率参数 if (streamParams.minBitrate <= 0 || streamParams.maxBitrate <= streamParams.minBitrate) { Logger::instance().error("Invalid bitrate parameters"); return false; } return true; } bool StreamMuxer::validateUrl(const std::string& url) const { if (url.empty()) { return false; } // 简单的URL格式验证 std::regex urlPattern(R"(^(rtmp|rtmps|udp|tcp|http|https|srt|rist)://.*)"); return std::regex_match(url, urlPattern); } // 工厂类实现 std::unique_ptr StreamMuxer::StreamMuxerFactory::createMuxer(MuxerType type) { if (type == MuxerType::STREAM_MUXER) { return std::make_unique(); } return nullptr; } bool StreamMuxer::StreamMuxerFactory::isTypeSupported(MuxerType type) const { return type == MuxerType::STREAM_MUXER; } std::vector StreamMuxer::StreamMuxerFactory::getSupportedTypes() const { return {MuxerType::STREAM_MUXER}; } std::unique_ptr StreamMuxer::StreamMuxerFactory::createRTMPMuxer(const std::string& url) { auto muxer = std::make_unique(); StreamMuxerParams params; params.protocol = StreamProtocol::RTMP; params.url = url; params.rtmpLive = true; if (muxer->initialize(params) == ErrorCode::SUCCESS) { return muxer; } return nullptr; } std::unique_ptr StreamMuxer::StreamMuxerFactory::createUDPMuxer(const std::string& address, int port) { auto muxer = std::make_unique(); StreamMuxerParams params; params.protocol = StreamProtocol::UDP; params.url = "udp://" + address + ":" + std::to_string(port); if (muxer->initialize(params) == ErrorCode::SUCCESS) { return muxer; } return nullptr; } std::unique_ptr StreamMuxer::StreamMuxerFactory::createTCPMuxer(const std::string& address, int port) { auto muxer = std::make_unique(); StreamMuxerParams params; params.protocol = StreamProtocol::TCP; params.url = "tcp://" + address + ":" + std::to_string(port); if (muxer->initialize(params) == ErrorCode::SUCCESS) { return muxer; } return nullptr; } std::unique_ptr StreamMuxer::StreamMuxerFactory::createSRTMuxer(const std::string& url) { auto muxer = std::make_unique(); StreamMuxerParams params; params.protocol = StreamProtocol::SRT; params.url = url; if (muxer->initialize(params) == ErrorCode::SUCCESS) { return muxer; } return nullptr; } } // namespace muxer } // namespace av