| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- #include "media_thread_base.h"
- #include "../base/logger.h"
- #include <stdexcept>
- #include <sstream>
- namespace av {
- ThreadBase::ThreadBase() {
- AV_LOGGER_DEBUG("ThreadBase created");
- }
- ThreadBase::~ThreadBase() {
- if (isRunning()) {
- AV_LOGGER_WARNING("ThreadBase destroyed while still running, forcing stop");
- stop();
- join();
- }
- AV_LOGGER_DEBUG("ThreadBase destroyed");
- }
- bool ThreadBase::start() {
- std::unique_lock<std::mutex> lock(m_stateMutex);
-
- if (m_state != ThreadState::STOPPED) {
- AV_LOGGER_WARNING("Thread already started or starting");
- return false;
- }
-
- try {
- setState(ThreadState::STARTING);
- m_exitRequested = false;
-
- m_thread = std::make_unique<std::thread>(&ThreadBase::threadEntry, this);
- m_threadId = m_thread->get_id();
-
- // 等待线程真正开始运行
- m_stateCondition.wait(lock, [this] {
- return m_state == ThreadState::RUNNING || m_state == ThreadState::STOPPED;
- });
-
- bool success = (m_state == ThreadState::RUNNING);
- if (success) {
- AV_LOGGER_INFOF("Thread '{}' started successfully", m_threadName.empty() ? "unnamed" : m_threadName);
- } else {
- AV_LOGGER_ERRORF("Thread '{}' failed to start", m_threadName.empty() ? "unnamed" : m_threadName);
- }
-
- return success;
- } catch (const std::exception& e) {
- setState(ThreadState::STOPPED);
- AV_LOGGER_ERRORF("Failed to start thread: {}", e.what());
- return false;
- }
- }
- void ThreadBase::stop() {
- {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- if (m_state == ThreadState::STOPPED || m_state == ThreadState::STOPPING) {
- return;
- }
- setState(ThreadState::STOPPING);
- }
-
- m_exitRequested = true;
- m_condition.notify_all();
-
- AV_LOGGER_INFOF("Thread '{}' stop requested", m_threadName.empty() ? "unnamed" : m_threadName);
- }
- void ThreadBase::join() {
- std::unique_ptr<std::thread> thread;
- {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- thread = std::move(m_thread);
- }
-
- if (thread && thread->joinable()) {
- // 检查是否在同一线程中调用join
- if (std::this_thread::get_id() == thread->get_id()) {
- AV_LOGGER_WARNING("Cannot join thread from itself, detaching instead");
- thread->detach();
- return;
- }
-
- try {
- // 使用超时等待
- std::unique_lock<std::mutex> lock(m_stateMutex);
- bool finished = m_stateCondition.wait_for(lock,
- std::chrono::milliseconds(DEFAULT_JOIN_TIMEOUT_MS),
- [this] { return m_state == ThreadState::STOPPED; });
-
- if (finished) {
- lock.unlock();
- thread->join();
- AV_LOGGER_INFOF("Thread '{}' joined successfully", m_threadName.empty() ? "unnamed" : m_threadName);
- } else {
- AV_LOGGER_WARNINGF("Thread '{}' join timeout, detaching", m_threadName.empty() ? "unnamed" : m_threadName);
- thread->detach();
- }
- } catch (const std::exception& e) {
- AV_LOGGER_ERRORF("Exception during thread join: {}", e.what());
- if (thread->joinable()) {
- thread->detach();
- }
- }
- }
- }
- bool ThreadBase::isRunning() const {
- return m_state == ThreadState::RUNNING;
- }
- bool ThreadBase::isStopping() const {
- return m_state == ThreadState::STOPPING;
- }
- ThreadState ThreadBase::getState() const {
- return m_state.load();
- }
- std::thread::id ThreadBase::getThreadId() const {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- return m_threadId;
- }
- void ThreadBase::setThreadName(const std::string& name) {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- m_threadName = name;
- }
- std::string ThreadBase::getThreadName() const {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- return m_threadName;
- }
- void ThreadBase::setOnStarted(std::function<void()> callback) {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- m_onStarted = std::move(callback);
- }
- void ThreadBase::setOnFinished(std::function<void()> callback) {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- m_onFinished = std::move(callback);
- }
- void ThreadBase::setOnError(std::function<void(const std::string&)> callback) {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- m_onError = std::move(callback);
- }
- void ThreadBase::waitForCondition(std::function<bool()> condition, int timeoutMs) {
- std::unique_lock<std::mutex> lock(m_mutex);
-
- if (timeoutMs < 0) {
- m_condition.wait(lock, [this, &condition] {
- return shouldExit() || condition();
- });
- } else {
- m_condition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this, &condition] {
- return shouldExit() || condition();
- });
- }
- }
- void ThreadBase::notifyCondition() {
- m_condition.notify_all();
- }
- bool ThreadBase::shouldExit() const {
- return m_exitRequested.load();
- }
- void ThreadBase::sleepMs(int ms) {
- std::this_thread::sleep_for(std::chrono::milliseconds(ms));
- }
- bool ThreadBase::waitForMs(int ms) {
- std::unique_lock<std::mutex> lock(m_mutex);
- return !m_condition.wait_for(lock, std::chrono::milliseconds(ms), [this] {
- return shouldExit();
- });
- }
- void ThreadBase::reportError(const std::string& error) {
- AV_LOGGER_ERRORF("Thread '{}' error: {}", m_threadName.empty() ? "unnamed" : m_threadName, error);
-
- std::function<void(const std::string&)> errorCallback;
- {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- errorCallback = m_onError;
- }
-
- if (errorCallback) {
- try {
- errorCallback(error);
- } catch (const std::exception& e) {
- AV_LOGGER_ERRORF("Exception in error callback: {}", e.what());
- } catch (...) {
- AV_LOGGER_ERROR("Unknown exception in error callback");
- }
- }
- }
- void ThreadBase::threadEntry() {
- try {
- // 设置线程状态为运行中
- {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- setState(ThreadState::RUNNING);
- m_stateCondition.notify_all();
- }
-
- // 调用启动回调
- std::function<void()> startedCallback;
- {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- startedCallback = m_onStarted;
- }
-
- if (startedCallback) {
- try {
- startedCallback();
- } catch (const std::exception& e) {
- AV_LOGGER_ERRORF("Exception in started callback: {}", e.what());
- } catch (...) {
- AV_LOGGER_ERROR("Unknown exception in started callback");
- }
- }
-
- AV_LOGGER_DEBUGF("Thread '{}' entering run loop", m_threadName.empty() ? "unnamed" : m_threadName);
-
- // 执行用户逻辑
- run();
-
- AV_LOGGER_DEBUGF("Thread '{}' exiting run loop", m_threadName.empty() ? "unnamed" : m_threadName);
-
- } catch (const std::exception& e) {
- AV_LOGGER_ERRORF("Unhandled exception in thread '{}': {}", m_threadName.empty()
- ? "unnamed"
- : m_threadName,
- e.what());
- reportError("Unhandled exception: " + std::string(e.what()));
- } catch (...) {
- AV_LOGGER_ERRORF("Unknown unhandled exception in thread '{}'",
- m_threadName.empty() ? "unnamed" : m_threadName);
- reportError("Unknown unhandled exception");
- }
-
- cleanup();
- }
- void ThreadBase::setState(ThreadState state) {
- m_state = state;
- AV_LOGGER_DEBUGF("Thread '{}' state changed to {}",
- m_threadName.empty() ? "unnamed" : m_threadName,
- static_cast<int>(state));
- }
- void ThreadBase::cleanup() {
- // 调用完成回调
- std::function<void()> finishedCallback;
- {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- finishedCallback = m_onFinished;
- }
-
- if (finishedCallback) {
- try {
- finishedCallback();
- } catch (const std::exception& e) {
- AV_LOGGER_ERRORF("Exception in finished callback: {}", e.what());
- } catch (...) {
- AV_LOGGER_ERROR("Unknown exception in finished callback");
- }
- }
-
- // 设置最终状态
- {
- std::lock_guard<std::mutex> lock(m_stateMutex);
- setState(ThreadState::STOPPED);
- m_stateCondition.notify_all();
- }
-
- AV_LOGGER_INFOF("Thread '{}' finished", m_threadName.empty() ? "unnamed" : m_threadName);
- }
- } // namespace av
|