| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- #ifndef AV_MUXER_STREAM_MUXER_H
- #define AV_MUXER_STREAM_MUXER_H
- #include "muxer_abstract_muxer.h"
- #include <string>
- #include <memory>
- #include <thread>
- #include <queue>
- #include <condition_variable>
- #include <atomic>
- #include <chrono>
- extern "C" {
- #include <libavformat/avformat.h>
- #include <libavcodec/avcodec.h>
- #include <libavutil/avutil.h>
- }
- namespace av {
- namespace muxer {
- // 流协议类型
- enum class StreamProtocol {
- RTMP = 0, // RTMP协议
- RTMPS, // RTMPS协议
- UDP, // UDP协议
- TCP, // TCP协议
- HTTP, // HTTP协议
- HTTPS, // HTTPS协议
- SRT, // SRT协议
- RIST // RIST协议
- };
- // 连接状态
- enum class ConnectionState {
- DISCONNECTED = 0, // 未连接
- CONNECTING, // 连接中
- CONNECTED, // 已连接
- RECONNECTING, // 重连中
- FAILED // 连接失败
- };
- // 流复用器参数
- struct StreamMuxerParams : public MuxerParams {
- StreamProtocol protocol = StreamProtocol::RTMP;
- std::string url; // 推流URL
-
- // 连接参数
- int connectTimeout = 5000; // 连接超时(毫秒)
- int sendTimeout = 5000; // 发送超时(毫秒)
- int receiveTimeout = 5000; // 接收超时(毫秒)
-
- // 重连参数
- bool enableAutoReconnect = true; // 启用自动重连
- int maxReconnectAttempts = 5; // 最大重连次数
- int reconnectDelay = 2000; // 重连延迟(毫秒)
- double reconnectBackoffFactor = 1.5; // 重连退避因子
-
- // 缓冲参数
- int sendBufferSize = 1024 * 1024; // 发送缓冲区大小
- int receiveBufferSize = 1024 * 1024; // 接收缓冲区大小
-
- // 质量控制
- bool enableAdaptiveBitrate = false; // 启用自适应码率
- int minBitrate = 500000; // 最小码率(bps)
- int maxBitrate = 5000000; // 最大码率(bps)
- double bitrateAdjustFactor = 0.8; // 码率调整因子
-
- // 协议特定参数
- std::map<std::string, std::string> protocolOptions; // 协议选项
-
- // RTMP特定参数
- std::string rtmpApp; // RTMP应用名
- std::string rtmpStreamKey; // RTMP流密钥
- bool rtmpLive = true; // RTMP直播模式
-
- // UDP/TCP特定参数
- std::string localAddress; // 本地地址
- int localPort = 0; // 本地端口
- bool reuseAddress = true; // 重用地址
-
- // SRT特定参数
- int srtLatency = 120; // SRT延迟(毫秒)
- std::string srtPassphrase; // SRT密码
-
- StreamMuxerParams() : MuxerParams(MuxerType::STREAM_MUXER) {}
- };
- // 连接统计信息
- struct ConnectionStats {
- ConnectionState state = ConnectionState::DISCONNECTED;
- std::chrono::steady_clock::time_point connectTime;
- std::chrono::steady_clock::time_point lastDataTime;
-
- // 网络统计
- uint64_t bytesSent = 0; // 已发送字节数
- uint64_t bytesReceived = 0; // 已接收字节数
- uint64_t packetsSent = 0; // 已发送包数
- uint64_t packetsLost = 0; // 丢失包数
-
- // 性能统计
- double currentBandwidth = 0.0; // 当前带宽(bps)
- double averageBandwidth = 0.0; // 平均带宽(bps)
- double rtt = 0.0; // 往返时间(毫秒)
- double jitter = 0.0; // 抖动(毫秒)
-
- // 重连统计
- int reconnectCount = 0; // 重连次数
- std::chrono::steady_clock::time_point lastReconnectTime;
- };
- // 流复用器类
- class StreamMuxer : public AbstractMuxer {
- public:
- StreamMuxer();
- ~StreamMuxer() override;
-
- // 基础接口实现
- ErrorCode initialize(const MuxerParams& params) override;
- ErrorCode start() override;
- ErrorCode stop() override;
- ErrorCode close() override;
-
- // 写入接口实现
- ErrorCode writePacket(AVPacket* packet) override;
- ErrorCode writeFrame(AVFrame* frame, int streamIndex) override;
- ErrorCode flush() override;
-
- // 流管理实现
- ErrorCode addStream(const StreamInfo& streamInfo) override;
-
- // 连接管理
- ErrorCode connect();
- ErrorCode disconnect();
- ErrorCode reconnect();
-
- // 状态查询
- ConnectionState getConnectionState() const;
- ConnectionStats getConnectionStats() const;
- bool isConnected() const;
-
- // 流特定接口
- ErrorCode setUrl(const std::string& url);
- std::string getUrl() const;
-
- // 自适应码率控制
- ErrorCode enableAdaptiveBitrate(bool enable);
- ErrorCode setBitrateRange(int minBitrate, int maxBitrate);
- ErrorCode adjustBitrate(double factor);
-
- // 协议选项
- ErrorCode setProtocolOption(const std::string& key, const std::string& value);
- std::string getProtocolOption(const std::string& key) const;
-
- // 回调设置
- using ConnectionCallback = std::function<void(ConnectionState)>;
- using BitrateCallback = std::function<void(int)>; // 新码率
-
- void setConnectionCallback(ConnectionCallback callback) { connectionCallback_ = callback; }
- void setBitrateCallback(BitrateCallback callback) { bitrateCallback_ = callback; }
-
- // 工厂类
- class StreamMuxerFactory : public MuxerFactory {
- public:
- std::unique_ptr<AbstractMuxer> createMuxer(MuxerType type) override;
- bool isTypeSupported(MuxerType type) const override;
- std::vector<MuxerType> getSupportedTypes() const override;
-
- // 流复用器特定方法
- static std::unique_ptr<StreamMuxer> createRTMPMuxer(const std::string& url);
- static std::unique_ptr<StreamMuxer> createUDPMuxer(const std::string& address, int port);
- static std::unique_ptr<StreamMuxer> createTCPMuxer(const std::string& address, int port);
- static std::unique_ptr<StreamMuxer> createSRTMuxer(const std::string& url);
- };
-
- protected:
- // 内部实现方法
- ErrorCode setupOutput() override;
- ErrorCode writeHeader() override;
- ErrorCode writeTrailer() override;
-
- // 连接管理
- ErrorCode establishConnection();
- ErrorCode closeConnection();
-
- // 协议特定设置
- ErrorCode setupRTMP();
- ErrorCode setupUDP();
- ErrorCode setupTCP();
- ErrorCode setupSRT();
-
- // 包处理
- ErrorCode processPacket(AVPacket* packet);
- ErrorCode sendPacket(AVPacket* packet);
-
- // 自适应码率
- void monitorNetworkCondition();
- void adjustBitrateBasedOnCondition();
-
- // 重连逻辑
- void reconnectThreadFunc();
- bool shouldReconnect() const;
- int calculateReconnectDelay(int attempt) const;
-
- // 统计更新
- void updateConnectionStats();
- void updateBandwidthStats(size_t bytes);
-
- // 状态管理
- void setConnectionState(ConnectionState state);
- void onConnectionStateChanged(ConnectionState state);
-
- // 参数验证
- bool validateParams(const MuxerParams& params) override;
- bool validateUrl(const std::string& url) const;
-
- private:
- StreamMuxerParams streamParams_;
-
- // 连接状态
- std::atomic<ConnectionState> connectionState_{ConnectionState::DISCONNECTED};
- ConnectionStats connectionStats_;
- mutable std::mutex connectionMutex_;
-
- // 重连控制
- std::thread reconnectThread_;
- std::atomic<bool> shouldStopReconnect_{false};
- std::atomic<bool> reconnectRequested_{false};
- std::condition_variable reconnectCondition_;
- std::mutex reconnectMutex_;
-
- // 自适应码率
- std::atomic<bool> adaptiveBitrateEnabled_{false};
- std::atomic<int> currentBitrate_{0};
- std::thread bitrateMonitorThread_;
- std::atomic<bool> shouldStopBitrateMonitor_{false};
-
- // 网络监控
- std::chrono::steady_clock::time_point lastBandwidthUpdate_;
- uint64_t lastBytesSent_ = 0;
- double bandwidthSamples_[10] = {0}; // 带宽采样窗口
- int bandwidthSampleIndex_ = 0;
-
- // 回调函数
- ConnectionCallback connectionCallback_;
- BitrateCallback bitrateCallback_;
-
- // 性能监控
- std::chrono::steady_clock::time_point lastStatsUpdate_;
- static constexpr double STATS_UPDATE_INTERVAL = 1.0; // 统计更新间隔(秒)
- static constexpr double BANDWIDTH_UPDATE_INTERVAL = 0.5; // 带宽更新间隔(秒)
- };
- } // namespace muxer
- } // namespace av
- #endif // AV_MUXER_STREAM_MUXER_H
|