#ifndef AV_MUXER_STREAM_MUXER_H #define AV_MUXER_STREAM_MUXER_H #include "muxer_abstract_muxer.h" #include #include #include #include #include #include #include extern "C" { #include #include #include } namespace av { namespace muxer { // 流协议类型 enum class StreamProtocol { RTMP = 0, // RTMP协议 RTMPS, // RTMPS协议 UDP, // UDP协议 TCP, // TCP协议 HTTP, // HTTP协议 HTTPS, // HTTPS协议 SRT, // SRT协议 RIST // RIST协议 }; // 连接状态 enum class ConnectionState { DISCONNECTED = 0, // 未连接 CONNECTING, // 连接中 CONNECTED, // 已连接 RECONNECTING, // 重连中 FAILED // 连接失败 }; // 流复用器参数 struct StreamMuxerParams : public MuxerParams { StreamProtocol protocol = StreamProtocol::RTMP; std::string url; // 推流URL // 连接参数 int connectTimeout = 5000; // 连接超时(毫秒) int sendTimeout = 5000; // 发送超时(毫秒) int receiveTimeout = 5000; // 接收超时(毫秒) // 重连参数 bool enableAutoReconnect = true; // 启用自动重连 int maxReconnectAttempts = 5; // 最大重连次数 int reconnectDelay = 2000; // 重连延迟(毫秒) double reconnectBackoffFactor = 1.5; // 重连退避因子 // 缓冲参数 int sendBufferSize = 1024 * 1024; // 发送缓冲区大小 int receiveBufferSize = 1024 * 1024; // 接收缓冲区大小 // 质量控制 bool enableAdaptiveBitrate = false; // 启用自适应码率 int minBitrate = 500000; // 最小码率(bps) int maxBitrate = 5000000; // 最大码率(bps) double bitrateAdjustFactor = 0.8; // 码率调整因子 // 协议特定参数 std::map protocolOptions; // 协议选项 // RTMP特定参数 std::string rtmpApp; // RTMP应用名 std::string rtmpStreamKey; // RTMP流密钥 bool rtmpLive = true; // RTMP直播模式 // UDP/TCP特定参数 std::string localAddress; // 本地地址 int localPort = 0; // 本地端口 bool reuseAddress = true; // 重用地址 // SRT特定参数 int srtLatency = 120; // SRT延迟(毫秒) std::string srtPassphrase; // SRT密码 StreamMuxerParams() : MuxerParams(MuxerType::STREAM_MUXER) {} }; // 连接统计信息 struct ConnectionStats { ConnectionState state = ConnectionState::DISCONNECTED; std::chrono::steady_clock::time_point connectTime; std::chrono::steady_clock::time_point lastDataTime; // 网络统计 uint64_t bytesSent = 0; // 已发送字节数 uint64_t bytesReceived = 0; // 已接收字节数 uint64_t packetsSent = 0; // 已发送包数 uint64_t packetsLost = 0; // 丢失包数 // 性能统计 double currentBandwidth = 0.0; // 当前带宽(bps) double averageBandwidth = 0.0; // 平均带宽(bps) double rtt = 0.0; // 往返时间(毫秒) double jitter = 0.0; // 抖动(毫秒) // 重连统计 int reconnectCount = 0; // 重连次数 std::chrono::steady_clock::time_point lastReconnectTime; }; // 流复用器类 class StreamMuxer : public AbstractMuxer { public: StreamMuxer(); ~StreamMuxer() override; // 基础接口实现 ErrorCode initialize(const MuxerParams& params) override; ErrorCode start() override; ErrorCode stop() override; ErrorCode close() override; // 写入接口实现 ErrorCode writePacket(AVPacket* packet) override; ErrorCode writeFrame(AVFrame* frame, int streamIndex) override; ErrorCode flush() override; // 流管理实现 ErrorCode addStream(const StreamInfo& streamInfo) override; // 连接管理 ErrorCode connect(); ErrorCode disconnect(); ErrorCode reconnect(); // 状态查询 ConnectionState getConnectionState() const; ConnectionStats getConnectionStats() const; bool isConnected() const; // 流特定接口 ErrorCode setUrl(const std::string& url); std::string getUrl() const; // 自适应码率控制 ErrorCode enableAdaptiveBitrate(bool enable); ErrorCode setBitrateRange(int minBitrate, int maxBitrate); ErrorCode adjustBitrate(double factor); // 协议选项 ErrorCode setProtocolOption(const std::string& key, const std::string& value); std::string getProtocolOption(const std::string& key) const; // 回调设置 using ConnectionCallback = std::function; using BitrateCallback = std::function; // 新码率 void setConnectionCallback(ConnectionCallback callback) { connectionCallback_ = callback; } void setBitrateCallback(BitrateCallback callback) { bitrateCallback_ = callback; } // 工厂类 class StreamMuxerFactory : public MuxerFactory { public: std::unique_ptr createMuxer(MuxerType type) override; bool isTypeSupported(MuxerType type) const override; std::vector getSupportedTypes() const override; // 流复用器特定方法 static std::unique_ptr createRTMPMuxer(const std::string& url); static std::unique_ptr createUDPMuxer(const std::string& address, int port); static std::unique_ptr createTCPMuxer(const std::string& address, int port); static std::unique_ptr createSRTMuxer(const std::string& url); }; protected: // 内部实现方法 ErrorCode setupOutput() override; ErrorCode writeHeader() override; ErrorCode writeTrailer() override; // 连接管理 ErrorCode establishConnection(); ErrorCode closeConnection(); // 协议特定设置 ErrorCode setupRTMP(); ErrorCode setupUDP(); ErrorCode setupTCP(); ErrorCode setupSRT(); // 包处理 ErrorCode processPacket(AVPacket* packet); ErrorCode sendPacket(AVPacket* packet); // 自适应码率 void monitorNetworkCondition(); void adjustBitrateBasedOnCondition(); // 重连逻辑 void reconnectThreadFunc(); bool shouldReconnect() const; int calculateReconnectDelay(int attempt) const; // 统计更新 void updateConnectionStats(); void updateBandwidthStats(size_t bytes); // 状态管理 void setConnectionState(ConnectionState state); void onConnectionStateChanged(ConnectionState state); // 参数验证 bool validateParams(const MuxerParams& params) override; bool validateUrl(const std::string& url) const; private: StreamMuxerParams streamParams_; // 连接状态 std::atomic connectionState_{ConnectionState::DISCONNECTED}; ConnectionStats connectionStats_; mutable std::mutex connectionMutex_; // 重连控制 std::thread reconnectThread_; std::atomic shouldStopReconnect_{false}; std::atomic reconnectRequested_{false}; std::condition_variable reconnectCondition_; std::mutex reconnectMutex_; // 自适应码率 std::atomic adaptiveBitrateEnabled_{false}; std::atomic currentBitrate_{0}; std::thread bitrateMonitorThread_; std::atomic shouldStopBitrateMonitor_{false}; // 网络监控 std::chrono::steady_clock::time_point lastBandwidthUpdate_; uint64_t lastBytesSent_ = 0; double bandwidthSamples_[10] = {0}; // 带宽采样窗口 int bandwidthSampleIndex_ = 0; // 回调函数 ConnectionCallback connectionCallback_; BitrateCallback bitrateCallback_; // 性能监控 std::chrono::steady_clock::time_point lastStatsUpdate_; static constexpr double STATS_UPDATE_INTERVAL = 1.0; // 统计更新间隔(秒) static constexpr double BANDWIDTH_UPDATE_INTERVAL = 0.5; // 带宽更新间隔(秒) }; } // namespace muxer } // namespace av #endif // AV_MUXER_STREAM_MUXER_H