#pragma once #include "../base/types.h" #include #include #include #include #include #include #include #include #include #include #include #include namespace av { namespace utils { using namespace av::base; // 任务优先级 enum class TaskPriority { LOW = 0, NORMAL = 1, HIGH = 2, CRITICAL = 3 }; // 任务状态 enum class TaskStatus { PENDING, // 等待执行 RUNNING, // 正在执行 COMPLETED, // 已完成 FAILED, // 执行失败 CANCELLED // 已取消 }; // 线程池状态 enum class ThreadPoolState { STOPPED, // 已停止 STARTING, // 启动中 RUNNING, // 运行中 STOPPING, // 停止中 PAUSED // 已暂停 }; // 任务信息 struct TaskInfo { uint64_t id = 0; // 任务ID std::string name; // 任务名称 TaskPriority priority = TaskPriority::NORMAL; // 优先级 TaskStatus status = TaskStatus::PENDING; // 状态 std::chrono::steady_clock::time_point submitTime; // 提交时间 std::chrono::steady_clock::time_point startTime; // 开始时间 std::chrono::steady_clock::time_point endTime; // 结束时间 std::thread::id threadId; // 执行线程ID std::string errorMessage; // 错误信息 }; // 线程池统计信息 struct ThreadPoolStats { size_t totalThreads = 0; // 总线程数 size_t activeThreads = 0; // 活跃线程数 size_t idleThreads = 0; // 空闲线程数 size_t pendingTasks = 0; // 等待任务数 size_t runningTasks = 0; // 运行任务数 size_t completedTasks = 0; // 完成任务数 size_t failedTasks = 0; // 失败任务数 size_t cancelledTasks = 0; // 取消任务数 double averageExecutionTime = 0.0; // 平均执行时间(毫秒) double averageWaitTime = 0.0; // 平均等待时间(毫秒) std::chrono::steady_clock::time_point startTime; // 启动时间 }; // 线程池配置 struct ThreadPoolConfig { size_t minThreads = 2; // 最小线程数 size_t maxThreads = 8; // 最大线程数 size_t queueCapacity = 1000; // 队列容量 std::chrono::milliseconds keepAliveTime{60000}; // 线程保活时间 bool allowCoreThreadTimeout = false; // 允许核心线程超时 bool enablePriority = true; // 启用优先级 bool enableStatistics = true; // 启用统计 std::string threadNamePrefix = "ThreadPool"; // 线程名称前缀 }; // 任务包装器 class Task { public: Task() = default; virtual ~Task() = default; template Task(F&& f, Args&&... args) : function_(std::bind(std::forward(f), std::forward(args)...)) , info_(std::make_shared()) { info_->id = generateTaskId(); info_->submitTime = std::chrono::steady_clock::now(); } void execute() { if (function_) { info_->status = TaskStatus::RUNNING; info_->startTime = std::chrono::steady_clock::now(); info_->threadId = std::this_thread::get_id(); try { function_(); info_->status = TaskStatus::COMPLETED; } catch (const std::exception& e) { info_->status = TaskStatus::FAILED; info_->errorMessage = e.what(); } catch (...) { info_->status = TaskStatus::FAILED; info_->errorMessage = "Unknown exception"; } info_->endTime = std::chrono::steady_clock::now(); } } void cancel() { info_->status = TaskStatus::CANCELLED; } uint64_t getId() const { return info_->id; } std::shared_ptr getInfo() const { return info_; } void setName(const std::string& name) { info_->name = name; } void setPriority(TaskPriority priority) { info_->priority = priority; } TaskPriority getPriority() const { return info_->priority; } TaskStatus getStatus() const { return info_->status; } private: static uint64_t generateTaskId() { static std::atomic counter{0}; return ++counter; } private: std::function function_; std::shared_ptr info_; }; // 任务比较器(用于优先级队列) struct TaskComparator { bool operator()(const std::shared_ptr& a, const std::shared_ptr& b) const { return a->getPriority() < b->getPriority(); } }; // 线程池 class ThreadPool { public: explicit ThreadPool(const ThreadPoolConfig& config = ThreadPoolConfig()); virtual ~ThreadPool(); // 禁止拷贝和移动 ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; ThreadPool(ThreadPool&&) = delete; ThreadPool& operator=(ThreadPool&&) = delete; // 基本控制 ErrorCode initialize(); ErrorCode start(); ErrorCode stop(bool waitForCompletion = true); ErrorCode pause(); ErrorCode resume(); ErrorCode shutdown(std::chrono::milliseconds timeout = std::chrono::milliseconds(5000)); // 任务提交 template auto submit(F&& f, Args&&... args) -> std::future::type> { using ReturnType = typename std::result_of::type; auto task = std::make_shared>( std::bind(std::forward(f), std::forward(args)...) ); auto future = task->get_future(); auto taskWrapper = std::make_shared([task]() { (*task)(); }); submitTask(taskWrapper); return future; } template uint64_t submitWithPriority(TaskPriority priority, F&& f, Args&&... args) { auto task = std::make_shared(std::forward(f), std::forward(args)...); task->setPriority(priority); submitTask(task); return task->getId(); } template uint64_t submitNamed(const std::string& name, F&& f, Args&&... args) { auto task = std::make_shared(std::forward(f), std::forward(args)...); task->setName(name); submitTask(task); return task->getId(); } // 任务管理 bool cancelTask(uint64_t taskId); bool isTaskCompleted(uint64_t taskId) const; TaskStatus getTaskStatus(uint64_t taskId) const; std::shared_ptr getTaskInfo(uint64_t taskId) const; std::vector> getAllTaskInfo() const; // 状态查询 ThreadPoolState getState() const; bool isRunning() const; bool isPaused() const; bool isStopped() const; // 统计信息 ThreadPoolStats getStats() const; size_t getActiveThreadCount() const; size_t getIdleThreadCount() const; size_t getPendingTaskCount() const; size_t getCompletedTaskCount() const; // 配置管理 void setConfig(const ThreadPoolConfig& config); ThreadPoolConfig getConfig() const; ErrorCode resizeThreadPool(size_t newSize); void setThreadNamePrefix(const std::string& prefix); // 等待和同步 void waitForAllTasks(); bool waitForAllTasks(std::chrono::milliseconds timeout); void clearPendingTasks(); private: // 内部方法 void submitTask(std::shared_ptr task); void workerThread(size_t threadIndex); std::shared_ptr getNextTask(); void adjustThreadPoolSize(); void updateStatistics(); void cleanupCompletedTasks(); bool shouldCreateNewThread() const; bool shouldDestroyThread() const; void setThreadName(const std::string& name); private: ThreadPoolConfig config_; // 状态管理 std::atomic state_; std::atomic shutdown_; // 线程管理 std::vector workers_; std::atomic activeThreads_; std::atomic idleThreads_; // 任务队列 std::priority_queue, std::vector>, TaskComparator> taskQueue_; std::queue> normalQueue_; // 非优先级队列 // 任务管理 std::map> activeTasks_; std::map> completedTasks_; // 统计信息 ThreadPoolStats stats_; std::atomic totalSubmittedTasks_; std::atomic totalCompletedTasks_; std::atomic totalFailedTasks_; std::atomic totalCancelledTasks_; // 同步控制 mutable std::mutex queueMutex_; mutable std::mutex tasksMutex_; mutable std::mutex statsMutex_; mutable std::mutex configMutex_; std::condition_variable queueCondition_; std::condition_variable pauseCondition_; std::condition_variable completionCondition_; // 时间管理 std::chrono::steady_clock::time_point startTime_; }; // 工作线程 class WorkerThread { public: WorkerThread(ThreadPool* pool, size_t index); ~WorkerThread(); void start(); void stop(); void join(); bool isRunning() const; std::thread::id getId() const; size_t getIndex() const; private: void run(); private: ThreadPool* pool_; size_t index_; std::thread thread_; std::atomic running_; std::atomic shouldStop_; }; // 线程池工厂 class ThreadPoolFactory { public: // 创建标准线程池 static std::unique_ptr createStandardPool(); // 创建高性能线程池 static std::unique_ptr createHighPerformancePool(); // 创建轻量级线程池 static std::unique_ptr createLightweightPool(); // 创建实时线程池 static std::unique_ptr createRealtimePool(); // 创建自定义线程池 static std::unique_ptr createCustomPool(const ThreadPoolConfig& config); // 获取系统推荐的线程数 static size_t getRecommendedThreadCount(); // 获取CPU核心数 static size_t getCpuCoreCount(); }; // 线程池管理器(单例) class ThreadPoolManager { public: static ThreadPoolManager& getInstance(); // 获取默认线程池 ThreadPool* getDefaultPool(); // 创建命名线程池 ErrorCode createPool(const std::string& name, const ThreadPoolConfig& config); // 获取命名线程池 ThreadPool* getPool(const std::string& name); // 销毁线程池 ErrorCode destroyPool(const std::string& name); // 获取所有线程池名称 std::vector getPoolNames() const; // 关闭所有线程池 void shutdownAll(); private: ThreadPoolManager(); ~ThreadPoolManager(); ThreadPoolManager(const ThreadPoolManager&) = delete; ThreadPoolManager& operator=(const ThreadPoolManager&) = delete; private: std::unique_ptr defaultPool_; std::map> namedPools_; mutable std::mutex poolsMutex_; }; // 便利宏 #define THREAD_POOL_SUBMIT(pool, func, ...) \ (pool)->submit(func, ##__VA_ARGS__) #define THREAD_POOL_SUBMIT_PRIORITY(pool, priority, func, ...) \ (pool)->submitWithPriority(priority, func, ##__VA_ARGS__) #define THREAD_POOL_SUBMIT_NAMED(pool, name, func, ...) \ (pool)->submitNamed(name, func, ##__VA_ARGS__) #define DEFAULT_THREAD_POOL() \ av::utils::ThreadPoolManager::getInstance().getDefaultPool() } // namespace utils } // namespace av