// threadpool.cpp
  1. #include "threadpool.h"
  2. #include <QDebug>
  3. int ThreadPool::m_maxThreads;
  4. int ThreadPool::m_freeThreads;
  5. int ThreadPool::m_initFlag = -1;
  6. int ThreadPool::m_pushIndex = 0;
  7. int ThreadPool::m_readIndex = 0;
  8. int ThreadPool::m_size = 0;
  9. std::vector<ThreadPool::Threads> ThreadPool::m_threadsQueue;
  10. int ThreadPool::m_maxTasks;
  11. std::vector<ThreadPool::Task> ThreadPool::m_tasksQueue;
  12. std::mutex ThreadPool::m_mutex;
  13. std::condition_variable ThreadPool::m_cond;
  14. bool ThreadPool::init(int threadsNum, int tasksNum)
  15. {
  16. if (m_initFlag != -1)
  17. return true;
  18. if (threadsNum <= 0 || tasksNum <= 0)
  19. return false;
  20. m_maxThreads = threadsNum;
  21. m_maxTasks = tasksNum;
  22. m_freeThreads = m_maxThreads;
  23. m_threadsQueue.resize(m_maxThreads);
  24. m_tasksQueue.resize(m_maxTasks);
  25. for (int i = 0; i < m_maxThreads; i++) {
  26. m_threadsQueue[i].isWorking = false;
  27. m_threadsQueue[i].isTerminate = false;
  28. std::thread* _thread = new std::thread(threadEventLoop, &m_threadsQueue[i]);
  29. if (!_thread) {
  30. return false;
  31. }
  32. m_threadsQueue[i].id = _thread->get_id();
  33. _thread->detach();
  34. }
  35. m_initFlag = 1;
  36. return true;
  37. }
  38. bool ThreadPool::addTask(std::function<void(std::shared_ptr<void>)> func, std::shared_ptr<void> par)
  39. {
  40. std::unique_lock<std::mutex> lock(m_mutex);
  41. if (m_size >= m_maxTasks)
  42. return false;
  43. m_tasksQueue.at(m_pushIndex).func = func;
  44. m_tasksQueue.at(m_pushIndex).par = par;
  45. m_size++;
  46. m_pushIndex = (m_pushIndex + 1) % m_maxTasks;
  47. //qDebug()<<"free threads:"<<m_freeThreads;
  48. m_cond.notify_one();
  49. return true;
  50. }
  51. void ThreadPool::threadEventLoop(void* arg)
  52. {
  53. auto theThread = reinterpret_cast<Threads*>(arg);
  54. while (true) {
  55. std::unique_lock<std::mutex> lock(m_mutex);
  56. while (!m_size) {
  57. if (theThread->isTerminate)
  58. break;
  59. m_cond.wait(lock);
  60. }
  61. if (theThread->isTerminate)
  62. break;
  63. Task task = m_tasksQueue[m_readIndex];
  64. m_tasksQueue[m_readIndex].func = nullptr;
  65. m_tasksQueue[m_readIndex].par.reset();
  66. m_readIndex = (m_readIndex + 1) % m_maxTasks;
  67. m_size--;
  68. m_freeThreads--;
  69. lock.unlock();
  70. theThread->isWorking = true;
  71. //执行任务函数
  72. task.func(task.par);
  73. theThread->isWorking = false;
  74. lock.lock();
  75. m_freeThreads++;
  76. }
  77. }
  78. void ThreadPool::releasePool()
  79. {
  80. std::unique_lock<std::mutex> lock(m_mutex);
  81. for (int i = 0; i < m_maxThreads; i++) {
  82. m_threadsQueue[i].isTerminate = true;
  83. }
  84. m_cond.notify_all();
  85. lock.unlock();
  86. //等待线程全部退出
  87. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  88. }