#include "media_thread_base.h"
#include "../base/logger.h"

#include <chrono>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

namespace av {

namespace {

// Name used in log messages: the configured thread name, or "unnamed" when
// no name has been set.
std::string displayName(const std::string& name) {
    return name.empty() ? std::string("unnamed") : name;
}

}  // namespace

// Constructs an idle thread wrapper; no OS thread is created until start().
ThreadBase::ThreadBase() {
    AV_LOGGER_DEBUG("ThreadBase created");
}

// Requests a stop if the worker is still running and then always reaps the
// worker thread.
//
// join() is called unconditionally (not only when isRunning()): a worker that
// is in STOPPING state, or that already finished but was never joined, still
// leaves m_thread joinable, and destroying a joinable std::thread terminates
// the process.
ThreadBase::~ThreadBase() {
    if (isRunning()) {
        AV_LOGGER_WARNING("ThreadBase destroyed while still running, forcing stop");
        stop();
    }
    join();  // no-op when there is no joinable thread

    AV_LOGGER_DEBUG("ThreadBase destroyed");
}

// Launches the worker thread and blocks until it has reported RUNNING (or
// has already reached STOPPED).
//
// Returns true when the state observed after the wait is RUNNING; returns
// false when the thread was not in STOPPED state on entry, when launching
// threw, or when the state observed after the wait is STOPPED.
//
// NOTE(review): because STOPPED is treated as failure, a worker whose run()
// returns before this waiter is rescheduled is reported as "failed to start"
// even though it ran - confirm whether callers rely on that.
bool ThreadBase::start() {
    std::unique_lock<std::mutex> lock(m_stateMutex);

    if (m_state != ThreadState::STOPPED) {
        AV_LOGGER_WARNING("Thread already started or starting");
        return false;
    }

    try {
        // A previous run may have finished without anyone calling join().
        // Reap it before m_thread is overwritten: assigning over a joinable
        // std::thread would terminate the process. The old worker has already
        // published STOPPED, after which cleanup() no longer needs
        // m_stateMutex, so joining while holding the lock cannot deadlock.
        if (m_thread && m_thread->joinable()) {
            m_thread->join();
        }

        setState(ThreadState::STARTING);
        m_exitRequested = false;

        m_thread = std::make_unique<std::thread>(&ThreadBase::threadEntry, this);
        m_threadId = m_thread->get_id();

        // Wait until the worker has actually begun executing.
        m_stateCondition.wait(lock, [this] {
            return m_state == ThreadState::RUNNING || m_state == ThreadState::STOPPED;
        });

        const bool success = (m_state == ThreadState::RUNNING);
        if (success) {
            AV_LOGGER_INFOF("Thread '{}' started successfully", displayName(m_threadName));
        } else {
            AV_LOGGER_ERRORF("Thread '{}' failed to start", displayName(m_threadName));
        }
        return success;

    } catch (const std::exception& e) {
        setState(ThreadState::STOPPED);
        AV_LOGGER_ERRORF("Failed to start thread: {}", e.what());
        return false;
    }
}

// Asks the worker to exit: moves the state to STOPPING, raises the exit flag
// and wakes every waiter on m_condition. Does nothing when the thread is
// already STOPPED or STOPPING. Does not wait for the worker; use join().
void ThreadBase::stop() {
    std::string name;
    {
        std::lock_guard<std::mutex> lock(m_stateMutex);
        if (m_state == ThreadState::STOPPED || m_state == ThreadState::STOPPING) {
            return;
        }
        setState(ThreadState::STOPPING);
        name = displayName(m_threadName);
    }

    // The flag is atomic, but it is the predicate of waits on m_condition, so
    // it must be changed while owning m_mutex. Otherwise a waiter that has
    // just evaluated shouldExit() == false and is about to block would miss
    // the notification below and, with an untimed wait, sleep forever.
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_exitRequested = true;
    }
    m_condition.notify_all();

    AV_LOGGER_INFOF("Thread '{}' stop requested", name);
}

