threadpool.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  #include "threadpool.h"

  #include <chrono>
  #include <exception>
  #include <utility>

  #include <QDebug>
  3. int ThreadPool::m_maxThreads;
  4. int ThreadPool::m_freeThreads;
  5. int ThreadPool::m_initFlag = -1;
  6. int ThreadPool::m_pushIndex = 0;
  7. int ThreadPool::m_readIndex = 0;
  8. int ThreadPool::m_size = 0;
  9. std::vector<ThreadPool::Threads> ThreadPool::m_threadsQueue;
  10. int ThreadPool::m_maxTasks;
  11. std::vector<ThreadPool::Task> ThreadPool::m_tasksQueue;
  12. std::mutex ThreadPool::m_mutex;
  13. std::condition_variable ThreadPool::m_cond;
  14. bool ThreadPool::init(int threadsNum, int tasksNum)
  15. {
  16. if (m_initFlag != -1)
  17. return true;
  18. if (threadsNum <= 0 || tasksNum <= 0)
  19. return false;
  20. m_maxThreads = threadsNum;
  21. m_maxTasks = tasksNum;
  22. m_freeThreads = m_maxThreads;
  23. m_threadsQueue.resize(m_maxThreads);
  24. m_tasksQueue.resize(m_maxTasks);
  25. for (int i = 0; i < m_maxThreads; i++) {
  26. m_threadsQueue[i].isWorking = false;
  27. m_threadsQueue[i].isTerminate = false;
  28. std::thread* _thread = new std::thread(threadEventLoop, &m_threadsQueue[i]);
  29. if (!_thread) {
  30. return false;
  31. }
  32. m_threadsQueue[i].id = _thread->get_id();
  33. _thread->detach();
  34. }
  35. m_initFlag = 1;
  36. return true;
  37. }
  38. bool ThreadPool::addTask(std::function<void(std::shared_ptr<void>)> func, std::shared_ptr<void> par)
  39. {
  40. std::unique_lock<std::mutex> lock(m_mutex);
  41. if (m_size >= m_maxTasks)
  42. return false;
  43. m_tasksQueue.at(m_pushIndex).func = func;
  44. m_tasksQueue.at(m_pushIndex).par = par;
  45. m_size++;
  46. m_pushIndex = (m_pushIndex + 1) % m_maxTasks;
  47. //qDebug()<<"free threads:"<<m_freeThreads;
  48. m_cond.notify_one();
  49. return true;
  50. }
  51. void ThreadPool::threadEventLoop(void* arg)
  52. {
  53. auto theThread = reinterpret_cast<Threads*>(arg);
  54. while (true) {
  55. std::unique_lock<std::mutex> lock(m_mutex);
  56. while (!m_size) {
  57. if (theThread->isTerminate)
  58. break;
  59. // 优化:使用超时等待,避免无限期阻塞
  60. m_cond.wait_for(lock, std::chrono::milliseconds(100));
  61. }
  62. if (theThread->isTerminate)
  63. break;
  64. Task task = m_tasksQueue[m_readIndex];
  65. m_tasksQueue[m_readIndex].func = nullptr;
  66. m_tasksQueue[m_readIndex].par.reset();
  67. m_readIndex = (m_readIndex + 1) % m_maxTasks;
  68. m_size--;
  69. m_freeThreads--;
  70. lock.unlock();
  71. theThread->isWorking = true;
  72. // 性能优化:添加任务执行时间监控
  73. auto startTime = std::chrono::high_resolution_clock::now();
  74. //执行任务函数
  75. task.func(task.par);
  76. auto endTime = std::chrono::high_resolution_clock::now();
  77. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - startTime);
  78. // 如果任务执行时间过长,输出警告
  79. if (duration.count() > 50000) { // 50ms
  80. qDebug() << "Long running task detected:" << duration.count() << "microseconds";
  81. }
  82. theThread->isWorking = false;
  83. lock.lock();
  84. m_freeThreads++;
  85. }
  86. }
  87. void ThreadPool::releasePool()
  88. {
  89. std::unique_lock<std::mutex> lock(m_mutex);
  90. for (int i = 0; i < m_maxThreads; i++) {
  91. m_threadsQueue[i].isTerminate = true;
  92. }
  93. m_cond.notify_all();
  94. lock.unlock();
  95. //等待线程全部退出
  96. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  97. }