| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- #pragma once
- #include "../base/types.h"
- #include <memory>
- #include <thread>
- #include <vector>
- #include <queue>
- #include <functional>
- #include <mutex>
- #include <condition_variable>
- #include <atomic>
- #include <future>
- #include <chrono>
- #include <string>
- #include <map>
- namespace av {
- namespace utils {
- using namespace av::base;
- // 任务优先级
- enum class TaskPriority {
- LOW = 0,
- NORMAL = 1,
- HIGH = 2,
- CRITICAL = 3
- };
- // 任务状态
- enum class TaskStatus {
- PENDING, // 等待执行
- RUNNING, // 正在执行
- COMPLETED, // 已完成
- FAILED, // 执行失败
- CANCELLED // 已取消
- };
- // 线程池状态
- enum class ThreadPoolState {
- STOPPED, // 已停止
- STARTING, // 启动中
- RUNNING, // 运行中
- STOPPING, // 停止中
- PAUSED // 已暂停
- };
- // 任务信息
- struct TaskInfo {
- uint64_t id = 0; // 任务ID
- std::string name; // 任务名称
- TaskPriority priority = TaskPriority::NORMAL; // 优先级
- TaskStatus status = TaskStatus::PENDING; // 状态
- std::chrono::steady_clock::time_point submitTime; // 提交时间
- std::chrono::steady_clock::time_point startTime; // 开始时间
- std::chrono::steady_clock::time_point endTime; // 结束时间
- std::thread::id threadId; // 执行线程ID
- std::string errorMessage; // 错误信息
- };
- // 线程池统计信息
- struct ThreadPoolStats {
- size_t totalThreads = 0; // 总线程数
- size_t activeThreads = 0; // 活跃线程数
- size_t idleThreads = 0; // 空闲线程数
- size_t pendingTasks = 0; // 等待任务数
- size_t runningTasks = 0; // 运行任务数
- size_t completedTasks = 0; // 完成任务数
- size_t failedTasks = 0; // 失败任务数
- size_t cancelledTasks = 0; // 取消任务数
- double averageExecutionTime = 0.0; // 平均执行时间(毫秒)
- double averageWaitTime = 0.0; // 平均等待时间(毫秒)
- std::chrono::steady_clock::time_point startTime; // 启动时间
- };
- // 线程池配置
- struct ThreadPoolConfig {
- size_t minThreads = 2; // 最小线程数
- size_t maxThreads = 8; // 最大线程数
- size_t queueCapacity = 1000; // 队列容量
- std::chrono::milliseconds keepAliveTime{60000}; // 线程保活时间
- bool allowCoreThreadTimeout = false; // 允许核心线程超时
- bool enablePriority = true; // 启用优先级
- bool enableStatistics = true; // 启用统计
- std::string threadNamePrefix = "ThreadPool"; // 线程名称前缀
- };
- // 任务包装器
- class Task {
- public:
- Task() = default;
- virtual ~Task() = default;
-
- template<typename F, typename... Args>
- Task(F&& f, Args&&... args)
- : function_(std::bind(std::forward<F>(f), std::forward<Args>(args)...))
- , info_(std::make_shared<TaskInfo>()) {
- info_->id = generateTaskId();
- info_->submitTime = std::chrono::steady_clock::now();
- }
-
- void execute() {
- if (function_) {
- info_->status = TaskStatus::RUNNING;
- info_->startTime = std::chrono::steady_clock::now();
- info_->threadId = std::this_thread::get_id();
-
- try {
- function_();
- info_->status = TaskStatus::COMPLETED;
- } catch (const std::exception& e) {
- info_->status = TaskStatus::FAILED;
- info_->errorMessage = e.what();
- } catch (...) {
- info_->status = TaskStatus::FAILED;
- info_->errorMessage = "Unknown exception";
- }
-
- info_->endTime = std::chrono::steady_clock::now();
- }
- }
-
- void cancel() {
- info_->status = TaskStatus::CANCELLED;
- }
-
- uint64_t getId() const { return info_->id; }
- std::shared_ptr<TaskInfo> getInfo() const { return info_; }
-
- void setName(const std::string& name) { info_->name = name; }
- void setPriority(TaskPriority priority) { info_->priority = priority; }
-
- TaskPriority getPriority() const { return info_->priority; }
- TaskStatus getStatus() const { return info_->status; }
-
- private:
- static uint64_t generateTaskId() {
- static std::atomic<uint64_t> counter{0};
- return ++counter;
- }
-
- private:
- std::function<void()> function_;
- std::shared_ptr<TaskInfo> info_;
- };
- // 任务比较器(用于优先级队列)
- struct TaskComparator {
- bool operator()(const std::shared_ptr<Task>& a, const std::shared_ptr<Task>& b) const {
- return a->getPriority() < b->getPriority();
- }
- };
- // 线程池
- class ThreadPool {
- public:
- explicit ThreadPool(const ThreadPoolConfig& config = ThreadPoolConfig());
- virtual ~ThreadPool();
-
- // 禁止拷贝和移动
- ThreadPool(const ThreadPool&) = delete;
- ThreadPool& operator=(const ThreadPool&) = delete;
- ThreadPool(ThreadPool&&) = delete;
- ThreadPool& operator=(ThreadPool&&) = delete;
-
- // 基本控制
- ErrorCode initialize();
- ErrorCode start();
- ErrorCode stop(bool waitForCompletion = true);
- ErrorCode pause();
- ErrorCode resume();
- ErrorCode shutdown(std::chrono::milliseconds timeout = std::chrono::milliseconds(5000));
-
- // 任务提交
- template<typename F, typename... Args>
- auto submit(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
- using ReturnType = typename std::result_of<F(Args...)>::type;
-
- auto task = std::make_shared<std::packaged_task<ReturnType()>>(
- std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- );
-
- auto future = task->get_future();
- auto taskWrapper = std::make_shared<Task>([task]() { (*task)(); });
-
- submitTask(taskWrapper);
- return future;
- }
-
- template<typename F, typename... Args>
- uint64_t submitWithPriority(TaskPriority priority, F&& f, Args&&... args) {
- auto task = std::make_shared<Task>(std::forward<F>(f), std::forward<Args>(args)...);
- task->setPriority(priority);
- submitTask(task);
- return task->getId();
- }
-
- template<typename F, typename... Args>
- uint64_t submitNamed(const std::string& name, F&& f, Args&&... args) {
- auto task = std::make_shared<Task>(std::forward<F>(f), std::forward<Args>(args)...);
- task->setName(name);
- submitTask(task);
- return task->getId();
- }
-
- // 任务管理
- bool cancelTask(uint64_t taskId);
- bool isTaskCompleted(uint64_t taskId) const;
- TaskStatus getTaskStatus(uint64_t taskId) const;
- std::shared_ptr<TaskInfo> getTaskInfo(uint64_t taskId) const;
- std::vector<std::shared_ptr<TaskInfo>> getAllTaskInfo() const;
-
- // 状态查询
- ThreadPoolState getState() const;
- bool isRunning() const;
- bool isPaused() const;
- bool isStopped() const;
-
- // 统计信息
- ThreadPoolStats getStats() const;
- size_t getActiveThreadCount() const;
- size_t getIdleThreadCount() const;
- size_t getPendingTaskCount() const;
- size_t getCompletedTaskCount() const;
-
- // 配置管理
- void setConfig(const ThreadPoolConfig& config);
- ThreadPoolConfig getConfig() const;
- ErrorCode resizeThreadPool(size_t newSize);
- void setThreadNamePrefix(const std::string& prefix);
-
- // 等待和同步
- void waitForAllTasks();
- bool waitForAllTasks(std::chrono::milliseconds timeout);
- void clearPendingTasks();
-
- private:
- // 内部方法
- void submitTask(std::shared_ptr<Task> task);
- void workerThread(size_t threadIndex);
- std::shared_ptr<Task> getNextTask();
- void adjustThreadPoolSize();
- void updateStatistics();
- void cleanupCompletedTasks();
- bool shouldCreateNewThread() const;
- bool shouldDestroyThread() const;
- void setThreadName(const std::string& name);
-
- private:
- ThreadPoolConfig config_;
-
- // 状态管理
- std::atomic<ThreadPoolState> state_;
- std::atomic<bool> shutdown_;
-
- // 线程管理
- std::vector<std::thread> workers_;
- std::atomic<size_t> activeThreads_;
- std::atomic<size_t> idleThreads_;
-
- // 任务队列
- std::priority_queue<std::shared_ptr<Task>, std::vector<std::shared_ptr<Task>>, TaskComparator> taskQueue_;
- std::queue<std::shared_ptr<Task>> normalQueue_; // 非优先级队列
-
- // 任务管理
- std::map<uint64_t, std::shared_ptr<Task>> activeTasks_;
- std::map<uint64_t, std::shared_ptr<TaskInfo>> completedTasks_;
-
- // 统计信息
- ThreadPoolStats stats_;
- std::atomic<size_t> totalSubmittedTasks_;
- std::atomic<size_t> totalCompletedTasks_;
- std::atomic<size_t> totalFailedTasks_;
- std::atomic<size_t> totalCancelledTasks_;
-
- // 同步控制
- mutable std::mutex queueMutex_;
- mutable std::mutex tasksMutex_;
- mutable std::mutex statsMutex_;
- mutable std::mutex configMutex_;
-
- std::condition_variable queueCondition_;
- std::condition_variable pauseCondition_;
- std::condition_variable completionCondition_;
-
- // 时间管理
- std::chrono::steady_clock::time_point startTime_;
- };
- // 工作线程
- class WorkerThread {
- public:
- WorkerThread(ThreadPool* pool, size_t index);
- ~WorkerThread();
-
- void start();
- void stop();
- void join();
-
- bool isRunning() const;
- std::thread::id getId() const;
- size_t getIndex() const;
-
- private:
- void run();
-
- private:
- ThreadPool* pool_;
- size_t index_;
- std::thread thread_;
- std::atomic<bool> running_;
- std::atomic<bool> shouldStop_;
- };
- // 线程池工厂
- class ThreadPoolFactory {
- public:
- // 创建标准线程池
- static std::unique_ptr<ThreadPool> createStandardPool();
-
- // 创建高性能线程池
- static std::unique_ptr<ThreadPool> createHighPerformancePool();
-
- // 创建轻量级线程池
- static std::unique_ptr<ThreadPool> createLightweightPool();
-
- // 创建实时线程池
- static std::unique_ptr<ThreadPool> createRealtimePool();
-
- // 创建自定义线程池
- static std::unique_ptr<ThreadPool> createCustomPool(const ThreadPoolConfig& config);
-
- // 获取系统推荐的线程数
- static size_t getRecommendedThreadCount();
-
- // 获取CPU核心数
- static size_t getCpuCoreCount();
- };
- // 线程池管理器(单例)
- class ThreadPoolManager {
- public:
- static ThreadPoolManager& getInstance();
-
- // 获取默认线程池
- ThreadPool* getDefaultPool();
-
- // 创建命名线程池
- ErrorCode createPool(const std::string& name, const ThreadPoolConfig& config);
-
- // 获取命名线程池
- ThreadPool* getPool(const std::string& name);
-
- // 销毁线程池
- ErrorCode destroyPool(const std::string& name);
-
- // 获取所有线程池名称
- std::vector<std::string> getPoolNames() const;
-
- // 关闭所有线程池
- void shutdownAll();
-
- private:
- ThreadPoolManager();
- ~ThreadPoolManager();
-
- ThreadPoolManager(const ThreadPoolManager&) = delete;
- ThreadPoolManager& operator=(const ThreadPoolManager&) = delete;
-
- private:
- std::unique_ptr<ThreadPool> defaultPool_;
- std::map<std::string, std::unique_ptr<ThreadPool>> namedPools_;
- mutable std::mutex poolsMutex_;
- };
- // 便利宏
- #define THREAD_POOL_SUBMIT(pool, func, ...) \
- (pool)->submit(func, ##__VA_ARGS__)
- #define THREAD_POOL_SUBMIT_PRIORITY(pool, priority, func, ...) \
- (pool)->submitWithPriority(priority, func, ##__VA_ARGS__)
- #define THREAD_POOL_SUBMIT_NAMED(pool, name, func, ...) \
- (pool)->submitNamed(name, func, ##__VA_ARGS__)
- #define DEFAULT_THREAD_POOL() \
- av::utils::ThreadPoolManager::getInstance().getDefaultPool()
- } // namespace utils
- } // namespace av
|