#include "muxer_abstract_muxer.h" #include "../base/logger.h" #include #include extern "C" { #include #include #include } namespace av { namespace muxer { AbstractMuxer::AbstractMuxer() { AV_LOGGER_DEBUG("创建抽象复用器"); stats_.startTime = std::chrono::steady_clock::now(); lastStatsUpdate_ = stats_.startTime; } AbstractMuxer::~AbstractMuxer() { close(); AV_LOGGER_DEBUG("抽象复用器已销毁"); } ErrorCode AbstractMuxer::pause() { if (getState() != MuxerState::STARTED) { AV_LOGGER_ERROR("复用器状态无效,无法暂停"); return ErrorCode::INVALID_STATE; } setState(MuxerState::PAUSED); AV_LOGGER_INFO("复用器已暂停"); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::resume() { if (getState() != MuxerState::PAUSED) { AV_LOGGER_ERROR("复用器状态无效,无法恢复"); return ErrorCode::INVALID_STATE; } setState(MuxerState::STARTED); AV_LOGGER_INFO("复用器已恢复"); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::reset() { ErrorCode result = stop(); if (result != ErrorCode::SUCCESS) { return result; } // 清理流信息 { std::lock_guard lock(streamsMutex_); streams_.clear(); streamMap_.clear(); } // 重置统计信息 resetStats(); setState(MuxerState::INITIALIZED); AV_LOGGER_INFO("复用器已重置"); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::removeStream(int streamIndex) { std::lock_guard lock(streamsMutex_); auto it = std::find_if(streams_.begin(), streams_.end(), [streamIndex](const StreamInfo& info) { return info.index == streamIndex; }); if (it == streams_.end()) { AV_LOGGER_ERRORF("流索引 {} 不存在", streamIndex); return ErrorCode::STREAM_NOT_FOUND; } streams_.erase(it); streamMap_.erase(streamIndex); AV_LOGGER_INFOF("已移除流: 索引={}", streamIndex); return ErrorCode::SUCCESS; } std::vector AbstractMuxer::getStreams() const { std::lock_guard lock(streamsMutex_); return streams_; } StreamInfo AbstractMuxer::getStreamInfo(int streamIndex) const { std::lock_guard lock(streamsMutex_); auto it = std::find_if(streams_.begin(), streams_.end(), [streamIndex](const StreamInfo& info) { return info.index == streamIndex; }); if (it != streams_.end()) { return *it; } return StreamInfo(); // 返回空的流信息 } MuxerStats AbstractMuxer::getStats() const { std::lock_guard lock(statsMutex_); MuxerStats stats = stats_; // 计算持续时间 auto now = std::chrono::steady_clock::now(); stats.duration = std::chrono::duration(now - stats_.startTime).count(); return stats; } void AbstractMuxer::resetStats() { std::lock_guard lock(statsMutex_); stats_ = MuxerStats(); stats_.startTime = std::chrono::steady_clock::now(); lastStatsUpdate_ = stats_.startTime; AV_LOGGER_DEBUG("复用器统计信息已重置"); } std::string AbstractMuxer::getLastError() const { return lastError_; } std::vector AbstractMuxer::getSupportedFormats() { std::vector formats; const AVOutputFormat* format = nullptr; void* opaque = nullptr; while ((format = av_muxer_iterate(&opaque))) { if (format->name) { formats.push_back(format->name); } } std::sort(formats.begin(), formats.end()); return formats; } std::vector AbstractMuxer::getSupportedCodecs(const std::string& format) { std::vector codecs; const AVOutputFormat* outputFormat = av_guess_format(format.c_str(), nullptr, nullptr); if (!outputFormat) { AV_LOGGER_ERRORF("不支持的格式: {}", format); return codecs; } // 获取支持的视频编解码器 if (outputFormat->video_codec != AV_CODEC_ID_NONE) { const AVCodec* codec = avcodec_find_encoder(outputFormat->video_codec); if (codec && codec->name) { codecs.push_back(std::string("video:") + codec->name); } } // 获取支持的音频编解码器 if (outputFormat->audio_codec != AV_CODEC_ID_NONE) { const AVCodec* codec = avcodec_find_encoder(outputFormat->audio_codec); if (codec && codec->name) { codecs.push_back(std::string("audio:") + codec->name); } } // 遍历所有编解码器,查找支持该格式的 const AVCodec* codec = nullptr; void* opaque = nullptr; while ((codec = av_codec_iterate(&opaque))) { if (av_codec_is_encoder(codec)) { // 检查编解码器是否支持该格式 if (avformat_query_codec(outputFormat, codec->id, FF_COMPLIANCE_NORMAL) == 1) { std::string codecName = codec->name; std::string prefix = (codec->type == AVMEDIA_TYPE_VIDEO) ? "video:" : "audio:"; std::string fullName = prefix + codecName; if (std::find(codecs.begin(), codecs.end(), fullName) == codecs.end()) { codecs.push_back(fullName); } } } } std::sort(codecs.begin(), codecs.end()); return codecs; } bool AbstractMuxer::isFormatSupported(const std::string& format) { const AVOutputFormat* outputFormat = av_guess_format(format.c_str(), nullptr, nullptr); return outputFormat != nullptr; } std::string AbstractMuxer::getFormatFromExtension(const std::string& filename) { const AVOutputFormat* format = av_guess_format(nullptr, filename.c_str(), nullptr); if (format && format->name) { return format->name; } return ""; } std::string AbstractMuxer::getDefaultExtension(const std::string& format) { const AVOutputFormat* outputFormat = av_guess_format(format.c_str(), nullptr, nullptr); if (outputFormat && outputFormat->extensions) { std::string extensions = outputFormat->extensions; size_t commaPos = extensions.find(','); if (commaPos != std::string::npos) { return extensions.substr(0, commaPos); } return extensions; } return ""; } void AbstractMuxer::setState(MuxerState state) { MuxerState oldState = state_.exchange(state); if (oldState != state) { AV_LOGGER_DEBUGF("复用器状态变更: {} -> {}", static_cast(oldState), static_cast(state)); } } void AbstractMuxer::onError(ErrorCode code, const std::string& message) { lastError_ = message; setState(MuxerState::ERROR_STATE); { std::lock_guard lock(statsMutex_); stats_.errorCount++; } AV_LOGGER_ERRORF("复用器错误: {} (代码: {})", message, static_cast(code)); if (errorCallback_) { errorCallback_(code, message); } } void AbstractMuxer::updateStats(AVPacket* packet) { if (!packet) { return; } std::lock_guard lock(statsMutex_); stats_.totalPackets++; stats_.totalBytes += packet->size; stats_.lastPacketTime = std::chrono::steady_clock::now(); // 更新流统计 stats_.streamPackets[packet->stream_index]++; stats_.streamBytes[packet->stream_index] += packet->size; // 定期更新码率 auto now = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration(now - lastStatsUpdate_).count(); if (elapsed >= STATS_UPDATE_INTERVAL) { updateBitrate(); lastStatsUpdate_ = now; // 触发统计回调 if (statsCallback_) { MuxerStats currentStats = stats_; currentStats.duration = std::chrono::duration(now - stats_.startTime).count(); statsCallback_(currentStats); } } } void AbstractMuxer::updateBitrate() { auto now = std::chrono::steady_clock::now(); double totalDuration = std::chrono::duration(now - stats_.startTime).count(); if (totalDuration > 0) { stats_.averageBitrate = (stats_.totalBytes * 8.0) / totalDuration; // 计算当前码率(基于最近的数据) double recentDuration = std::chrono::duration(now - lastStatsUpdate_).count(); if (recentDuration > 0) { static uint64_t lastTotalBytes = 0; uint64_t recentBytes = stats_.totalBytes - lastTotalBytes; stats_.currentBitrate = (recentBytes * 8.0) / recentDuration; lastTotalBytes = stats_.totalBytes; } } } bool AbstractMuxer::validateParams(const MuxerParams& params) { if (params.outputPath.empty()) { AV_LOGGER_ERROR("输出路径不能为空"); return false; } if (params.format.empty()) { AV_LOGGER_ERROR("输出格式不能为空"); return false; } if (!isFormatSupported(params.format)) { AV_LOGGER_ERRORF("不支持的输出格式: {}", params.format); return false; } if (params.streams.empty()) { AV_LOGGER_ERROR("至少需要一个流"); return false; } // 验证流参数 for (const auto& stream : params.streams) { if (stream.codecId == AV_CODEC_ID_NONE) { AV_LOGGER_ERRORF("流 {} 的编解码器ID无效", stream.index); return false; } if (stream.type == StreamType::VIDEO) { if (stream.width <= 0 || stream.height <= 0) { AV_LOGGER_ERRORF("视频流 {} 的分辨率无效: {}x{}", stream.index, stream.width, stream.height); return false; } if (stream.frameRate.num <= 0 || stream.frameRate.den <= 0) { AV_LOGGER_ERRORF("视频流 {} 的帧率无效: {}/{}", stream.index, stream.frameRate.num, stream.frameRate.den); return false; } } else if (stream.type == StreamType::AUDIO) { if (stream.sampleRate <= 0) { AV_LOGGER_ERRORF("音频流 {} 的采样率无效: {}", stream.index, stream.sampleRate); return false; } if (stream.channels <= 0) { AV_LOGGER_ERRORF("音频流 {} 的声道数无效: {}", stream.index, stream.channels); return false; } } } if (params.bufferSize <= 0) { AV_LOGGER_ERROR("缓冲区大小必须大于0"); return false; } if (params.maxDelay < 0) { AV_LOGGER_ERROR("最大延迟不能为负数"); return false; } return true; } std::string MuxerFactory::getTypeName(MuxerType type) const { switch (type) { case MuxerType::FILE_MUXER: return "文件复用器"; case MuxerType::STREAM_MUXER: return "流复用器"; case MuxerType::RTMP_MUXER: return "RTMP推流"; case MuxerType::UDP_MUXER: return "UDP推流"; case MuxerType::TCP_MUXER: return "TCP推流"; case MuxerType::HLS_MUXER: return "HLS分片"; case MuxerType::DASH_MUXER: return "DASH分片"; default: return "未知类型"; } } // 为纯虚函数提供默认实现 ErrorCode AbstractMuxer::initialize(const MuxerParams& params) { if (!validateParams(params)) { return ErrorCode::INVALID_PARAMS; } // 添加参数中的流到streams_容器 { std::lock_guard lock(streamsMutex_); streams_.clear(); // 清空现有流 for (const auto& stream : params.streams) { streams_.push_back(stream); AV_LOGGER_INFOF("已添加流: 索引={}, 类型={}", stream.index, static_cast(stream.type)); } } setState(MuxerState::INITIALIZED); AV_LOGGER_INFO("抽象复用器初始化完成"); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::start() { if (getState() != MuxerState::INITIALIZED) { AV_LOGGER_ERROR("复用器状态无效,无法启动"); return ErrorCode::INVALID_STATE; } setState(MuxerState::STARTED); AV_LOGGER_INFO("抽象复用器已启动"); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::stop() { if (getState() != MuxerState::STARTED && getState() != MuxerState::PAUSED) { return ErrorCode::SUCCESS; } setState(MuxerState::STOPPED); AV_LOGGER_INFO("抽象复用器已停止"); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::close() { stop(); setState(MuxerState::IDLE); AV_LOGGER_INFO("抽象复用器已关闭"); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::writePacket(AVPacket* packet) { if (!packet) { return ErrorCode::INVALID_PARAMS; } updateStats(packet); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::writeFrame(AVFrame* frame, int streamIndex) { if (!frame) { return ErrorCode::INVALID_PARAMS; } return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::flush() { return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::addStream(const StreamInfo& streamInfo) { std::lock_guard lock(streamsMutex_); // 检查流索引是否已存在 auto it = std::find_if(streams_.begin(), streams_.end(), [&streamInfo](const StreamInfo& info) { return info.index == streamInfo.index; }); if (it != streams_.end()) { AV_LOGGER_ERRORF("流索引 {} 已存在", streamInfo.index); return ErrorCode::STREAM_EXISTS; } streams_.push_back(streamInfo); AV_LOGGER_INFOF("已添加流: 索引={}, 类型={}", streamInfo.index, static_cast(streamInfo.type)); return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::setupOutput() { return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::writeHeader() { return ErrorCode::SUCCESS; } ErrorCode AbstractMuxer::writeTrailer() { return ErrorCode::SUCCESS; } // 默认工厂实现 class DefaultMuxerFactory : public MuxerFactory { public: std::unique_ptr createMuxer(MuxerType type) override { switch (type) { case MuxerType::FILE_MUXER: // 这里应该返回FileMuxer的实例,但为了避免循环依赖,暂时返回nullptr return nullptr; case MuxerType::STREAM_MUXER: // 这里应该返回StreamMuxer的实例,但为了避免循环依赖,暂时返回nullptr return nullptr; default: return nullptr; } } bool isTypeSupported(MuxerType type) const override { return type == MuxerType::FILE_MUXER || type == MuxerType::STREAM_MUXER; } std::vector getSupportedTypes() const override { return {MuxerType::FILE_MUXER, MuxerType::STREAM_MUXER}; } }; std::unique_ptr MuxerFactory::createFactory() { return std::make_unique(); } } // namespace muxer } // namespace av