utils_thread_pool.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. #pragma once
  #include <atomic>
  #include <chrono>
  #include <condition_variable>
  #include <functional>
  #include <future>
  #include <map>
  #include <memory>
  #include <mutex>
  #include <queue>
  #include <string>
  #include <thread>
  #include <type_traits>
  #include <utility>
  #include <vector>

  #include "../base/types.h"
  15. namespace av {
  16. namespace utils {
  17. using namespace av::base;
  /// Task priority levels, listed from least to most urgent.
  /// The numeric values are fixed explicitly and increase with urgency.
  enum class TaskPriority {
      LOW      = 0,
      NORMAL   = 1,
      HIGH     = 2,
      CRITICAL = 3
  };
  /// Lifecycle state of a single task.
  enum class TaskStatus {
      PENDING   = 0,  ///< Waiting to be executed.
      RUNNING   = 1,  ///< Currently executing.
      COMPLETED = 2,  ///< Finished without throwing.
      FAILED    = 3,  ///< Execution failed.
      CANCELLED = 4   ///< Cancelled.
  };
  /// Lifecycle state of the thread pool as a whole.
  enum class ThreadPoolState {
      STOPPED  = 0,  ///< Stopped.
      STARTING = 1,  ///< Start-up in progress.
      RUNNING  = 2,  ///< Running.
      STOPPING = 3,  ///< Shutdown in progress.
      PAUSED   = 4   ///< Paused.
  };
  41. // 任务信息
  42. struct TaskInfo {
  43. uint64_t id = 0; // 任务ID
  44. std::string name; // 任务名称
  45. TaskPriority priority = TaskPriority::NORMAL; // 优先级
  46. TaskStatus status = TaskStatus::PENDING; // 状态
  47. std::chrono::steady_clock::time_point submitTime; // 提交时间
  48. std::chrono::steady_clock::time_point startTime; // 开始时间
  49. std::chrono::steady_clock::time_point endTime; // 结束时间
  50. std::thread::id threadId; // 执行线程ID
  51. std::string errorMessage; // 错误信息
  52. };
  /// Snapshot of thread-pool statistics. Every counter starts at zero.
  struct ThreadPoolStats {
      // Thread counts.
      size_t totalThreads{0};    ///< Total number of threads.
      size_t activeThreads{0};   ///< Number of active threads.
      size_t idleThreads{0};     ///< Number of idle threads.

      // Task counts.
      size_t pendingTasks{0};    ///< Tasks waiting to run.
      size_t runningTasks{0};    ///< Tasks currently running.
      size_t completedTasks{0};  ///< Tasks that completed.
      size_t failedTasks{0};     ///< Tasks that failed.
      size_t cancelledTasks{0};  ///< Tasks that were cancelled.

      // Timing.
      double averageExecutionTime{0.0};                 ///< Average execution time, in milliseconds.
      double averageWaitTime{0.0};                      ///< Average wait time, in milliseconds.
      std::chrono::steady_clock::time_point startTime;  ///< Start-up time.
  };
  /// Tunable parameters of a thread pool, with their default values.
  struct ThreadPoolConfig {
      size_t minThreads{2};                                                     ///< Minimum number of threads.
      size_t maxThreads{8};                                                     ///< Maximum number of threads.
      size_t queueCapacity{1000};                                               ///< Queue capacity.
      std::chrono::milliseconds keepAliveTime = std::chrono::milliseconds(60000);  ///< Thread keep-alive time.
      bool allowCoreThreadTimeout{false};                                       ///< Allow core threads to time out.
      bool enablePriority{true};                                                ///< Enable priorities.
      bool enableStatistics{true};                                              ///< Enable statistics.
      std::string threadNamePrefix{"ThreadPool"};                               ///< Thread name prefix.
  };
  78. // 任务包装器
  79. class Task {
  80. public:
  81. Task() = default;
  82. virtual ~Task() = default;
  83. template<typename F, typename... Args>
  84. Task(F&& f, Args&&... args)
  85. : function_(std::bind(std::forward<F>(f), std::forward<Args>(args)...))
  86. , info_(std::make_shared<TaskInfo>()) {
  87. info_->id = generateTaskId();
  88. info_->submitTime = std::chrono::steady_clock::now();
  89. }
  90. void execute() {
  91. if (function_) {
  92. info_->status = TaskStatus::RUNNING;
  93. info_->startTime = std::chrono::steady_clock::now();
  94. info_->threadId = std::this_thread::get_id();
  95. try {
  96. function_();
  97. info_->status = TaskStatus::COMPLETED;
  98. } catch (const std::exception& e) {
  99. info_->status = TaskStatus::FAILED;
  100. info_->errorMessage = e.what();
  101. } catch (...) {
  102. info_->status = TaskStatus::FAILED;
  103. info_->errorMessage = "Unknown exception";
  104. }
  105. info_->endTime = std::chrono::steady_clock::now();
  106. }
  107. }
  108. void cancel() {
  109. info_->status = TaskStatus::CANCELLED;
  110. }
  111. uint64_t getId() const { return info_->id; }
  112. std::shared_ptr<TaskInfo> getInfo() const { return info_; }
  113. void setName(const std::string& name) { info_->name = name; }
  114. void setPriority(TaskPriority priority) { info_->priority = priority; }
  115. TaskPriority getPriority() const { return info_->priority; }
  116. TaskStatus getStatus() const { return info_->status; }
  117. private:
  118. static uint64_t generateTaskId() {
  119. static std::atomic<uint64_t> counter{0};
  120. return ++counter;
  121. }
  122. private:
  123. std::function<void()> function_;
  124. std::shared_ptr<TaskInfo> info_;
  125. };
  126. // 任务比较器(用于优先级队列)
  127. struct TaskComparator {
  128. bool operator()(const std::shared_ptr<Task>& a, const std::shared_ptr<Task>& b) const {
  129. return a->getPriority() < b->getPriority();
  130. }
  131. };
  132. // 线程池
  133. class ThreadPool {
  134. public:
  135. explicit ThreadPool(const ThreadPoolConfig& config = ThreadPoolConfig());
  136. virtual ~ThreadPool();
  137. // 禁止拷贝和移动
  138. ThreadPool(const ThreadPool&) = delete;
  139. ThreadPool& operator=(const ThreadPool&) = delete;
  140. ThreadPool(ThreadPool&&) = delete;
  141. ThreadPool& operator=(ThreadPool&&) = delete;
  142. // 基本控制
  143. ErrorCode initialize();
  144. ErrorCode start();
  145. ErrorCode stop(bool waitForCompletion = true);
  146. ErrorCode pause();
  147. ErrorCode resume();
  148. ErrorCode shutdown(std::chrono::milliseconds timeout = std::chrono::milliseconds(5000));
  149. // 任务提交
  150. template<typename F, typename... Args>
  151. auto submit(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
  152. using ReturnType = typename std::result_of<F(Args...)>::type;
  153. auto task = std::make_shared<std::packaged_task<ReturnType()>>(
  154. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  155. );
  156. auto future = task->get_future();
  157. auto taskWrapper = std::make_shared<Task>([task]() { (*task)(); });
  158. submitTask(taskWrapper);
  159. return future;
  160. }
  161. template<typename F, typename... Args>
  162. uint64_t submitWithPriority(TaskPriority priority, F&& f, Args&&... args) {
  163. auto task = std::make_shared<Task>(std::forward<F>(f), std::forward<Args>(args)...);
  164. task->setPriority(priority);
  165. submitTask(task);
  166. return task->getId();
  167. }
  168. template<typename F, typename... Args>
  169. uint64_t submitNamed(const std::string& name, F&& f, Args&&... args) {
  170. auto task = std::make_shared<Task>(std::forward<F>(f), std::forward<Args>(args)...);
  171. task->setName(name);
  172. submitTask(task);
  173. return task->getId();
  174. }
  175. // 任务管理
  176. bool cancelTask(uint64_t taskId);
  177. bool isTaskCompleted(uint64_t taskId) const;
  178. TaskStatus getTaskStatus(uint64_t taskId) const;
  179. std::shared_ptr<TaskInfo> getTaskInfo(uint64_t taskId) const;
  180. std::vector<std::shared_ptr<TaskInfo>> getAllTaskInfo() const;
  181. // 状态查询
  182. ThreadPoolState getState() const;
  183. bool isRunning() const;
  184. bool isPaused() const;
  185. bool isStopped() const;
  186. // 统计信息
  187. ThreadPoolStats getStats() const;
  188. size_t getActiveThreadCount() const;
  189. size_t getIdleThreadCount() const;
  190. size_t getPendingTaskCount() const;
  191. size_t getCompletedTaskCount() const;
  192. // 配置管理
  193. void setConfig(const ThreadPoolConfig& config);
  194. ThreadPoolConfig getConfig() const;
  195. ErrorCode resizeThreadPool(size_t newSize);
  196. void setThreadNamePrefix(const std::string& prefix);
  197. // 等待和同步
  198. void waitForAllTasks();
  199. bool waitForAllTasks(std::chrono::milliseconds timeout);
  200. void clearPendingTasks();
  201. private:
  202. // 内部方法
  203. void submitTask(std::shared_ptr<Task> task);
  204. void workerThread(size_t threadIndex);
  205. std::shared_ptr<Task> getNextTask();
  206. void adjustThreadPoolSize();
  207. void updateStatistics();
  208. void cleanupCompletedTasks();
  209. bool shouldCreateNewThread() const;
  210. bool shouldDestroyThread() const;
  211. void setThreadName(const std::string& name);
  212. private:
  213. ThreadPoolConfig config_;
  214. // 状态管理
  215. std::atomic<ThreadPoolState> state_;
  216. std::atomic<bool> shutdown_;
  217. // 线程管理
  218. std::vector<std::thread> workers_;
  219. std::atomic<size_t> activeThreads_;
  220. std::atomic<size_t> idleThreads_;
  221. // 任务队列
  222. std::priority_queue<std::shared_ptr<Task>, std::vector<std::shared_ptr<Task>>, TaskComparator> taskQueue_;
  223. std::queue<std::shared_ptr<Task>> normalQueue_; // 非优先级队列
  224. // 任务管理
  225. std::map<uint64_t, std::shared_ptr<Task>> activeTasks_;
  226. std::map<uint64_t, std::shared_ptr<TaskInfo>> completedTasks_;
  227. // 统计信息
  228. ThreadPoolStats stats_;
  229. std::atomic<size_t> totalSubmittedTasks_;
  230. std::atomic<size_t> totalCompletedTasks_;
  231. std::atomic<size_t> totalFailedTasks_;
  232. std::atomic<size_t> totalCancelledTasks_;
  233. // 同步控制
  234. mutable std::mutex queueMutex_;
  235. mutable std::mutex tasksMutex_;
  236. mutable std::mutex statsMutex_;
  237. mutable std::mutex configMutex_;
  238. std::condition_variable queueCondition_;
  239. std::condition_variable pauseCondition_;
  240. std::condition_variable completionCondition_;
  241. // 时间管理
  242. std::chrono::steady_clock::time_point startTime_;
  243. };
  244. // 工作线程
  245. class WorkerThread {
  246. public:
  247. WorkerThread(ThreadPool* pool, size_t index);
  248. ~WorkerThread();
  249. void start();
  250. void stop();
  251. void join();
  252. bool isRunning() const;
  253. std::thread::id getId() const;
  254. size_t getIndex() const;
  255. private:
  256. void run();
  257. private:
  258. ThreadPool* pool_;
  259. size_t index_;
  260. std::thread thread_;
  261. std::atomic<bool> running_;
  262. std::atomic<bool> shouldStop_;
  263. };
  264. // 线程池工厂
  265. class ThreadPoolFactory {
  266. public:
  267. // 创建标准线程池
  268. static std::unique_ptr<ThreadPool> createStandardPool();
  269. // 创建高性能线程池
  270. static std::unique_ptr<ThreadPool> createHighPerformancePool();
  271. // 创建轻量级线程池
  272. static std::unique_ptr<ThreadPool> createLightweightPool();
  273. // 创建实时线程池
  274. static std::unique_ptr<ThreadPool> createRealtimePool();
  275. // 创建自定义线程池
  276. static std::unique_ptr<ThreadPool> createCustomPool(const ThreadPoolConfig& config);
  277. // 获取系统推荐的线程数
  278. static size_t getRecommendedThreadCount();
  279. // 获取CPU核心数
  280. static size_t getCpuCoreCount();
  281. };
  282. // 线程池管理器(单例)
  283. class ThreadPoolManager {
  284. public:
  285. static ThreadPoolManager& getInstance();
  286. // 获取默认线程池
  287. ThreadPool* getDefaultPool();
  288. // 创建命名线程池
  289. ErrorCode createPool(const std::string& name, const ThreadPoolConfig& config);
  290. // 获取命名线程池
  291. ThreadPool* getPool(const std::string& name);
  292. // 销毁线程池
  293. ErrorCode destroyPool(const std::string& name);
  294. // 获取所有线程池名称
  295. std::vector<std::string> getPoolNames() const;
  296. // 关闭所有线程池
  297. void shutdownAll();
  298. private:
  299. ThreadPoolManager();
  300. ~ThreadPoolManager();
  301. ThreadPoolManager(const ThreadPoolManager&) = delete;
  302. ThreadPoolManager& operator=(const ThreadPoolManager&) = delete;
  303. private:
  304. std::unique_ptr<ThreadPool> defaultPool_;
  305. std::map<std::string, std::unique_ptr<ThreadPool>> namedPools_;
  306. mutable std::mutex poolsMutex_;
  307. };
  // Convenience macros.
  //
  // The callable is passed as the first variadic argument, so plain
  // __VA_ARGS__ is always non-empty for a valid call. This avoids the GNU
  // `, ##__VA_ARGS__` comma-swallowing extension while producing the same
  // expansion: THREAD_POOL_SUBMIT(p, f, a, b) -> (p)->submit(f, a, b).
  #define THREAD_POOL_SUBMIT(pool, ...) \
      (pool)->submit(__VA_ARGS__)

  #define THREAD_POOL_SUBMIT_PRIORITY(pool, priority, ...) \
      (pool)->submitWithPriority(priority, __VA_ARGS__)

  #define THREAD_POOL_SUBMIT_NAMED(pool, name, ...) \
      (pool)->submitNamed(name, __VA_ARGS__)

  // Expands to the default pool of the ThreadPoolManager singleton.
  #define DEFAULT_THREAD_POOL() \
      av::utils::ThreadPoolManager::getInstance().getDefaultPool()
  317. } // namespace utils
  318. } // namespace av