// Waits up to DEFAULT_JOIN_TIMEOUT_MS for the worker to reach STOPPED and
// then joins it. Ownership of the std::thread is taken out of m_thread first,
// so a second call finds nothing to join.
//
// When called from the worker thread itself, or when the timeout expires, the
// thread is detached instead of joined.
//
// NOTE(review): a detached worker keeps executing member functions of this
// object; if the object is destroyed afterwards the worker would access freed
// memory. Confirm that run() implementations honour shouldExit() well within
// the timeout.
void ThreadBase::join() {
    std::unique_ptr<std::thread> thread;
    {
        std::lock_guard<std::mutex> lock(m_stateMutex);
        thread = std::move(m_thread);
    }

    if (!thread || !thread->joinable()) {
        return;
    }

    // A thread cannot join itself.
    if (std::this_thread::get_id() == thread->get_id()) {
        AV_LOGGER_WARNING("Cannot join thread from itself, detaching instead");
        thread->detach();
        return;
    }

    try {
        // Bounded wait for the worker to publish STOPPED.
        std::unique_lock<std::mutex> lock(m_stateMutex);
        const bool finished = m_stateCondition.wait_for(
            lock,
            std::chrono::milliseconds(DEFAULT_JOIN_TIMEOUT_MS),
            [this] { return m_state == ThreadState::STOPPED; });

        if (finished) {
            const std::string name = displayName(m_threadName);
            lock.unlock();
            thread->join();
            AV_LOGGER_INFOF("Thread '{}' joined successfully", name);
        } else {
            AV_LOGGER_WARNINGF("Thread '{}' join timeout, detaching", displayName(m_threadName));
            thread->detach();
        }
    } catch (const std::exception& e) {
        AV_LOGGER_ERRORF("Exception during thread join: {}", e.what());
        if (thread->joinable()) {
            thread->detach();
        }
    }
}

// True while the state is RUNNING.
bool ThreadBase::isRunning() const {
    return m_state == ThreadState::RUNNING;
}

// True while the state is STOPPING.
bool ThreadBase::isStopping() const {
    return m_state == ThreadState::STOPPING;
}

// Returns the current lifecycle state.
ThreadState ThreadBase::getState() const {
    return m_state.load();
}

// Returns the id recorded by the most recent start().
std::thread::id ThreadBase::getThreadId() const {
    std::lock_guard<std::mutex> lock(m_stateMutex);
    return m_threadId;
}

// Sets the name used in log messages.
void ThreadBase::setThreadName(const std::string& name) {
    std::lock_guard<std::mutex> lock(m_stateMutex);
    m_threadName = name;
}

// Returns a copy of the configured name (empty when none was set).
std::string ThreadBase::getThreadName() const {
    std::lock_guard<std::mutex> lock(m_stateMutex);
    return m_threadName;
}

// Registers the callback invoked on the worker thread before run().
void ThreadBase::setOnStarted(std::function<void()> callback) {
    std::lock_guard<std::mutex> lock(m_stateMutex);
    m_onStarted = std::move(callback);
}

// Registers the callback invoked on the worker thread after run() has
// returned or thrown, before the state becomes STOPPED.
void ThreadBase::setOnFinished(std::function<void()> callback) {
    std::lock_guard<std::mutex> lock(m_stateMutex);
    m_onFinished = std::move(callback);
}

// Registers the callback invoked by reportError() with the error text.
void ThreadBase::setOnError(std::function<void(const std::string&)> callback) {
    std::lock_guard<std::mutex> lock(m_stateMutex);
    m_onError = std::move(callback);
}

// Blocks on m_condition until an exit was requested or condition() returns
// true. A negative timeoutMs waits without a time limit; otherwise the wait
// gives up after timeoutMs milliseconds. condition is evaluated while
// m_mutex is held.
void ThreadBase::waitForCondition(std::function<bool()> condition, int timeoutMs) {
    std::unique_lock<std::mutex> lock(m_mutex);

    const auto ready = [this, &condition] { return shouldExit() || condition(); };

    if (timeoutMs < 0) {
        m_condition.wait(lock, ready);
    } else {
        m_condition.wait_for(lock, std::chrono::milliseconds(timeoutMs), ready);
    }
}

// Wakes every thread blocked in waitForCondition() / waitForMs() so it
// re-evaluates its predicate.
void ThreadBase::notifyCondition() {
    m_condition.notify_all();
}

// True once an exit has been requested via stop().
bool ThreadBase::shouldExit() const {
    return m_exitRequested.load();
}

// Sleeps the calling thread for ms milliseconds; not interrupted by stop().
void ThreadBase::sleepMs(int ms) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}

