#include "thread_manager.h" #include "../utils/utils_packet_queue.h" #include "../utils/utils_frame_queue.h" #include "thread_manager.h" #include "../base/logger.h" #include "../base/media_common.h" #include "../codec/codec_video_decoder.h" #include "../codec/codec_audio_decoder.h" #include "../utils/utils_packet_queue.h" #include "../utils/utils_frame_queue.h" #include "../utils/utils_synchronizer.h" #include #include #include namespace av { namespace player { // ThreadBase 实现 ThreadBase::ThreadBase(const std::string& name) : m_name(name) { av::Logger::instance().debug("Thread created: " + m_name); } ThreadBase::~ThreadBase() { stop(); join(); av::Logger::instance().debug("Thread destroyed: " + m_name); } bool ThreadBase::start() { if (m_running.load()) { av::Logger::instance().warning("Thread already running: " + m_name); return false; } m_shouldStop = false; m_thread = std::make_unique(&ThreadBase::threadEntry, this); av::Logger::instance().info("Thread started: " + m_name); return true; } void ThreadBase::stop() { if (!m_running.load()) { return; } m_shouldStop = true; av::Logger::instance().info("Thread stop requested: " + m_name); } void ThreadBase::join() { if (m_thread && m_thread->joinable()) { m_thread->join(); m_thread.reset(); av::Logger::instance().info("Thread joined: " + m_name); } } bool ThreadBase::isRunning() const { return m_running.load(); } void ThreadBase::threadEntry() { m_running = true; try { av::Logger::instance().info("Thread running: " + m_name); run(); } catch (const std::exception& e) { av::Logger::instance().error("Thread exception in " + m_name + ": " + e.what()); } catch (...) { av::Logger::instance().error("Unknown exception in thread: " + m_name); } m_running = false; av::Logger::instance().info("Thread finished: " + m_name); } // ReadThread 实现 ReadThread::ReadThread(AVFormatContext* formatContext, av::utils::PacketQueue* packetQueue, int videoStreamIndex, int audioStreamIndex) : ThreadBase("ReadThread") , m_formatContext(formatContext) , m_packetQueue(packetQueue) , m_videoStreamIndex(videoStreamIndex) , m_audioStreamIndex(audioStreamIndex) { } void ReadThread::seek(int64_t timestamp) { m_seekTarget = timestamp; m_seeking = true; av::Logger::instance().info("Seek requested to: " + std::to_string(timestamp)); } void ReadThread::run() { if (!m_formatContext || !m_packetQueue) { av::Logger::instance().error("Invalid parameters for ReadThread"); return; } while (!shouldStop()) { // 处理跳转 if (m_seeking.load()) { handleSeek(); continue; } // 检查队列是否已满 if (m_packetQueue->full()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } // 读取数据包 if (!readPacket()) { // 读取结束或出错 // EOF处理由队列内部管理 break; } } av::Logger::instance().info("ReadThread finished"); } bool ReadThread::readPacket() { AVPacket* packet = av_packet_alloc(); if (!packet) { av::Logger::instance().error("Failed to allocate packet"); return false; } int ret = av_read_frame(m_formatContext, packet); if (ret < 0) { av_packet_free(&packet); if (ret == AVERROR_EOF) { av::Logger::instance().info("End of file reached"); } else { av::Logger::instance().error("Error reading frame: " + std::to_string(ret)); } return false; } // 只处理我们关心的流 if (packet->stream_index == m_videoStreamIndex || packet->stream_index == m_audioStreamIndex) { if (m_packetQueue->enqueue(packet) != av::ErrorCode::SUCCESS) { av_packet_free(&packet); return false; } } else { av_packet_free(&packet); } return true; } void ReadThread::handleSeek() { int64_t seekTarget = m_seekTarget.load(); // 执行跳转 int ret = av_seek_frame(m_formatContext, -1, seekTarget, AVSEEK_FLAG_BACKWARD); if (ret < 0) { av::Logger::instance().error("Seek failed: " + std::to_string(ret)); } else { av::Logger::instance().info("Seek completed to: " + std::to_string(seekTarget)); // 清空队列 m_packetQueue->clear(); } m_seeking = false; } // VideoDecodeThread 实现 VideoDecodeThread::VideoDecodeThread(av::utils::PacketQueue* packetQueue, av::utils::FrameQueue* frameQueue, VideoDecoder* decoder, av::utils::Synchronizer* synchronizer, int streamIndex, AVCodecParameters* codecParams) : ThreadBase("VideoDecodeThread") , m_packetQueue(packetQueue) , m_frameQueue(frameQueue) , m_decoder(decoder) , m_synchronizer(synchronizer) , m_streamIndex(streamIndex) , m_bsfContext(nullptr) { if (codecParams) { initBitStreamFilter(codecParams); } } VideoDecodeThread::~VideoDecodeThread() { cleanupBitStreamFilter(); } void VideoDecodeThread::run() { if (!m_packetQueue || !m_frameQueue || !m_decoder) { av::Logger::instance().error("Invalid parameters for VideoDecodeThread"); return; } while (!shouldStop()) { // 检查帧队列是否已满 if (m_frameQueue->full()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } // 解码帧 if (!decodeFrame()) { if (m_packetQueue->empty()) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } av::Logger::instance().info("VideoDecodeThread finished"); } bool VideoDecodeThread::decodeFrame() { // 获取数据包 AVPacket* packet = m_packetQueue->dequeuePacket(10); // 10ms超时 if (!packet) { return false; } // 检查是否是视频包 if (packet->stream_index != m_streamIndex) { av_packet_free(&packet); return true; } // 如果有比特流过滤器,先过滤数据包 AVPacket* filteredPacket = packet; if (m_bsfContext) { int ret = av_bsf_send_packet(m_bsfContext, packet); if (ret < 0) { av::Logger::instance().errorf("Failed to send packet to bitstream filter: {}", av::ffmpeg_utils::errorToString(ret)); av_packet_free(&packet); return false; } AVPacket* outputPacket = av_packet_alloc(); ret = av_bsf_receive_packet(m_bsfContext, outputPacket); if (ret < 0) { if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) { av::Logger::instance().errorf("Failed to receive packet from bitstream filter: {}", av::ffmpeg_utils::errorToString(ret)); } av_packet_free(&packet); av_packet_free(&outputPacket); return false; } av_packet_free(&packet); filteredPacket = outputPacket; } // 解码 std::vector frames; AVPacketPtr packetPtr(filteredPacket); ErrorCode result = m_decoder->decode(packetPtr, frames); if (result != ErrorCode::SUCCESS) { av::Logger::instance().errorf("Video decode failed with error: {}", static_cast(result)); return false; } if (frames.empty()) { // 这是正常情况,解码器可能需要更多输入 return true; } av::Logger::instance().debugf("Successfully decoded {} video frames", frames.size()); // 处理解码后的帧 for (auto& frame : frames) { // 更新视频时钟 if (m_synchronizer && frame->pts != AV_NOPTS_VALUE) { // 需要获取时间基,这里暂时使用默认值 double pts = frame->pts * 0.000040; // 临时值,需要从解码器获取正确的时间基 m_synchronizer->setVideoClock(pts); } // 添加到帧队列 if (m_frameQueue->enqueue(frame.release()) != av::ErrorCode::SUCCESS) { return false; } } return true; } bool VideoDecodeThread::initBitStreamFilter(AVCodecParameters* codecParams) { if (!codecParams || codecParams->codec_id != AV_CODEC_ID_H264) { return true; // 不需要比特流过滤器 } const AVBitStreamFilter* bsf = av_bsf_get_by_name("h264_mp4toannexb"); if (!bsf) { av::Logger::instance().error("Failed to find h264_mp4toannexb bitstream filter"); return false; } int ret = av_bsf_alloc(bsf, &m_bsfContext); if (ret < 0) { av::Logger::instance().errorf("Failed to allocate bitstream filter context: {}", av::ffmpeg_utils::errorToString(ret)); return false; } ret = avcodec_parameters_copy(m_bsfContext->par_in, codecParams); if (ret < 0) { av::Logger::instance().errorf("Failed to copy codec parameters to bitstream filter: {}", av::ffmpeg_utils::errorToString(ret)); av_bsf_free(&m_bsfContext); return false; } ret = av_bsf_init(m_bsfContext); if (ret < 0) { av::Logger::instance().errorf("Failed to initialize bitstream filter: {}", av::ffmpeg_utils::errorToString(ret)); av_bsf_free(&m_bsfContext); return false; } av::Logger::instance().info("H.264 bitstream filter initialized successfully"); return true; } void VideoDecodeThread::cleanupBitStreamFilter() { if (m_bsfContext) { av_bsf_free(&m_bsfContext); m_bsfContext = nullptr; } } // AudioDecodeThread 实现 AudioDecodeThread::AudioDecodeThread(av::utils::PacketQueue* packetQueue, av::utils::FrameQueue* frameQueue, AudioDecoder* decoder, av::utils::Synchronizer* synchronizer, int streamIndex) : ThreadBase("AudioDecodeThread") , m_packetQueue(packetQueue) , m_frameQueue(frameQueue) , m_decoder(decoder) , m_synchronizer(synchronizer) , m_streamIndex(streamIndex) { } void AudioDecodeThread::run() { if (!m_packetQueue || !m_frameQueue || !m_decoder) { av::Logger::instance().error("Invalid parameters for AudioDecodeThread"); return; } while (!shouldStop()) { // 检查帧队列是否已满 if (m_frameQueue->full()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } // 解码帧 if (!decodeFrame()) { if (m_packetQueue->empty()) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } av::Logger::instance().info("AudioDecodeThread finished"); } bool AudioDecodeThread::decodeFrame() { // 获取数据包 AVPacket* packet = m_packetQueue->dequeuePacket(10); // 10ms超时 if (!packet) { return false; } // 检查是否是音频包 if (packet->stream_index != m_streamIndex) { av_packet_free(&packet); return true; } // 解码 std::vector frames; AVPacketPtr packetPtr(packet); ErrorCode result = m_decoder->decode(packetPtr, frames); if (result != ErrorCode::SUCCESS) { av::Logger::instance().errorf("Audio decode failed with error: {}", static_cast(result)); return false; } if (frames.empty()) { // 这是正常情况,解码器可能需要更多输入 return true; } av::Logger::instance().debugf("Successfully decoded {} audio frames", frames.size()); // 处理解码后的帧 for (auto& frame : frames) { // 更新音频时钟 if (m_synchronizer && frame->pts != AV_NOPTS_VALUE) { // 需要获取时间基,这里暂时使用默认值 double pts = frame->pts * 0.000023; // 临时值,需要从解码器获取正确的时间基 m_synchronizer->setAudioClock(pts); } // 添加到帧队列 if (m_frameQueue->enqueue(frame.release()) != av::ErrorCode::SUCCESS) { return false; } } return true; } // ThreadManager 实现 ThreadManager::ThreadManager() { av::Logger::instance().debug("ThreadManager created"); } ThreadManager::~ThreadManager() { stopAll(); joinAll(); av::Logger::instance().debug("ThreadManager destroyed"); } bool ThreadManager::createReadThread(AVFormatContext* formatContext, av::utils::PacketQueue* packetQueue, int videoStreamIndex, int audioStreamIndex) { if (m_readThread) { av::Logger::instance().warning("ReadThread already exists"); return false; } m_readThread = std::make_unique(formatContext, packetQueue, videoStreamIndex, audioStreamIndex); av::Logger::instance().info("ReadThread created"); return true; } bool ThreadManager::createVideoDecodeThread(av::utils::PacketQueue* packetQueue, av::utils::FrameQueue* frameQueue, VideoDecoder* decoder, av::utils::Synchronizer* synchronizer, int streamIndex, AVCodecParameters* codecParams) { if (m_videoDecodeThread) { av::Logger::instance().warning("VideoDecodeThread already exists"); return false; } m_videoDecodeThread = std::make_unique(packetQueue, frameQueue, decoder, synchronizer, streamIndex, codecParams); av::Logger::instance().info("VideoDecodeThread created"); return true; } bool ThreadManager::createAudioDecodeThread(av::utils::PacketQueue* packetQueue, av::utils::FrameQueue* frameQueue, AudioDecoder* decoder, av::utils::Synchronizer* synchronizer, int streamIndex) { if (m_audioDecodeThread) { av::Logger::instance().warning("AudioDecodeThread already exists"); return false; } m_audioDecodeThread = std::make_unique(packetQueue, frameQueue, decoder, synchronizer, streamIndex); av::Logger::instance().info("AudioDecodeThread created"); return true; } bool ThreadManager::startAll() { bool success = true; if (m_readThread && !m_readThread->start()) { av::Logger::instance().error("Failed to start ReadThread"); success = false; } if (m_videoDecodeThread && !m_videoDecodeThread->start()) { av::Logger::instance().error("Failed to start VideoDecodeThread"); success = false; } if (m_audioDecodeThread && !m_audioDecodeThread->start()) { av::Logger::instance().error("Failed to start AudioDecodeThread"); success = false; } if (success) { av::Logger::instance().info("All threads started successfully"); } return success; } void ThreadManager::stopAll() { if (m_readThread) { m_readThread->stop(); } if (m_videoDecodeThread) { m_videoDecodeThread->stop(); } if (m_audioDecodeThread) { m_audioDecodeThread->stop(); } av::Logger::instance().info("All threads stop requested"); } void ThreadManager::joinAll() { if (m_readThread) { m_readThread->join(); m_readThread.reset(); } if (m_videoDecodeThread) { m_videoDecodeThread->join(); m_videoDecodeThread.reset(); } if (m_audioDecodeThread) { m_audioDecodeThread->join(); m_audioDecodeThread.reset(); } av::Logger::instance().info("All threads joined"); } bool ThreadManager::hasRunningThreads() const { return (m_readThread && m_readThread->isRunning()) || (m_videoDecodeThread && m_videoDecodeThread->isRunning()) || (m_audioDecodeThread && m_audioDecodeThread->isRunning()); } } // namespace player } // namespace av