#include "muxer_file_muxer.h" #include "../base/logger.h" #include "../base/media_common.h" #include #include #include #include extern "C" { #include #include #include #include } namespace av { namespace muxer { FileMuxer::FileMuxer() { AV_LOGGER_DEBUG("创建文件复用器"); lastFlushTime_ = std::chrono::steady_clock::now(); } FileMuxer::~FileMuxer() { close(); AV_LOGGER_DEBUG("文件复用器已销毁"); } ErrorCode FileMuxer::initialize(const MuxerParams& params) { if (params.type != MuxerType::FILE_MUXER) { AV_LOGGER_ERROR("参数类型不是文件复用器"); return ErrorCode::INVALID_ARGUMENT; } fileMuxerParams_ = static_cast(params); if (!validateParams(fileMuxerParams_)) { return ErrorCode::INVALID_PARAMS; } // 设置输出文件 if (!fileMuxerParams_.outputFile.empty()) { currentOutputFile_ = fileMuxerParams_.outputFile; } else { currentOutputFile_ = fileMuxerParams_.outputPath; } ErrorCode result = validateFilePath(currentOutputFile_); if (result != ErrorCode::OK) { return result; } // 设置分段 if (fileMuxerParams_.enableSegmentation) { segmentationEnabled_ = true; if (fileMuxerParams_.segmentPattern.empty()) { // 生成默认分段模式 std::filesystem::path path(currentOutputFile_); std::string stem = path.stem().string(); std::string ext = path.extension().string(); fileMuxerParams_.segmentPattern = stem + "_%03d" + ext; } } setState(MuxerState::INITIALIZED); AV_LOGGER_INFOF("文件复用器初始化成功: {}", currentOutputFile_); return ErrorCode::OK; } ErrorCode FileMuxer::start() { if (getState() != MuxerState::INITIALIZED) { AV_LOGGER_ERROR("复用器状态无效,无法启动"); return ErrorCode::INVALID_STATE; } ErrorCode result = setupOutput(); if (result != ErrorCode::OK) { return result; } result = writeHeader(); if (result != ErrorCode::OK) { return result; } // 启动写入线程 shouldStopWriting_ = false; try { writeThread_ = std::thread(&FileMuxer::writeThreadFunc, this); setState(MuxerState::STARTED); fileStartTime_ = std::chrono::steady_clock::now(); AV_LOGGER_INFO("文件复用器已启动"); return ErrorCode::OK; } catch (const std::exception& e) { AV_LOGGER_ERRORF("启动写入线程失败: {}", e.what()); return ErrorCode::THREAD_ERROR; } } ErrorCode FileMuxer::stop() { if (getState() != MuxerState::STARTED && getState() != MuxerState::PAUSED) { return ErrorCode::OK; } // 停止写入线程 shouldStopWriting_ = true; queueCondition_.notify_all(); if (writeThread_.joinable()) { writeThread_.join(); } // 刷新剩余数据 flush(); // 写入文件尾 ErrorCode result = writeTrailer(); if (result != ErrorCode::OK) { AV_LOGGER_ERRORF("写入文件尾失败: {}", static_cast(result)); } setState(MuxerState::STOPPED); AV_LOGGER_INFO("文件复用器已停止"); return ErrorCode::OK; } ErrorCode FileMuxer::close() { stop(); ErrorCode result = closeOutputFile(); // 清空队列 { std::lock_guard lock(queueMutex_); while (!packetQueue_.empty()) { packetQueue_.pop(); } } setState(MuxerState::IDLE); AV_LOGGER_INFO("文件复用器已关闭"); return result; } ErrorCode FileMuxer::writePacket(AVPacket* packet) { if (!packet) { AV_LOGGER_ERROR("包指针为空"); return ErrorCode::INVALID_ARGUMENT; } if (getState() != MuxerState::STARTED) { AV_LOGGER_ERROR("复用器未启动,无法写入包"); return ErrorCode::INVALID_STATE; } // 检查队列大小 { std::lock_guard lock(queueMutex_); if (packetQueue_.size() >= MAX_QUEUE_SIZE) { AV_LOGGER_WARNING("包队列已满,丢弃包"); { std::lock_guard statsLock(statsMutex_); stats_.droppedPackets++; } return ErrorCode::QUEUE_FULL; } } // 复制包 AVPacket* packetCopy = av_packet_alloc(); if (!packetCopy) { AV_LOGGER_ERROR("分配包内存失败"); return ErrorCode::OUT_OF_MEMORY; } int ret = av_packet_ref(packetCopy, packet); if (ret < 0) { av_packet_free(&packetCopy); AV_LOGGER_ERRORF("复制包失败: {}", ffmpeg_utils::errorToString(ret)); return static_cast(ret); } // 添加到队列 auto queueItem = std::make_unique(packetCopy, packet->stream_index); { std::lock_guard lock(queueMutex_); packetQueue_.push(std::move(queueItem)); queuedPackets_++; } queueCondition_.notify_one(); return ErrorCode::OK; } ErrorCode FileMuxer::writeFrame(AVFrame* frame, int streamIndex) { if (!frame) { AV_LOGGER_ERROR("帧指针为空"); return ErrorCode::INVALID_ARGUMENT; } // 这里需要编码器将帧编码为包,然后调用writePacket // 由于这是复用器,通常接收已编码的包 AV_LOGGER_WARNING("文件复用器不直接支持写入原始帧,请先编码为包"); return ErrorCode::NOT_SUPPORTED; } ErrorCode FileMuxer::flush() { if (!formatCtx_) { return ErrorCode::OK; } std::lock_guard lock(fileMutex_); int ret = av_write_frame(formatCtx_, nullptr); // 刷新 if (ret < 0) { AV_LOGGER_ERRORF("刷新复用器失败: {}", ffmpeg_utils::errorToString(ret)); return static_cast(ret); } if (formatCtx_->pb) { avio_flush(formatCtx_->pb); } lastFlushTime_ = std::chrono::steady_clock::now(); AV_LOGGER_DEBUG("复用器已刷新"); return ErrorCode::OK; } ErrorCode FileMuxer::addStream(const StreamInfo& streamInfo) { if (getState() != MuxerState::INITIALIZED) { AV_LOGGER_ERROR("只能在初始化状态下添加流"); return ErrorCode::INVALID_STATE; } std::lock_guard lock(streamsMutex_); // 检查流索引是否已存在 auto it = std::find_if(streams_.begin(), streams_.end(), [&streamInfo](const StreamInfo& info) { return info.index == streamInfo.index; }); if (it != streams_.end()) { AV_LOGGER_ERRORF("流索引 {} 已存在", streamInfo.index); return ErrorCode::STREAM_EXISTS; } streams_.push_back(streamInfo); AV_LOGGER_INFOF("已添加流: 索引={}, 类型={}, 编解码器={}", streamInfo.index, static_cast(streamInfo.type), streamInfo.codecName); return ErrorCode::OK; } ErrorCode FileMuxer::setOutputFile(const std::string& filename) { if (getState() != MuxerState::IDLE && getState() != MuxerState::INITIALIZED) { AV_LOGGER_ERROR("无法在运行时更改输出文件"); return ErrorCode::INVALID_STATE; } ErrorCode result = validateFilePath(filename); if (result != ErrorCode::OK) { return result; } currentOutputFile_ = filename; fileMuxerParams_.outputFile = filename; AV_LOGGER_INFOF("输出文件已设置为: {}", filename); return ErrorCode::OK; } std::string FileMuxer::getOutputFile() const { return currentOutputFile_; } int64_t FileMuxer::getCurrentFileSize() const { return currentFileSize_; } double FileMuxer::getCurrentDuration() const { auto now = std::chrono::steady_clock::now(); return std::chrono::duration(now - fileStartTime_).count(); } ErrorCode FileMuxer::enableSegmentation(bool enable, int duration) { if (getState() == MuxerState::STARTED) { AV_LOGGER_ERROR("无法在运行时更改分段设置"); return ErrorCode::INVALID_STATE; } segmentationEnabled_ = enable; fileMuxerParams_.enableSegmentation = enable; fileMuxerParams_.segmentDuration = duration; AV_LOGGER_INFOF("分段已{}: 时长={}秒", enable ? "启用" : "禁用", duration); return ErrorCode::OK; } ErrorCode FileMuxer::forceNewSegment() { if (!segmentationEnabled_) { AV_LOGGER_ERROR("分段功能未启用"); return ErrorCode::INVALID_STATE; } if (getState() != MuxerState::STARTED) { AV_LOGGER_ERROR("复用器未启动"); return ErrorCode::INVALID_STATE; } return createNewSegment(); } std::vector FileMuxer::getSegmentFiles() const { return segmentFiles_; } ErrorCode FileMuxer::setFastStart(bool enable) { fileMuxerParams_.enableFastStart = enable; if (enable) { fileMuxerParams_.movFlags |= 0x01; // AVFMT_FLAG_FASTSTART } else { fileMuxerParams_.movFlags &= ~0x01; } AV_LOGGER_INFOF("快速开始已{}", enable ? "启用" : "禁用"); return ErrorCode::OK; } ErrorCode FileMuxer::setMovFlags(int flags) { fileMuxerParams_.movFlags = flags; AV_LOGGER_INFOF("MOV标志已设置为: 0x{:X}", flags); return ErrorCode::OK; } ErrorCode FileMuxer::setupOutput() { // 分配格式上下文 const char* formatName = fileMuxerParams_.format.empty() ? nullptr : fileMuxerParams_.format.c_str(); const char* filename = currentOutputFile_.c_str(); int ret = avformat_alloc_output_context2(&formatCtx_, nullptr, formatName, filename); if (ret < 0) { AV_LOGGER_ERRORF("分配输出格式上下文失败: {}", ffmpeg_utils::errorToString(ret)); return static_cast(ret); } if (!formatCtx_->oformat) { AV_LOGGER_ERROR("无法确定输出格式"); return ErrorCode::FORMAT_NOT_SUPPORTED; } AV_LOGGER_INFOF("输出格式: {} ({})", formatCtx_->oformat->name, formatCtx_->oformat->long_name); // 设置流 ErrorCode result = setupStreams(); if (result != ErrorCode::OK) { return result; } // 设置元数据 for (const auto& meta : fileMuxerParams_.metadata) { av_dict_set(&formatCtx_->metadata, meta.first.c_str(), meta.second.c_str(), 0); } // 打开输出文件 return openOutputFile(); } ErrorCode FileMuxer::writeHeader() { if (!formatCtx_) { AV_LOGGER_ERROR("格式上下文未初始化"); return ErrorCode::INVALID_STATE; } // 设置格式选项 AVDictionary* options = nullptr; for (const auto& option : fileMuxerParams_.options) { av_dict_set(&options, option.first.c_str(), option.second.c_str(), 0); } // 设置MOV标志 if (fileMuxerParams_.movFlags != 0) { av_dict_set_int(&options, "movflags", fileMuxerParams_.movFlags, 0); } // 写入文件头 int ret = avformat_write_header(formatCtx_, &options); av_dict_free(&options); if (ret < 0) { AV_LOGGER_ERRORF("写入文件头失败: {}", ffmpeg_utils::errorToString(ret)); return static_cast(ret); } AV_LOGGER_INFO("文件头已写入"); return ErrorCode::OK; } ErrorCode FileMuxer::writeTrailer() { if (!formatCtx_) { return ErrorCode::OK; } std::lock_guard lock(fileMutex_); int ret = av_write_trailer(formatCtx_); if (ret < 0) { AV_LOGGER_ERRORF("写入文件尾失败: {}", ffmpeg_utils::errorToString(ret)); return static_cast(ret); } AV_LOGGER_INFO("文件尾已写入"); return ErrorCode::OK; } ErrorCode FileMuxer::openOutputFile() { if (!formatCtx_) { return ErrorCode::INVALID_STATE; } // 检查是否需要覆盖文件 if (!fileMuxerParams_.overwrite && std::filesystem::exists(currentOutputFile_)) { AV_LOGGER_ERRORF("文件已存在且不允许覆盖: {}", currentOutputFile_); return ErrorCode::FILE_EXISTS; } // 打开输出文件 if (!(formatCtx_->oformat->flags & AVFMT_NOFILE)) { int ret = avio_open(&formatCtx_->pb, currentOutputFile_.c_str(), AVIO_FLAG_WRITE); if (ret < 0) { AV_LOGGER_ERRORF("打开输出文件失败: {} ({})", ffmpeg_utils::errorToString(ret), currentOutputFile_); return static_cast(ret); } } currentFileSize_ = 0; AV_LOGGER_INFOF("输出文件已打开: {}", currentOutputFile_); return ErrorCode::OK; } ErrorCode FileMuxer::closeOutputFile() { if (!formatCtx_) { return ErrorCode::OK; } std::lock_guard lock(fileMutex_); // 关闭文件 if (formatCtx_->pb && !(formatCtx_->oformat->flags & AVFMT_NOFILE)) { avio_closep(&formatCtx_->pb); } // 释放格式上下文 avformat_free_context(formatCtx_); formatCtx_ = nullptr; // 清理流映射 streamMap_.clear(); AV_LOGGER_INFO("输出文件已关闭"); return ErrorCode::OK; } ErrorCode FileMuxer::createNewSegment() { if (!segmentationEnabled_) { return ErrorCode::OK; } // 写入当前段的尾部 ErrorCode result = writeTrailer(); if (result != ErrorCode::OK) { return result; } // 关闭当前文件 result = closeOutputFile(); if (result != ErrorCode::OK) { return result; } // 添加到段文件列表 segmentFiles_.push_back(currentOutputFile_); // 生成新的段文件名 currentSegmentIndex_++; currentOutputFile_ = generateSegmentFilename(currentSegmentIndex_); // 重新设置输出 result = setupOutput(); if (result != ErrorCode::OK) { return result; } // 写入新段的头部 result = writeHeader(); if (result != ErrorCode::OK) { return result; } fileStartTime_ = std::chrono::steady_clock::now(); AV_LOGGER_INFOF("创建新段: {}", currentOutputFile_); return ErrorCode::OK; } ErrorCode FileMuxer::setupStreams() { std::lock_guard lock(streamsMutex_); for (const auto& streamInfo : streams_) { AVStream* stream = createAVStream(streamInfo); if (!stream) { AV_LOGGER_ERRORF("创建流失败: 索引={}", streamInfo.index); return ErrorCode::STREAM_CREATE_FAILED; } streamMap_[streamInfo.index] = stream; } AV_LOGGER_INFOF("已设置 {} 个流", streams_.size()); return ErrorCode::OK; } AVStream* FileMuxer::createAVStream(const StreamInfo& streamInfo) { if (!formatCtx_) { return nullptr; } AVStream* stream = avformat_new_stream(formatCtx_, nullptr); if (!stream) { AV_LOGGER_ERROR("创建新流失败"); return nullptr; } stream->id = streamInfo.index; stream->time_base = streamInfo.timeBase; // 设置编解码器参数 AVCodecParameters* codecpar = stream->codecpar; codecpar->codec_id = streamInfo.codecId; codecpar->codec_type = (streamInfo.type == StreamType::VIDEO) ? AVMEDIA_TYPE_VIDEO : AVMEDIA_TYPE_AUDIO; if (streamInfo.type == StreamType::VIDEO) { codecpar->width = streamInfo.width; codecpar->height = streamInfo.height; codecpar->format = streamInfo.pixelFormat; codecpar->bit_rate = streamInfo.bitrate; // 设置帧率 stream->avg_frame_rate = streamInfo.frameRate; stream->r_frame_rate = streamInfo.frameRate; } else if (streamInfo.type == StreamType::AUDIO) { codecpar->sample_rate = streamInfo.sampleRate; av_channel_layout_default(&codecpar->ch_layout, streamInfo.channels); codecpar->format = streamInfo.sampleFormat; codecpar->bit_rate = streamInfo.bitrate; } AV_LOGGER_INFOF("创建流成功: 索引={}, 类型={}", streamInfo.index, static_cast(streamInfo.type)); return stream; } ErrorCode FileMuxer::processPacket(AVPacket* packet) { if (!packet || !formatCtx_) { return ErrorCode::INVALID_ARGUMENT; } // 检查流索引 auto it = streamMap_.find(packet->stream_index); if (it == streamMap_.end()) { AV_LOGGER_ERRORF("未找到流索引: {}", packet->stream_index); return ErrorCode::STREAM_NOT_FOUND; } // 检查是否需要创建新段 if (shouldCreateNewSegment()) { ErrorCode result = createNewSegment(); if (result != ErrorCode::OK) { return result; } } return writePacketInternal(packet); } ErrorCode FileMuxer::writePacketInternal(AVPacket* packet) { std::lock_guard lock(fileMutex_); auto startTime = std::chrono::steady_clock::now(); // 写入包 int ret = av_interleaved_write_frame(formatCtx_, packet); if (ret < 0) { AV_LOGGER_ERRORF("写入包失败: {}", ffmpeg_utils::errorToString(ret)); return static_cast(ret); } // 更新统计信息 updateStats(packet); currentFileSize_ += packet->size; writtenPackets_++; // 更新平均写入时间 auto endTime = std::chrono::steady_clock::now(); double writeTime = std::chrono::duration(endTime - startTime).count(); averageWriteTime_ = (averageWriteTime_ * 0.9) + (writeTime * 0.1); // 定期刷新 if (!fileMuxerParams_.syncMode) { auto now = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration(now - lastFlushTime_).count(); if (elapsed >= fileMuxerParams_.flushInterval) { flush(); } } return ErrorCode::OK; } void FileMuxer::writeThreadFunc() { AV_LOGGER_INFO("写入线程已启动"); while (!shouldStopWriting_) { std::unique_ptr item; // 从队列获取包 { std::unique_lock lock(queueMutex_); queueCondition_.wait(lock, [this] { return !packetQueue_.empty() || shouldStopWriting_; }); if (shouldStopWriting_ && packetQueue_.empty()) { break; } if (!packetQueue_.empty()) { item = std::move(packetQueue_.front()); packetQueue_.pop(); } } if (item && item->packet) { ErrorCode result = processPacket(item->packet); if (result != ErrorCode::OK) { onError(result, "写入包失败"); } } } // 处理剩余的包 while (true) { std::unique_ptr item; { std::lock_guard lock(queueMutex_); if (packetQueue_.empty()) { break; } item = std::move(packetQueue_.front()); packetQueue_.pop(); } if (item && item->packet) { processPacket(item->packet); } } AV_LOGGER_INFO("写入线程已退出"); } std::string FileMuxer::generateSegmentFilename(int segmentIndex) { if (fileMuxerParams_.segmentPattern.empty()) { std::filesystem::path path(currentOutputFile_); std::string stem = path.stem().string(); std::string ext = path.extension().string(); std::ostringstream oss; oss << stem << "_" << std::setfill('0') << std::setw(3) << segmentIndex << ext; return oss.str(); } else { char buffer[1024]; snprintf(buffer, sizeof(buffer), fileMuxerParams_.segmentPattern.c_str(), segmentIndex); return std::string(buffer); } } bool FileMuxer::shouldCreateNewSegment() const { if (!segmentationEnabled_) { return false; } // 检查时长 if (fileMuxerParams_.segmentDuration > 0) { double currentDuration = getCurrentDuration(); if (currentDuration >= fileMuxerParams_.segmentDuration) { return true; } } // 检查文件大小 if (fileMuxerParams_.maxFileSize > 0) { if (currentFileSize_ >= fileMuxerParams_.maxFileSize) { return true; } } return false; } ErrorCode FileMuxer::validateFilePath(const std::string& path) { if (path.empty()) { AV_LOGGER_ERROR("文件路径不能为空"); return ErrorCode::INVALID_ARGUMENT; } try { std::filesystem::path filePath(path); // 检查父目录是否存在 std::filesystem::path parentDir = filePath.parent_path(); if (!parentDir.empty() && !std::filesystem::exists(parentDir)) { // 尝试创建目录 std::filesystem::create_directories(parentDir); } // 检查文件扩展名 std::string extension = filePath.extension().string(); if (extension.empty()) { AV_LOGGER_WARNING("文件没有扩展名,可能导致格式检测问题"); } } catch (const std::exception& e) { AV_LOGGER_ERRORF("文件路径验证失败: {}", e.what()); return ErrorCode::INVALID_PATH; } return ErrorCode::OK; } bool FileMuxer::validateParams(const MuxerParams& params) { if (!AbstractMuxer::validateParams(params)) { return false; } const auto& fileParams = static_cast(params); if (fileParams.outputFile.empty() && fileParams.outputPath.empty()) { AV_LOGGER_ERROR("输出文件路径不能为空"); return false; } if (fileParams.segmentDuration < 0) { AV_LOGGER_ERROR("分段时长不能为负数"); return false; } if (fileParams.maxFileSize < 0) { AV_LOGGER_ERROR("最大文件大小不能为负数"); return false; } if (fileParams.flushInterval <= 0) { AV_LOGGER_ERROR("刷新间隔必须大于0"); return false; } return true; } // FileMuxerFactory 实现 std::unique_ptr FileMuxer::FileMuxerFactory::createMuxer(MuxerType type) { if (type == MuxerType::FILE_MUXER) { return std::make_unique(); } return nullptr; } bool FileMuxer::FileMuxerFactory::isTypeSupported(MuxerType type) const { return type == MuxerType::FILE_MUXER; } std::vector FileMuxer::FileMuxerFactory::getSupportedTypes() const { return {MuxerType::FILE_MUXER}; } std::unique_ptr FileMuxer::FileMuxerFactory::createFileMuxer(const std::string& filename) { auto muxer = std::make_unique(); FileMuxerParams params; params.outputFile = filename; params.format = AbstractMuxer::getFormatFromExtension(filename); ErrorCode result = muxer->initialize(params); if (result != ErrorCode::OK) { AV_LOGGER_ERRORF("创建文件复用器失败: {}", static_cast(result)); return nullptr; } return muxer; } std::unique_ptr FileMuxer::FileMuxerFactory::createSegmentedMuxer(const std::string& pattern, int duration) { auto muxer = std::make_unique(); FileMuxerParams params; params.outputFile = pattern; params.enableSegmentation = true; params.segmentDuration = duration; params.segmentPattern = pattern; // 从模式中推断格式 std::string firstFile = pattern; size_t pos = firstFile.find("%"); if (pos != std::string::npos) { firstFile.replace(pos, firstFile.find('d', pos) - pos + 1, "001"); } params.format = AbstractMuxer::getFormatFromExtension(firstFile); ErrorCode result = muxer->initialize(params); if (result != ErrorCode::OK) { AV_LOGGER_ERRORF("创建分段复用器失败: {}", static_cast(result)); return nullptr; } return muxer; } } // namespace muxer } // namespace av