utils_thread_pool.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. #include "utils_thread_pool.h"
  2. #include "../base/base_logger.h"
  3. #include <algorithm>
  4. #include <sstream>
  5. #include <thread>
  6. #include <chrono>
  7. #ifdef _WIN32
  8. #include <windows.h>
  9. #else
  10. #include <pthread.h>
  11. #include <sys/prctl.h>
  12. #endif
  13. namespace av {
  14. namespace utils {
  15. using namespace av::base;
  16. // ThreadPool 实现
  17. ThreadPool::ThreadPool(const ThreadPoolConfig& config)
  18. : config_(config)
  19. , state_(ThreadPoolState::STOPPED)
  20. , shutdown_(false)
  21. , activeThreads_(0)
  22. , idleThreads_(0)
  23. , totalSubmittedTasks_(0)
  24. , totalCompletedTasks_(0)
  25. , totalFailedTasks_(0)
  26. , totalCancelledTasks_(0)
  27. , startTime_(std::chrono::steady_clock::now()) {
  28. // 验证配置
  29. if (config_.minThreads > config_.maxThreads) {
  30. config_.minThreads = config_.maxThreads;
  31. }
  32. if (config_.minThreads == 0) {
  33. config_.minThreads = 1;
  34. }
  35. stats_.startTime = startTime_;
  36. }
  37. ThreadPool::~ThreadPool() {
  38. shutdown(std::chrono::milliseconds(1000));
  39. }
  40. ErrorCode ThreadPool::initialize() {
  41. std::lock_guard<std::mutex> lock(configMutex_);
  42. if (state_.load() != ThreadPoolState::STOPPED) {
  43. return ErrorCode::INVALID_STATE;
  44. }
  45. try {
  46. // 预分配线程容器
  47. workers_.reserve(config_.maxThreads);
  48. LOG_INFO("Thread pool initialized with {} min threads, {} max threads",
  49. config_.minThreads, config_.maxThreads);
  50. return ErrorCode::SUCCESS;
  51. } catch (const std::exception& e) {
  52. LOG_ERROR("Failed to initialize thread pool: {}", e.what());
  53. return ErrorCode::INITIALIZATION_FAILED;
  54. }
  55. }
  56. ErrorCode ThreadPool::start() {
  57. std::lock_guard<std::mutex> lock(configMutex_);
  58. if (state_.load() == ThreadPoolState::RUNNING) {
  59. return ErrorCode::SUCCESS;
  60. }
  61. if (state_.load() != ThreadPoolState::STOPPED) {
  62. return ErrorCode::INVALID_STATE;
  63. }
  64. state_.store(ThreadPoolState::STARTING);
  65. shutdown_.store(false);
  66. try {
  67. // 创建最小数量的工作线程
  68. for (size_t i = 0; i < config_.minThreads; ++i) {
  69. workers_.emplace_back(&ThreadPool::workerThread, this, i);
  70. }
  71. activeThreads_.store(config_.minThreads);
  72. idleThreads_.store(config_.minThreads);
  73. state_.store(ThreadPoolState::RUNNING);
  74. startTime_ = std::chrono::steady_clock::now();
  75. stats_.startTime = startTime_;
  76. LOG_INFO("Thread pool started with {} threads", config_.minThreads);
  77. return ErrorCode::SUCCESS;
  78. } catch (const std::exception& e) {
  79. LOG_ERROR("Failed to start thread pool: {}", e.what());
  80. state_.store(ThreadPoolState::STOPPED);
  81. return ErrorCode::OPERATION_FAILED;
  82. }
  83. }
  84. ErrorCode ThreadPool::stop(bool waitForCompletion) {
  85. if (state_.load() == ThreadPoolState::STOPPED) {
  86. return ErrorCode::SUCCESS;
  87. }
  88. state_.store(ThreadPoolState::STOPPING);
  89. if (waitForCompletion) {
  90. waitForAllTasks();
  91. } else {
  92. clearPendingTasks();
  93. }
  94. // 通知所有工作线程停止
  95. {
  96. std::lock_guard<std::mutex> lock(queueMutex_);
  97. shutdown_.store(true);
  98. }
  99. queueCondition_.notify_all();
  100. // 等待所有线程结束
  101. for (auto& worker : workers_) {
  102. if (worker.joinable()) {
  103. worker.join();
  104. }
  105. }
  106. workers_.clear();
  107. activeThreads_.store(0);
  108. idleThreads_.store(0);
  109. state_.store(ThreadPoolState::STOPPED);
  110. LOG_INFO("Thread pool stopped");
  111. return ErrorCode::SUCCESS;
  112. }
  113. ErrorCode ThreadPool::pause() {
  114. if (state_.load() != ThreadPoolState::RUNNING) {
  115. return ErrorCode::INVALID_STATE;
  116. }
  117. state_.store(ThreadPoolState::PAUSED);
  118. LOG_INFO("Thread pool paused");
  119. return ErrorCode::SUCCESS;
  120. }
  121. ErrorCode ThreadPool::resume() {
  122. if (state_.load() != ThreadPoolState::PAUSED) {
  123. return ErrorCode::INVALID_STATE;
  124. }
  125. state_.store(ThreadPoolState::RUNNING);
  126. pauseCondition_.notify_all();
  127. LOG_INFO("Thread pool resumed");
  128. return ErrorCode::SUCCESS;
  129. }
  130. ErrorCode ThreadPool::shutdown(std::chrono::milliseconds timeout) {
  131. auto startTime = std::chrono::steady_clock::now();
  132. // 首先尝试正常停止
  133. ErrorCode result = stop(false);
  134. // 等待指定时间
  135. while (state_.load() != ThreadPoolState::STOPPED &&
  136. std::chrono::steady_clock::now() - startTime < timeout) {
  137. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  138. }
  139. if (state_.load() != ThreadPoolState::STOPPED) {
  140. LOG_WARNING("Thread pool shutdown timeout, forcing termination");
  141. // 强制终止(在实际实现中可能需要更复杂的逻辑)
  142. state_.store(ThreadPoolState::STOPPED);
  143. }
  144. return result;
  145. }
  146. void ThreadPool::submitTask(std::shared_ptr<Task> task) {
  147. if (!task || state_.load() == ThreadPoolState::STOPPED) {
  148. return;
  149. }
  150. {
  151. std::lock_guard<std::mutex> lock(queueMutex_);
  152. if (config_.enablePriority) {
  153. taskQueue_.push(task);
  154. } else {
  155. normalQueue_.push(task);
  156. }
  157. // 记录任务
  158. {
  159. std::lock_guard<std::mutex> tasksLock(tasksMutex_);
  160. activeTasks_[task->getId()] = task;
  161. }
  162. totalSubmittedTasks_++;
  163. }
  164. // 检查是否需要创建新线程
  165. if (shouldCreateNewThread()) {
  166. adjustThreadPoolSize();
  167. }
  168. queueCondition_.notify_one();
  169. }
  170. bool ThreadPool::cancelTask(uint64_t taskId) {
  171. std::lock_guard<std::mutex> lock(tasksMutex_);
  172. auto it = activeTasks_.find(taskId);
  173. if (it != activeTasks_.end()) {
  174. it->second->cancel();
  175. activeTasks_.erase(it);
  176. totalCancelledTasks_++;
  177. return true;
  178. }
  179. return false;
  180. }
  181. bool ThreadPool::isTaskCompleted(uint64_t taskId) const {
  182. std::lock_guard<std::mutex> lock(tasksMutex_);
  183. auto it = completedTasks_.find(taskId);
  184. return it != completedTasks_.end();
  185. }
  186. TaskStatus ThreadPool::getTaskStatus(uint64_t taskId) const {
  187. std::lock_guard<std::mutex> lock(tasksMutex_);
  188. // 检查活跃任务
  189. auto activeIt = activeTasks_.find(taskId);
  190. if (activeIt != activeTasks_.end()) {
  191. return activeIt->second->getStatus();
  192. }
  193. // 检查已完成任务
  194. auto completedIt = completedTasks_.find(taskId);
  195. if (completedIt != completedTasks_.end()) {
  196. return completedIt->second->status;
  197. }
  198. return TaskStatus::PENDING;
  199. }
  200. std::shared_ptr<TaskInfo> ThreadPool::getTaskInfo(uint64_t taskId) const {
  201. std::lock_guard<std::mutex> lock(tasksMutex_);
  202. // 检查活跃任务
  203. auto activeIt = activeTasks_.find(taskId);
  204. if (activeIt != activeTasks_.end()) {
  205. return activeIt->second->getInfo();
  206. }
  207. // 检查已完成任务
  208. auto completedIt = completedTasks_.find(taskId);
  209. if (completedIt != completedTasks_.end()) {
  210. return completedIt->second;
  211. }
  212. return nullptr;
  213. }
  214. std::vector<std::shared_ptr<TaskInfo>> ThreadPool::getAllTaskInfo() const {
  215. std::lock_guard<std::mutex> lock(tasksMutex_);
  216. std::vector<std::shared_ptr<TaskInfo>> result;
  217. // 添加活跃任务
  218. for (const auto& pair : activeTasks_) {
  219. result.push_back(pair.second->getInfo());
  220. }
  221. // 添加已完成任务
  222. for (const auto& pair : completedTasks_) {
  223. result.push_back(pair.second);
  224. }
  225. return result;
  226. }
  227. ThreadPoolState ThreadPool::getState() const {
  228. return state_.load();
  229. }
  230. bool ThreadPool::isRunning() const {
  231. return state_.load() == ThreadPoolState::RUNNING;
  232. }
  233. bool ThreadPool::isPaused() const {
  234. return state_.load() == ThreadPoolState::PAUSED;
  235. }
  236. bool ThreadPool::isStopped() const {
  237. return state_.load() == ThreadPoolState::STOPPED;
  238. }
  239. ThreadPoolStats ThreadPool::getStats() const {
  240. std::lock_guard<std::mutex> lock(statsMutex_);
  241. ThreadPoolStats currentStats = stats_;
  242. currentStats.totalThreads = workers_.size();
  243. currentStats.activeThreads = activeThreads_.load();
  244. currentStats.idleThreads = idleThreads_.load();
  245. {
  246. std::lock_guard<std::mutex> queueLock(queueMutex_);
  247. if (config_.enablePriority) {
  248. currentStats.pendingTasks = taskQueue_.size();
  249. } else {
  250. currentStats.pendingTasks = normalQueue_.size();
  251. }
  252. }
  253. {
  254. std::lock_guard<std::mutex> tasksLock(tasksMutex_);
  255. currentStats.runningTasks = activeTasks_.size();
  256. }
  257. currentStats.completedTasks = totalCompletedTasks_.load();
  258. currentStats.failedTasks = totalFailedTasks_.load();
  259. currentStats.cancelledTasks = totalCancelledTasks_.load();
  260. return currentStats;
  261. }
  262. size_t ThreadPool::getActiveThreadCount() const {
  263. return activeThreads_.load();
  264. }
  265. size_t ThreadPool::getIdleThreadCount() const {
  266. return idleThreads_.load();
  267. }
  268. size_t ThreadPool::getPendingTaskCount() const {
  269. std::lock_guard<std::mutex> lock(queueMutex_);
  270. if (config_.enablePriority) {
  271. return taskQueue_.size();
  272. } else {
  273. return normalQueue_.size();
  274. }
  275. }
  276. size_t ThreadPool::getCompletedTaskCount() const {
  277. return totalCompletedTasks_.load();
  278. }
  279. void ThreadPool::setConfig(const ThreadPoolConfig& config) {
  280. std::lock_guard<std::mutex> lock(configMutex_);
  281. config_ = config;
  282. // 验证配置
  283. if (config_.minThreads > config_.maxThreads) {
  284. config_.minThreads = config_.maxThreads;
  285. }
  286. if (config_.minThreads == 0) {
  287. config_.minThreads = 1;
  288. }
  289. }
  290. ThreadPoolConfig ThreadPool::getConfig() const {
  291. std::lock_guard<std::mutex> lock(configMutex_);
  292. return config_;
  293. }
  294. ErrorCode ThreadPool::resizeThreadPool(size_t newSize) {
  295. if (newSize == 0) {
  296. return ErrorCode::INVALID_PARAMETER;
  297. }
  298. std::lock_guard<std::mutex> lock(configMutex_);
  299. if (newSize < config_.minThreads) {
  300. config_.minThreads = newSize;
  301. }
  302. if (newSize > config_.maxThreads) {
  303. config_.maxThreads = newSize;
  304. }
  305. adjustThreadPoolSize();
  306. LOG_INFO("Thread pool resized to {} threads", newSize);
  307. return ErrorCode::SUCCESS;
  308. }
  309. void ThreadPool::setThreadNamePrefix(const std::string& prefix) {
  310. std::lock_guard<std::mutex> lock(configMutex_);
  311. config_.threadNamePrefix = prefix;
  312. }
  313. void ThreadPool::waitForAllTasks() {
  314. std::unique_lock<std::mutex> lock(queueMutex_);
  315. completionCondition_.wait(lock, [this] {
  316. bool queueEmpty = config_.enablePriority ? taskQueue_.empty() : normalQueue_.empty();
  317. bool noActiveTasks;
  318. {
  319. std::lock_guard<std::mutex> tasksLock(tasksMutex_);
  320. noActiveTasks = activeTasks_.empty();
  321. }
  322. return queueEmpty && noActiveTasks;
  323. });
  324. }
  325. bool ThreadPool::waitForAllTasks(std::chrono::milliseconds timeout) {
  326. std::unique_lock<std::mutex> lock(queueMutex_);
  327. return completionCondition_.wait_for(lock, timeout, [this] {
  328. bool queueEmpty = config_.enablePriority ? taskQueue_.empty() : normalQueue_.empty();
  329. bool noActiveTasks;
  330. {
  331. std::lock_guard<std::mutex> tasksLock(tasksMutex_);
  332. noActiveTasks = activeTasks_.empty();
  333. }
  334. return queueEmpty && noActiveTasks;
  335. });
  336. }
  337. void ThreadPool::clearPendingTasks() {
  338. std::lock_guard<std::mutex> lock(queueMutex_);
  339. size_t cancelledCount = 0;
  340. if (config_.enablePriority) {
  341. while (!taskQueue_.empty()) {
  342. auto task = taskQueue_.top();
  343. taskQueue_.pop();
  344. task->cancel();
  345. cancelledCount++;
  346. }
  347. } else {
  348. while (!normalQueue_.empty()) {
  349. auto task = normalQueue_.front();
  350. normalQueue_.pop();
  351. task->cancel();
  352. cancelledCount++;
  353. }
  354. }
  355. totalCancelledTasks_ += cancelledCount;
  356. LOG_INFO("Cleared {} pending tasks", cancelledCount);
  357. }
  358. // 私有方法实现
  359. void ThreadPool::workerThread(size_t threadIndex) {
  360. // 设置线程名称
  361. std::ostringstream oss;
  362. oss << config_.threadNamePrefix << "-" << threadIndex;
  363. setThreadName(oss.str());
  364. LOG_DEBUG("Worker thread {} started", threadIndex);
  365. while (!shutdown_.load()) {
  366. // 处理暂停状态
  367. if (state_.load() == ThreadPoolState::PAUSED) {
  368. std::unique_lock<std::mutex> pauseLock(queueMutex_);
  369. pauseCondition_.wait(pauseLock, [this] {
  370. return state_.load() != ThreadPoolState::PAUSED || shutdown_.load();
  371. });
  372. continue;
  373. }
  374. // 获取任务
  375. auto task = getNextTask();
  376. if (!task) {
  377. continue;
  378. }
  379. // 更新线程状态
  380. idleThreads_--;
  381. // 执行任务
  382. try {
  383. task->execute();
  384. totalCompletedTasks_++;
  385. } catch (const std::exception& e) {
  386. LOG_ERROR("Task execution failed: {}", e.what());
  387. totalFailedTasks_++;
  388. } catch (...) {
  389. LOG_ERROR("Task execution failed with unknown exception");
  390. totalFailedTasks_++;
  391. }
  392. // 移动任务到已完成列表
  393. {
  394. std::lock_guard<std::mutex> tasksLock(tasksMutex_);
  395. auto it = activeTasks_.find(task->getId());
  396. if (it != activeTasks_.end()) {
  397. completedTasks_[task->getId()] = task->getInfo();
  398. activeTasks_.erase(it);
  399. }
  400. }
  401. // 更新线程状态
  402. idleThreads_++;
  403. // 通知等待的线程
  404. completionCondition_.notify_all();
  405. // 清理旧的已完成任务
  406. cleanupCompletedTasks();
  407. }
  408. LOG_DEBUG("Worker thread {} stopped", threadIndex);
  409. }
  410. std::shared_ptr<Task> ThreadPool::getNextTask() {
  411. std::unique_lock<std::mutex> lock(queueMutex_);
  412. // 等待任务或停止信号
  413. queueCondition_.wait_for(lock, config_.keepAliveTime, [this] {
  414. bool hasTask = config_.enablePriority ? !taskQueue_.empty() : !normalQueue_.empty();
  415. return hasTask || shutdown_.load();
  416. });
  417. if (shutdown_.load()) {
  418. return nullptr;
  419. }
  420. std::shared_ptr<Task> task;
  421. if (config_.enablePriority && !taskQueue_.empty()) {
  422. task = taskQueue_.top();
  423. taskQueue_.pop();
  424. } else if (!config_.enablePriority && !normalQueue_.empty()) {
  425. task = normalQueue_.front();
  426. normalQueue_.pop();
  427. }
  428. return task;
  429. }
  430. void ThreadPool::adjustThreadPoolSize() {
  431. size_t currentThreads = workers_.size();
  432. size_t pendingTasks = getPendingTaskCount();
  433. size_t idleThreads = idleThreads_.load();
  434. // 决定是否需要创建新线程
  435. if (shouldCreateNewThread() && currentThreads < config_.maxThreads) {
  436. try {
  437. workers_.emplace_back(&ThreadPool::workerThread, this, currentThreads);
  438. activeThreads_++;
  439. idleThreads_++;
  440. LOG_DEBUG("Created new worker thread, total: {}", workers_.size());
  441. } catch (const std::exception& e) {
  442. LOG_ERROR("Failed to create new worker thread: {}", e.what());
  443. }
  444. }
  445. // 注意:线程销毁通常在线程自然退出时处理,这里不实现动态销毁
  446. }
  447. void ThreadPool::updateStatistics() {
  448. std::lock_guard<std::mutex> lock(statsMutex_);
  449. // 更新统计信息
  450. auto now = std::chrono::steady_clock::now();
  451. auto totalTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - stats_.startTime).count();
  452. if (totalCompletedTasks_.load() > 0) {
  453. stats_.averageExecutionTime = static_cast<double>(totalTime) / totalCompletedTasks_.load();
  454. }
  455. }
  456. void ThreadPool::cleanupCompletedTasks() {
  457. std::lock_guard<std::mutex> lock(tasksMutex_);
  458. // 保持最近的1000个已完成任务
  459. const size_t maxCompletedTasks = 1000;
  460. if (completedTasks_.size() > maxCompletedTasks) {
  461. auto it = completedTasks_.begin();
  462. std::advance(it, completedTasks_.size() - maxCompletedTasks);
  463. completedTasks_.erase(completedTasks_.begin(), it);
  464. }
  465. }
  466. bool ThreadPool::shouldCreateNewThread() const {
  467. size_t currentThreads = workers_.size();
  468. size_t pendingTasks = getPendingTaskCount();
  469. size_t idleThreads = idleThreads_.load();
  470. return currentThreads < config_.maxThreads &&
  471. pendingTasks > idleThreads &&
  472. pendingTasks > 0;
  473. }
  474. bool ThreadPool::shouldDestroyThread() const {
  475. size_t currentThreads = workers_.size();
  476. size_t pendingTasks = getPendingTaskCount();
  477. size_t idleThreads = idleThreads_.load();
  478. return currentThreads > config_.minThreads &&
  479. idleThreads > pendingTasks + 1;
  480. }
  481. void ThreadPool::setThreadName(const std::string& name) {
  482. #ifdef _WIN32
  483. // Windows 线程命名
  484. std::wstring wname(name.begin(), name.end());
  485. SetThreadDescription(GetCurrentThread(), wname.c_str());
  486. #elif defined(__linux__)
  487. // Linux 线程命名
  488. prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
  489. #else
  490. // 其他平台暂不支持
  491. #endif
  492. }
  493. // WorkerThread 实现
  494. WorkerThread::WorkerThread(ThreadPool* pool, size_t index)
  495. : pool_(pool)
  496. , index_(index)
  497. , running_(false)
  498. , shouldStop_(false) {
  499. }
  500. WorkerThread::~WorkerThread() {
  501. stop();
  502. join();
  503. }
  504. void WorkerThread::start() {
  505. if (running_.load()) {
  506. return;
  507. }
  508. shouldStop_.store(false);
  509. thread_ = std::thread(&WorkerThread::run, this);
  510. running_.store(true);
  511. }
  512. void WorkerThread::stop() {
  513. shouldStop_.store(true);
  514. }
  515. void WorkerThread::join() {
  516. if (thread_.joinable()) {
  517. thread_.join();
  518. }
  519. running_.store(false);
  520. }
  521. bool WorkerThread::isRunning() const {
  522. return running_.load();
  523. }
  524. std::thread::id WorkerThread::getId() const {
  525. return thread_.get_id();
  526. }
  527. size_t WorkerThread::getIndex() const {
  528. return index_;
  529. }
  530. void WorkerThread::run() {
  531. // 工作线程的主循环由ThreadPool管理
  532. // 这里可以添加额外的线程特定逻辑
  533. }
  534. // ThreadPoolFactory 实现
  535. std::unique_ptr<ThreadPool> ThreadPoolFactory::createStandardPool() {
  536. ThreadPoolConfig config;
  537. config.minThreads = 2;
  538. config.maxThreads = std::thread::hardware_concurrency();
  539. config.queueCapacity = 1000;
  540. config.keepAliveTime = std::chrono::milliseconds(60000);
  541. config.enablePriority = true;
  542. config.enableStatistics = true;
  543. config.threadNamePrefix = "StandardPool";
  544. return std::make_unique<ThreadPool>(config);
  545. }
  546. std::unique_ptr<ThreadPool> ThreadPoolFactory::createHighPerformancePool() {
  547. ThreadPoolConfig config;
  548. config.minThreads = std::thread::hardware_concurrency();
  549. config.maxThreads = std::thread::hardware_concurrency() * 2;
  550. config.queueCapacity = 5000;
  551. config.keepAliveTime = std::chrono::milliseconds(30000);
  552. config.enablePriority = true;
  553. config.enableStatistics = true;
  554. config.threadNamePrefix = "HighPerfPool";
  555. return std::make_unique<ThreadPool>(config);
  556. }
  557. std::unique_ptr<ThreadPool> ThreadPoolFactory::createLightweightPool() {
  558. ThreadPoolConfig config;
  559. config.minThreads = 1;
  560. config.maxThreads = 4;
  561. config.queueCapacity = 100;
  562. config.keepAliveTime = std::chrono::milliseconds(120000);
  563. config.enablePriority = false;
  564. config.enableStatistics = false;
  565. config.threadNamePrefix = "LightPool";
  566. return std::make_unique<ThreadPool>(config);
  567. }
  568. std::unique_ptr<ThreadPool> ThreadPoolFactory::createRealtimePool() {
  569. ThreadPoolConfig config;
  570. config.minThreads = std::thread::hardware_concurrency();
  571. config.maxThreads = std::thread::hardware_concurrency();
  572. config.queueCapacity = 500;
  573. config.keepAliveTime = std::chrono::milliseconds(10000);
  574. config.enablePriority = true;
  575. config.enableStatistics = true;
  576. config.threadNamePrefix = "RealtimePool";
  577. return std::make_unique<ThreadPool>(config);
  578. }
  579. std::unique_ptr<ThreadPool> ThreadPoolFactory::createCustomPool(const ThreadPoolConfig& config) {
  580. return std::make_unique<ThreadPool>(config);
  581. }
  582. size_t ThreadPoolFactory::getRecommendedThreadCount() {
  583. return std::max(2u, std::thread::hardware_concurrency());
  584. }
  585. size_t ThreadPoolFactory::getCpuCoreCount() {
  586. return std::thread::hardware_concurrency();
  587. }
  588. // ThreadPoolManager 实现
  589. ThreadPoolManager& ThreadPoolManager::getInstance() {
  590. static ThreadPoolManager instance;
  591. return instance;
  592. }
  593. ThreadPoolManager::ThreadPoolManager() {
  594. // 创建默认线程池
  595. defaultPool_ = ThreadPoolFactory::createStandardPool();
  596. defaultPool_->initialize();
  597. defaultPool_->start();
  598. }
  599. ThreadPoolManager::~ThreadPoolManager() {
  600. shutdownAll();
  601. }
  602. ThreadPool* ThreadPoolManager::getDefaultPool() {
  603. return defaultPool_.get();
  604. }
  605. ErrorCode ThreadPoolManager::createPool(const std::string& name, const ThreadPoolConfig& config) {
  606. if (name.empty()) {
  607. return ErrorCode::INVALID_PARAMETER;
  608. }
  609. std::lock_guard<std::mutex> lock(poolsMutex_);
  610. if (namedPools_.find(name) != namedPools_.end()) {
  611. return ErrorCode::ALREADY_EXISTS;
  612. }
  613. auto pool = std::make_unique<ThreadPool>(config);
  614. ErrorCode result = pool->initialize();
  615. if (result != ErrorCode::SUCCESS) {
  616. return result;
  617. }
  618. result = pool->start();
  619. if (result != ErrorCode::SUCCESS) {
  620. return result;
  621. }
  622. namedPools_[name] = std::move(pool);
  623. LOG_INFO("Created thread pool: {}", name);
  624. return ErrorCode::SUCCESS;
  625. }
  626. ThreadPool* ThreadPoolManager::getPool(const std::string& name) {
  627. std::lock_guard<std::mutex> lock(poolsMutex_);
  628. auto it = namedPools_.find(name);
  629. if (it != namedPools_.end()) {
  630. return it->second.get();
  631. }
  632. return nullptr;
  633. }
  634. ErrorCode ThreadPoolManager::destroyPool(const std::string& name) {
  635. std::lock_guard<std::mutex> lock(poolsMutex_);
  636. auto it = namedPools_.find(name);
  637. if (it == namedPools_.end()) {
  638. return ErrorCode::NOT_FOUND;
  639. }
  640. it->second->shutdown();
  641. namedPools_.erase(it);
  642. LOG_INFO("Destroyed thread pool: {}", name);
  643. return ErrorCode::SUCCESS;
  644. }
  645. std::vector<std::string> ThreadPoolManager::getPoolNames() const {
  646. std::lock_guard<std::mutex> lock(poolsMutex_);
  647. std::vector<std::string> names;
  648. names.reserve(namedPools_.size());
  649. for (const auto& pair : namedPools_) {
  650. names.push_back(pair.first);
  651. }
  652. return names;
  653. }
  654. void ThreadPoolManager::shutdownAll() {
  655. std::lock_guard<std::mutex> lock(poolsMutex_);
  656. // 关闭所有命名线程池
  657. for (auto& pair : namedPools_) {
  658. pair.second->shutdown();
  659. }
  660. namedPools_.clear();
  661. // 关闭默认线程池
  662. if (defaultPool_) {
  663. defaultPool_->shutdown();
  664. defaultPool_.reset();
  665. }
  666. LOG_INFO("All thread pools shutdown");
  667. }
  668. } // namespace utils
  669. } // namespace av