media_thread_base.cpp 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. #include "media_thread_base.h"
  2. #include "../base/logger.h"
  3. #include <stdexcept>
  4. #include <sstream>
  5. namespace av {
  6. ThreadBase::ThreadBase() {
  7. AV_LOGGER_DEBUG("ThreadBase created");
  8. }
  9. ThreadBase::~ThreadBase() {
  10. if (isRunning()) {
  11. AV_LOGGER_WARNING("ThreadBase destroyed while still running, forcing stop");
  12. stop();
  13. join();
  14. }
  15. AV_LOGGER_DEBUG("ThreadBase destroyed");
  16. }
  17. bool ThreadBase::start() {
  18. std::unique_lock<std::mutex> lock(m_stateMutex);
  19. if (m_state != ThreadState::STOPPED) {
  20. AV_LOGGER_WARNING("Thread already started or starting");
  21. return false;
  22. }
  23. try {
  24. setState(ThreadState::STARTING);
  25. m_exitRequested = false;
  26. m_thread = std::make_unique<std::thread>(&ThreadBase::threadEntry, this);
  27. m_threadId = m_thread->get_id();
  28. // 等待线程真正开始运行
  29. m_stateCondition.wait(lock, [this] {
  30. return m_state == ThreadState::RUNNING || m_state == ThreadState::STOPPED;
  31. });
  32. bool success = (m_state == ThreadState::RUNNING);
  33. if (success) {
  34. AV_LOGGER_INFOF("Thread '{}' started successfully", m_threadName.empty() ? "unnamed" : m_threadName);
  35. } else {
  36. AV_LOGGER_ERRORF("Thread '{}' failed to start", m_threadName.empty() ? "unnamed" : m_threadName);
  37. }
  38. return success;
  39. } catch (const std::exception& e) {
  40. setState(ThreadState::STOPPED);
  41. AV_LOGGER_ERRORF("Failed to start thread: {}", e.what());
  42. return false;
  43. }
  44. }
  45. void ThreadBase::stop() {
  46. {
  47. std::lock_guard<std::mutex> lock(m_stateMutex);
  48. if (m_state == ThreadState::STOPPED || m_state == ThreadState::STOPPING) {
  49. return;
  50. }
  51. setState(ThreadState::STOPPING);
  52. }
  53. m_exitRequested = true;
  54. m_condition.notify_all();
  55. AV_LOGGER_INFOF("Thread '{}' stop requested", m_threadName.empty() ? "unnamed" : m_threadName);
  56. }
  57. void ThreadBase::join() {
  58. std::unique_ptr<std::thread> thread;
  59. {
  60. std::lock_guard<std::mutex> lock(m_stateMutex);
  61. thread = std::move(m_thread);
  62. }
  63. if (thread && thread->joinable()) {
  64. // 检查是否在同一线程中调用join
  65. if (std::this_thread::get_id() == thread->get_id()) {
  66. AV_LOGGER_WARNING("Cannot join thread from itself, detaching instead");
  67. thread->detach();
  68. return;
  69. }
  70. try {
  71. // 使用超时等待
  72. std::unique_lock<std::mutex> lock(m_stateMutex);
  73. bool finished = m_stateCondition.wait_for(lock,
  74. std::chrono::milliseconds(DEFAULT_JOIN_TIMEOUT_MS),
  75. [this] { return m_state == ThreadState::STOPPED; });
  76. if (finished) {
  77. lock.unlock();
  78. thread->join();
  79. AV_LOGGER_INFOF("Thread '{}' joined successfully", m_threadName.empty() ? "unnamed" : m_threadName);
  80. } else {
  81. AV_LOGGER_WARNINGF("Thread '{}' join timeout, detaching", m_threadName.empty() ? "unnamed" : m_threadName);
  82. thread->detach();
  83. }
  84. } catch (const std::exception& e) {
  85. AV_LOGGER_ERRORF("Exception during thread join: {}", e.what());
  86. if (thread->joinable()) {
  87. thread->detach();
  88. }
  89. }
  90. }
  91. }
  92. bool ThreadBase::isRunning() const {
  93. return m_state == ThreadState::RUNNING;
  94. }
  95. bool ThreadBase::isStopping() const {
  96. return m_state == ThreadState::STOPPING;
  97. }
  98. ThreadState ThreadBase::getState() const {
  99. return m_state.load();
  100. }
  101. std::thread::id ThreadBase::getThreadId() const {
  102. std::lock_guard<std::mutex> lock(m_stateMutex);
  103. return m_threadId;
  104. }
  105. void ThreadBase::setThreadName(const std::string& name) {
  106. std::lock_guard<std::mutex> lock(m_stateMutex);
  107. m_threadName = name;
  108. }
  109. std::string ThreadBase::getThreadName() const {
  110. std::lock_guard<std::mutex> lock(m_stateMutex);
  111. return m_threadName;
  112. }
  113. void ThreadBase::setOnStarted(std::function<void()> callback) {
  114. std::lock_guard<std::mutex> lock(m_stateMutex);
  115. m_onStarted = std::move(callback);
  116. }
  117. void ThreadBase::setOnFinished(std::function<void()> callback) {
  118. std::lock_guard<std::mutex> lock(m_stateMutex);
  119. m_onFinished = std::move(callback);
  120. }
  121. void ThreadBase::setOnError(std::function<void(const std::string&)> callback) {
  122. std::lock_guard<std::mutex> lock(m_stateMutex);
  123. m_onError = std::move(callback);
  124. }
  125. void ThreadBase::waitForCondition(std::function<bool()> condition, int timeoutMs) {
  126. std::unique_lock<std::mutex> lock(m_mutex);
  127. if (timeoutMs < 0) {
  128. m_condition.wait(lock, [this, &condition] {
  129. return shouldExit() || condition();
  130. });
  131. } else {
  132. m_condition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this, &condition] {
  133. return shouldExit() || condition();
  134. });
  135. }
  136. }
  137. void ThreadBase::notifyCondition() {
  138. m_condition.notify_all();
  139. }
  140. bool ThreadBase::shouldExit() const {
  141. return m_exitRequested.load();
  142. }
  143. void ThreadBase::sleepMs(int ms) {
  144. std::this_thread::sleep_for(std::chrono::milliseconds(ms));
  145. }
  146. bool ThreadBase::waitForMs(int ms) {
  147. std::unique_lock<std::mutex> lock(m_mutex);
  148. return !m_condition.wait_for(lock, std::chrono::milliseconds(ms), [this] {
  149. return shouldExit();
  150. });
  151. }
  152. void ThreadBase::reportError(const std::string& error) {
  153. AV_LOGGER_ERRORF("Thread '{}' error: {}", m_threadName.empty() ? "unnamed" : m_threadName, error);
  154. std::function<void(const std::string&)> errorCallback;
  155. {
  156. std::lock_guard<std::mutex> lock(m_stateMutex);
  157. errorCallback = m_onError;
  158. }
  159. if (errorCallback) {
  160. try {
  161. errorCallback(error);
  162. } catch (const std::exception& e) {
  163. AV_LOGGER_ERRORF("Exception in error callback: {}", e.what());
  164. } catch (...) {
  165. AV_LOGGER_ERROR("Unknown exception in error callback");
  166. }
  167. }
  168. }
  169. void ThreadBase::threadEntry() {
  170. try {
  171. // 设置线程状态为运行中
  172. {
  173. std::lock_guard<std::mutex> lock(m_stateMutex);
  174. setState(ThreadState::RUNNING);
  175. m_stateCondition.notify_all();
  176. }
  177. // 调用启动回调
  178. std::function<void()> startedCallback;
  179. {
  180. std::lock_guard<std::mutex> lock(m_stateMutex);
  181. startedCallback = m_onStarted;
  182. }
  183. if (startedCallback) {
  184. try {
  185. startedCallback();
  186. } catch (const std::exception& e) {
  187. AV_LOGGER_ERRORF("Exception in started callback: {}", e.what());
  188. } catch (...) {
  189. AV_LOGGER_ERROR("Unknown exception in started callback");
  190. }
  191. }
  192. AV_LOGGER_DEBUGF("Thread '{}' entering run loop", m_threadName.empty() ? "unnamed" : m_threadName);
  193. // 执行用户逻辑
  194. run();
  195. AV_LOGGER_DEBUGF("Thread '{}' exiting run loop", m_threadName.empty() ? "unnamed" : m_threadName);
  196. } catch (const std::exception& e) {
  197. AV_LOGGER_ERRORF("Unhandled exception in thread '{}': {}", m_threadName.empty()
  198. ? "unnamed"
  199. : m_threadName,
  200. e.what());
  201. reportError("Unhandled exception: " + std::string(e.what()));
  202. } catch (...) {
  203. AV_LOGGER_ERRORF("Unknown unhandled exception in thread '{}'",
  204. m_threadName.empty() ? "unnamed" : m_threadName);
  205. reportError("Unknown unhandled exception");
  206. }
  207. cleanup();
  208. }
  209. void ThreadBase::setState(ThreadState state) {
  210. m_state = state;
  211. AV_LOGGER_DEBUGF("Thread '{}' state changed to {}",
  212. m_threadName.empty() ? "unnamed" : m_threadName,
  213. static_cast<int>(state));
  214. }
  215. void ThreadBase::cleanup() {
  216. // 调用完成回调
  217. std::function<void()> finishedCallback;
  218. {
  219. std::lock_guard<std::mutex> lock(m_stateMutex);
  220. finishedCallback = m_onFinished;
  221. }
  222. if (finishedCallback) {
  223. try {
  224. finishedCallback();
  225. } catch (const std::exception& e) {
  226. AV_LOGGER_ERRORF("Exception in finished callback: {}", e.what());
  227. } catch (...) {
  228. AV_LOGGER_ERROR("Unknown exception in finished callback");
  229. }
  230. }
  231. // 设置最终状态
  232. {
  233. std::lock_guard<std::mutex> lock(m_stateMutex);
  234. setState(ThreadState::STOPPED);
  235. m_stateCondition.notify_all();
  236. }
  237. AV_LOGGER_INFOF("Thread '{}' finished", m_threadName.empty() ? "unnamed" : m_threadName);
  238. }
  239. } // namespace av