// Interruptible sleep: waits up to ms milliseconds, waking early when an
// exit is requested. Returns true when the full interval elapsed without an
// exit request, false when an exit was requested.
bool ThreadBase::waitForMs(int ms) {
    std::unique_lock<std::mutex> lock(m_mutex);
    return !m_condition.wait_for(lock, std::chrono::milliseconds(ms), [this] {
        return shouldExit();
    });
}

// Logs the error and forwards it to the error callback, if one is set.
// Exceptions thrown by the callback are logged and swallowed.
void ThreadBase::reportError(const std::string& error) {
    std::string name;
    std::function<void(const std::string&)> errorCallback;
    {
        // Copy under the lock so the callback runs without holding it.
        std::lock_guard<std::mutex> lock(m_stateMutex);
        name = displayName(m_threadName);
        errorCallback = m_onError;
    }

    AV_LOGGER_ERRORF("Thread '{}' error: {}", name, error);

    if (!errorCallback) {
        return;
    }

    try {
        errorCallback(error);
    } catch (const std::exception& e) {
        AV_LOGGER_ERRORF("Exception in error callback: {}", e.what());
    } catch (...) {
        AV_LOGGER_ERROR("Unknown exception in error callback");
    }
}

// Entry point executed on the worker thread: publishes RUNNING, invokes the
// started callback, calls run(), reports any exception that escapes, and
// finally runs cleanup().
void ThreadBase::threadEntry() {
    try {
        // Publish RUNNING so that start() can return.
        {
            std::lock_guard<std::mutex> lock(m_stateMutex);
            setState(ThreadState::RUNNING);
            m_stateCondition.notify_all();
        }

        // Invoke the started callback outside the lock.
        std::function<void()> startedCallback;
        {
            std::lock_guard<std::mutex> lock(m_stateMutex);
            startedCallback = m_onStarted;
        }

        if (startedCallback) {
            try {
                startedCallback();
            } catch (const std::exception& e) {
                AV_LOGGER_ERRORF("Exception in started callback: {}", e.what());
            } catch (...) {
                AV_LOGGER_ERROR("Unknown exception in started callback");
            }
        }

        AV_LOGGER_DEBUGF("Thread '{}' entering run loop", displayName(getThreadName()));

        // Execute the user-provided logic.
        run();

        AV_LOGGER_DEBUGF("Thread '{}' exiting run loop", displayName(getThreadName()));

    } catch (const std::exception& e) {
        AV_LOGGER_ERRORF("Unhandled exception in thread '{}': {}",
                         displayName(getThreadName()), e.what());
        reportError("Unhandled exception: " + std::string(e.what()));
    } catch (...) {
        AV_LOGGER_ERRORF("Unknown unhandled exception in thread '{}'",
                         displayName(getThreadName()));
        reportError("Unknown unhandled exception");
    }

    cleanup();
}

// Stores the new state and logs the transition.
// Reads m_threadName directly, so the caller must hold m_stateMutex.
void ThreadBase::setState(ThreadState state) {
    m_state = state;
    AV_LOGGER_DEBUGF("Thread '{}' state changed to {}",
                     displayName(m_threadName), static_cast<int>(state));
}

// Runs on the worker thread after run(): invokes the finished callback
// (exceptions are logged and swallowed) and then publishes STOPPED.
void ThreadBase::cleanup() {
    // Invoke the finished callback outside the lock.
    std::function<void()> finishedCallback;
    {
        std::lock_guard<std::mutex> lock(m_stateMutex);
        finishedCallback = m_onFinished;
    }

    if (finishedCallback) {
        try {
            finishedCallback();
        } catch (const std::exception& e) {
            AV_LOGGER_ERRORF("Exception in finished callback: {}", e.what());
        } catch (...) {
            AV_LOGGER_ERROR("Unknown exception in finished callback");
        }
    }

    // Publish the final state. The name is copied inside the same critical
    // section so that nothing below touches a member (or needs m_stateMutex)
    // once STOPPED is visible; start() joins a finished worker while holding
    // that mutex and relies on this.
    std::string name;
    {
        std::lock_guard<std::mutex> lock(m_stateMutex);
        name = displayName(m_threadName);
        setState(ThreadState::STOPPED);
        m_stateCondition.notify_all();
    }

    AV_LOGGER_INFOF("Thread '{}' finished", name);
}

} // namespace av