// Thread pool implementation: ThreadPool, WorkerThread, ThreadPoolFactory and
// ThreadPoolManager.
//
// NOTE(review): this file was reconstructed from a copy in which everything
// between angle brackets had been stripped. The standard/platform #include
// targets, the template arguments of the smart pointers and containers, and
// the duration_cast unit in updateStatistics() are best guesses. Confirm them
// against utils_thread_pool.h before merging.
//
// NOTE(review): log format strings mix "{}" and printf-style "%s"/"%zu"
// placeholders. They are kept verbatim; confirm which style Logger expects.

#include "utils_thread_pool.h"
#include "../base/logger.h"

#include <algorithm>
#include <chrono>
#include <exception>
#include <iterator>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#if defined(__linux__)
#include <sys/prctl.h>
#endif
#endif

namespace av {
namespace utils {

using namespace av;

namespace {

// NOTE(review): the element types below were lost in extraction; `Task` and
// `TaskInfo` are assumed from usage (getId/cancel/execute/getInfo, ->status).
// If the header uses different names, only these two aliases need changing.
using PoolTaskPtr = std::shared_ptr<Task>;
using PoolTaskInfoPtr = std::shared_ptr<TaskInfo>;

/// Clamps the thread limits so that 1 <= minThreads <= maxThreads.
///
/// A maxThreads of 0 (for example when hardware_concurrency() reports 0) is
/// raised to 1 first; otherwise clamping minThreads could leave min > max.
void normalizeThreadLimits(ThreadPoolConfig& config) {
    if (config.maxThreads == 0) {
        config.maxThreads = 1;
    }
    if (config.minThreads > config.maxThreads) {
        config.minThreads = config.maxThreads;
    }
    if (config.minThreads == 0) {
        config.minThreads = 1;
    }
}

}  // namespace

// ---------------------------------------------------------------------------
// ThreadPool
// ---------------------------------------------------------------------------

/// Creates a stopped pool holding a validated copy of `config`.
ThreadPool::ThreadPool(const ThreadPoolConfig& config)
    : config_(config)
    , state_(ThreadPoolState::STOPPED)
    , shutdown_(false)
    , activeThreads_(0)
    , idleThreads_(0)
    , totalSubmittedTasks_(0)
    , totalCompletedTasks_(0)
    , totalFailedTasks_(0)
    , totalCancelledTasks_(0)
    , startTime_(std::chrono::steady_clock::now()) {
    normalizeThreadLimits(config_);
    stats_.startTime = startTime_;
}

/// Shuts the pool down, discarding tasks that have not started yet.
ThreadPool::~ThreadPool() {
    shutdown(std::chrono::milliseconds(1000));
}

/// Reserves storage for the worker threads.
///
/// @return SUCCESS, INVALID_STATE if the pool is not stopped, or
///         INITIALIZATION_FAILED if the reservation throws.
ErrorCode ThreadPool::initialize() {
    std::lock_guard lock(configMutex_);

    if (state_.load() != ThreadPoolState::STOPPED) {
        return ErrorCode::INVALID_STATE;
    }

    try {
        // Pre-allocate the worker container.
        workers_.reserve(config_.maxThreads);

        Logger::instance().infof("Thread pool initialized with {} min threads, {} max threads",
                                 config_.minThreads, config_.maxThreads);
        return ErrorCode::SUCCESS;
    } catch (const std::exception& e) {
        Logger::instance().errorf("Failed to initialize thread pool: %s", e.what());
        return ErrorCode::INITIALIZATION_FAILED;
    }
}

/// Spawns `minThreads` workers and moves the pool to RUNNING.
///
/// @return SUCCESS (also when already running), INVALID_STATE if the pool is
///         neither stopped nor running, or OPERATION_FAILED if a worker could
///         not be created. On failure every worker that was already created
///         is stopped and joined, so the pool is left cleanly STOPPED.
ErrorCode ThreadPool::start() {
    std::lock_guard lock(configMutex_);

    const ThreadPoolState current = state_.load();
    if (current == ThreadPoolState::RUNNING) {
        return ErrorCode::SUCCESS;
    }
    if (current != ThreadPoolState::STOPPED) {
        return ErrorCode::INVALID_STATE;
    }

    state_.store(ThreadPoolState::STARTING);
    shutdown_.store(false);
    activeThreads_.store(0);
    idleThreads_.store(0);

    try {
        // Create the minimum number of workers. Counters are bumped before the
        // thread exists so a worker that starts immediately never sees stale
        // values; the catch block resets them if creation fails.
        for (size_t i = 0; i < config_.minThreads; ++i) {
            activeThreads_++;
            idleThreads_++;
            workers_.emplace_back(&ThreadPool::workerThread, this, i);
        }

        state_.store(ThreadPoolState::RUNNING);
        startTime_ = std::chrono::steady_clock::now();
        stats_.startTime = startTime_;

        Logger::instance().infof("Thread pool started with {} threads", config_.minThreads);
        return ErrorCode::SUCCESS;
    } catch (const std::exception& e) {
        Logger::instance().errorf("Failed to start thread pool: %s", e.what());

        // Roll back: destroying a joinable std::thread calls std::terminate,
        // so stop and join whatever was already spawned.
        {
            std::lock_guard queueLock(queueMutex_);
            shutdown_.store(true);
        }
        queueCondition_.notify_all();
        pauseCondition_.notify_all();
        for (auto& worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
        workers_.clear();
        activeThreads_.store(0);
        idleThreads_.store(0);

        state_.store(ThreadPoolState::STOPPED);
        return ErrorCode::OPERATION_FAILED;
    }
}

/// Stops all workers and joins them.
///
/// @param waitForCompletion when true, blocks until the queue is drained and
///        no task is active; when false, pending tasks are cancelled.
/// @return SUCCESS (also when the pool is already stopped).
///
/// NOTE(review): must not be called from a worker thread (it would join
/// itself), and concurrent stop() calls are not synchronised with each other.
ErrorCode ThreadPool::stop(bool waitForCompletion) {
    if (state_.load() == ThreadPoolState::STOPPED) {
        return ErrorCode::SUCCESS;
    }

    // Leave a possible PAUSED state under the queue mutex and wake parked
    // workers, otherwise they could never drain the queue below.
    {
        std::lock_guard lock(queueMutex_);
        state_.store(ThreadPoolState::STOPPING);
    }
    pauseCondition_.notify_all();

    if (waitForCompletion) {
        waitForAllTasks();
    } else {
        clearPendingTasks();
    }

    // Tell every worker to stop.
    {
        std::lock_guard lock(queueMutex_);
        shutdown_.store(true);
    }
    queueCondition_.notify_all();
    // Workers blocked on the pause condition also have to observe shutdown_.
    pauseCondition_.notify_all();

    // Wait for all workers to finish.
    for (auto& worker : workers_) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    workers_.clear();
    activeThreads_.store(0);
    idleThreads_.store(0);

    state_.store(ThreadPoolState::STOPPED);

    Logger::instance().info("Thread pool stopped");
    return ErrorCode::SUCCESS;
}

/// Moves a RUNNING pool to PAUSED; workers park before taking their next task.
///
/// @return SUCCESS, or INVALID_STATE if the pool was not running.
ErrorCode ThreadPool::pause() {
    // A single atomic transition: a concurrent stop() cannot be overwritten.
    ThreadPoolState expected = ThreadPoolState::RUNNING;
    if (!state_.compare_exchange_strong(expected, ThreadPoolState::PAUSED)) {
        return ErrorCode::INVALID_STATE;
    }

    Logger::instance().info("Thread pool paused");
    return ErrorCode::SUCCESS;
}

/// Moves a PAUSED pool back to RUNNING and wakes the parked workers.
///
/// @return SUCCESS, or INVALID_STATE if the pool was not paused.
ErrorCode ThreadPool::resume() {
    {
        // Workers evaluate the pause predicate under queueMutex_; changing the
        // state under the same mutex prevents a lost wakeup.
        std::lock_guard lock(queueMutex_);
        ThreadPoolState expected = ThreadPoolState::PAUSED;
        if (!state_.compare_exchange_strong(expected, ThreadPoolState::RUNNING)) {
            return ErrorCode::INVALID_STATE;
        }
    }
    pauseCondition_.notify_all();

    Logger::instance().info("Thread pool resumed");
    return ErrorCode::SUCCESS;
}

/// Stops the pool without waiting for pending tasks.
///
/// @param timeout upper bound for the polling loop that follows stop().
/// @return the result of stop(false).
ErrorCode ThreadPool::shutdown(std::chrono::milliseconds timeout) {
    const auto startTime = std::chrono::steady_clock::now();

    // Try a regular stop first. stop() joins the workers, so it normally
    // returns with the state already STOPPED.
    ErrorCode result = stop(false);

    // Wait up to `timeout` for the state to settle.
    while (state_.load() != ThreadPoolState::STOPPED &&
           std::chrono::steady_clock::now() - startTime < timeout) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    if (state_.load() != ThreadPoolState::STOPPED) {
        Logger::instance().info("Thread pool shutdown timeout, forcing termination");
        // Forced termination (a real implementation may need more logic here).
        state_.store(ThreadPoolState::STOPPED);
    }

    return result;
}

/// Queues `task` for execution and registers it as active.
///
/// Null tasks and submissions to a stopped pool are silently ignored.
void ThreadPool::submitTask(PoolTaskPtr task) {
    if (!task || state_.load() == ThreadPoolState::STOPPED) {
        return;
    }

    {
        std::lock_guard lock(queueMutex_);

        if (config_.enablePriority) {
            taskQueue_.push(task);
        } else {
            normalQueue_.push(task);
        }

        // Track the task so it can be queried and cancelled.
        {
            std::lock_guard tasksLock(tasksMutex_);
            activeTasks_[task->getId()] = task;
        }

        totalSubmittedTasks_++;
    }

    // Grow the pool if needed. Growth mutates workers_, so concurrent
    // submitters are serialised on configMutex_ (the lock resizeThreadPool()
    // holds around the same call). try_lock keeps submission non-blocking: if
    // someone else is growing or reconfiguring, this submission skips growth.
    // NOTE(review): stop() still touches workers_ without this lock; closing
    // that race needs a dedicated mutex in the header.
    {
        std::unique_lock growLock(configMutex_, std::try_to_lock);
        if (growLock.owns_lock() && shouldCreateNewThread()) {
            adjustThreadPoolSize();
        }
    }

    queueCondition_.notify_one();
}

/// Cancels an active task and forgets it.
///
/// @return true if the task was registered as active, false otherwise.
bool ThreadPool::cancelTask(uint64_t taskId) {
    std::lock_guard lock(tasksMutex_);

    auto it = activeTasks_.find(taskId);
    if (it != activeTasks_.end()) {
        it->second->cancel();
        activeTasks_.erase(it);
        totalCancelledTasks_++;
        return true;
    }

    return false;
}

/// @return true if `taskId` is present in the completed-task history.
bool ThreadPool::isTaskCompleted(uint64_t taskId) const {
    std::lock_guard lock(tasksMutex_);
    return completedTasks_.find(taskId) != completedTasks_.end();
}

/// Looks up the status of a task.
///
/// @return the live status for an active task, the recorded status for a
///         completed one, and TaskStatus::PENDING for an unknown id.
TaskStatus ThreadPool::getTaskStatus(uint64_t taskId) const {
    std::lock_guard lock(tasksMutex_);

    // Active tasks first.
    auto activeIt = activeTasks_.find(taskId);
    if (activeIt != activeTasks_.end()) {
        return activeIt->second->getStatus();
    }

    // Then the completed history.
    auto completedIt = completedTasks_.find(taskId);
    if (completedIt != completedTasks_.end()) {
        return completedIt->second->status;
    }

    return TaskStatus::PENDING;
}

/// @return the info record of an active or completed task, or nullptr if the
///         id is unknown.
PoolTaskInfoPtr ThreadPool::getTaskInfo(uint64_t taskId) const {
    std::lock_guard lock(tasksMutex_);

    // Active tasks first.
    auto activeIt = activeTasks_.find(taskId);
    if (activeIt != activeTasks_.end()) {
        return activeIt->second->getInfo();
    }

    // Then the completed history.
    auto completedIt = completedTasks_.find(taskId);
    if (completedIt != completedTasks_.end()) {
        return completedIt->second;
    }

    return nullptr;
}

/// @return info records of all active tasks followed by all completed ones.
std::vector<PoolTaskInfoPtr> ThreadPool::getAllTaskInfo() const {
    std::lock_guard lock(tasksMutex_);

    std::vector<PoolTaskInfoPtr> result;

    // Active tasks.
    for (const auto& pair : activeTasks_) {
        result.push_back(pair.second->getInfo());
    }

    // Completed tasks.
    for (const auto& pair : completedTasks_) {
        result.push_back(pair.second);
    }

    return result;
}

/// @return the current lifecycle state.
ThreadPoolState ThreadPool::getState() const {
    return state_.load();
}

/// @return true while the pool is in the RUNNING state.
bool ThreadPool::isRunning() const {
    return state_.load() == ThreadPoolState::RUNNING;
}

/// @return true while the pool is in the PAUSED state.
bool ThreadPool::isPaused() const {
    return state_.load() == ThreadPoolState::PAUSED;
}

/// @return true while the pool is in the STOPPED state.
bool ThreadPool::isStopped() const {
    return state_.load() == ThreadPoolState::STOPPED;
}

/// @return a snapshot of the stored statistics with the thread, queue and
///         task counters filled in from the current pool state.
ThreadPoolStats ThreadPool::getStats() const {
    std::lock_guard lock(statsMutex_);

    ThreadPoolStats currentStats = stats_;
    currentStats.totalThreads = workers_.size();
    currentStats.activeThreads = activeThreads_.load();
    currentStats.idleThreads = idleThreads_.load();

    {
        std::lock_guard queueLock(queueMutex_);
        if (config_.enablePriority) {
            currentStats.pendingTasks = taskQueue_.size();
        } else {
            currentStats.pendingTasks = normalQueue_.size();
        }
    }

    {
        std::lock_guard tasksLock(tasksMutex_);
        currentStats.runningTasks = activeTasks_.size();
    }

    currentStats.completedTasks = totalCompletedTasks_.load();
    currentStats.failedTasks = totalFailedTasks_.load();
    currentStats.cancelledTasks = totalCancelledTasks_.load();

    return currentStats;
}

/// @return the active-thread counter.
size_t ThreadPool::getActiveThreadCount() const {
    return activeThreads_.load();
}

/// @return the idle-thread counter.
size_t ThreadPool::getIdleThreadCount() const {
    return idleThreads_.load();
}

/// @return the number of tasks waiting in the queue selected by
///         `enablePriority`.
size_t ThreadPool::getPendingTaskCount() const {
    std::lock_guard lock(queueMutex_);
    if (config_.enablePriority) {
        return taskQueue_.size();
    } else {
        return normalQueue_.size();
    }
}

/// @return the number of tasks whose execute() returned without throwing.
size_t ThreadPool::getCompletedTaskCount() const {
    return totalCompletedTasks_.load();
}

/// Replaces the configuration with a validated copy of `config`.
void ThreadPool::setConfig(const ThreadPoolConfig& config) {
    std::lock_guard lock(configMutex_);
    config_ = config;
    normalizeThreadLimits(config_);
}

/// @return a copy of the current configuration.
ThreadPoolConfig ThreadPool::getConfig() const {
    std::lock_guard lock(configMutex_);
    return config_;
}

/// Widens the min/max thread limits so that `newSize` lies inside them and
/// lets the pool grow if the current load calls for it.
///
/// @return SUCCESS, or INVALID_PARAMS when `newSize` is 0.
ErrorCode ThreadPool::resizeThreadPool(size_t newSize) {
    if (newSize == 0) {
        return ErrorCode::INVALID_PARAMS;
    }

    std::lock_guard lock(configMutex_);

    if (newSize < config_.minThreads) {
        config_.minThreads = newSize;
    }
    if (newSize > config_.maxThreads) {
        config_.maxThreads = newSize;
    }

    adjustThreadPoolSize();

    Logger::instance().infof("Thread pool resized to {} threads", newSize);
    return ErrorCode::SUCCESS;
}

/// Sets the prefix used when naming workers that are created afterwards.
void ThreadPool::setThreadNamePrefix(const std::string& prefix) {
    std::lock_guard lock(configMutex_);
    config_.threadNamePrefix = prefix;
}

/// Blocks until the queue is empty and no task is registered as active.
void ThreadPool::waitForAllTasks() {
    std::unique_lock lock(queueMutex_);
    completionCondition_.wait(lock, [this] {
        const bool queueEmpty =
            config_.enablePriority ? taskQueue_.empty() : normalQueue_.empty();
        bool noActiveTasks;
        {
            std::lock_guard tasksLock(tasksMutex_);
            noActiveTasks = activeTasks_.empty();
        }
        return queueEmpty && noActiveTasks;
    });
}

/// Same as waitForAllTasks(), but gives up after `timeout`.
///
/// @return true if the pool became idle, false if the timeout expired first.
bool ThreadPool::waitForAllTasks(std::chrono::milliseconds timeout) {
    std::unique_lock lock(queueMutex_);
    return completionCondition_.wait_for(lock, timeout, [this] {
        const bool queueEmpty =
            config_.enablePriority ? taskQueue_.empty() : normalQueue_.empty();
        bool noActiveTasks;
        {
            std::lock_guard tasksLock(tasksMutex_);
            noActiveTasks = activeTasks_.empty();
        }
        return queueEmpty && noActiveTasks;
    });
}

/// Cancels and discards every task that has not been picked up by a worker.
///
/// Discarded tasks are also removed from the active-task registry; otherwise
/// they would stay registered forever and waitForAllTasks() could never
/// return. Tasks already cancelled through cancelTask() are not counted a
/// second time in the cancelled-task statistic.
void ThreadPool::clearPendingTasks() {
    size_t clearedCount = 0;
    size_t newlyCancelled = 0;

    {
        // Lock order queueMutex_ -> tasksMutex_, same as submitTask().
        std::lock_guard lock(queueMutex_);
        std::lock_guard tasksLock(tasksMutex_);

        const auto discard = [&](const auto& task) {
            task->cancel();
            newlyCancelled += activeTasks_.erase(task->getId());
            ++clearedCount;
        };

        if (config_.enablePriority) {
            while (!taskQueue_.empty()) {
                auto task = taskQueue_.top();
                taskQueue_.pop();
                discard(task);
            }
        } else {
            while (!normalQueue_.empty()) {
                auto task = normalQueue_.front();
                normalQueue_.pop();
                discard(task);
            }
        }

        totalCancelledTasks_ += newlyCancelled;
    }

    // The queue and registry changed; let waiters re-evaluate.
    completionCondition_.notify_all();

    Logger::instance().infof("Cleared {} pending tasks", clearedCount);
}

// ---------------------------------------------------------------------------
// ThreadPool private helpers
// ---------------------------------------------------------------------------

/// Worker main loop: names the thread, then takes and executes tasks until
/// shutdown_ is set, parking while the pool is paused.
void ThreadPool::workerThread(size_t threadIndex) {
    // Name the thread "<prefix>-<index>".
    std::ostringstream oss;
    oss << config_.threadNamePrefix << "-" << threadIndex;
    setThreadName(oss.str());

    Logger::instance().debugf("Worker thread %zu started", threadIndex);

    while (!shutdown_.load()) {
        // Park while the pool is paused.
        if (state_.load() == ThreadPoolState::PAUSED) {
            std::unique_lock pauseLock(queueMutex_);
            pauseCondition_.wait(pauseLock, [this] {
                return state_.load() != ThreadPoolState::PAUSED || shutdown_.load();
            });
            continue;
        }

        // Fetch the next task; nullptr means timeout or shutdown.
        auto task = getNextTask();
        if (!task) {
            continue;
        }

        // This worker is now busy.
        idleThreads_--;

        // Run the task; a failing task must not take the worker down.
        try {
            task->execute();
            totalCompletedTasks_++;
        } catch (const std::exception& e) {
            Logger::instance().errorf("Task execution failed: %s", e.what());
            totalFailedTasks_++;
        } catch (...) {
            Logger::instance().error("Task execution failed with unknown exception");
            totalFailedTasks_++;
        }

        // Move the task to the completed history, unless it was cancelled and
        // removed from the registry in the meantime.
        {
            std::lock_guard tasksLock(tasksMutex_);
            auto it = activeTasks_.find(task->getId());
            if (it != activeTasks_.end()) {
                completedTasks_[task->getId()] = task->getInfo();
                activeTasks_.erase(it);
            }
        }

        // This worker is idle again.
        idleThreads_++;

        // waitForAllTasks() checks its predicate under queueMutex_. Passing
        // through that mutex before notifying guarantees the waiter is either
        // not yet past its check (and will see the update) or already blocked
        // (and will receive the notification) - no lost wakeup.
        {
            std::lock_guard queueLock(queueMutex_);
        }
        completionCondition_.notify_all();

        // Trim the completed-task history.
        cleanupCompletedTasks();
    }

    Logger::instance().debugf("Worker thread %zu stopped", threadIndex);
}

/// Waits up to `keepAliveTime` for a task.
///
/// @return the next task, or nullptr on timeout or shutdown.
PoolTaskPtr ThreadPool::getNextTask() {
    std::unique_lock lock(queueMutex_);

    // Wait for a task or the stop signal.
    queueCondition_.wait_for(lock, config_.keepAliveTime, [this] {
        const bool hasTask =
            config_.enablePriority ? !taskQueue_.empty() : !normalQueue_.empty();
        return hasTask || shutdown_.load();
    });

    if (shutdown_.load()) {
        return nullptr;
    }

    PoolTaskPtr task;
    if (config_.enablePriority && !taskQueue_.empty()) {
        task = taskQueue_.top();
        taskQueue_.pop();
    } else if (!config_.enablePriority && !normalQueue_.empty()) {
        task = normalQueue_.front();
        normalQueue_.pop();
    }

    return task;
}

/// Adds one worker when shouldCreateNewThread() reports that the pending load
/// exceeds the idle workers and the pool is below `maxThreads`.
///
/// Workers are never destroyed here; they only go away in stop().
void ThreadPool::adjustThreadPoolSize() {
    const size_t currentThreads = workers_.size();

    if (shouldCreateNewThread() && currentThreads < config_.maxThreads) {
        try {
            workers_.emplace_back(&ThreadPool::workerThread, this, currentThreads);
            activeThreads_++;
            idleThreads_++;

            Logger::instance().debugf("Created new worker thread, total: %zu", workers_.size());
        } catch (const std::exception& e) {
            Logger::instance().errorf("Failed to create new worker thread: %s", e.what());
        }
    }
}

/// Recomputes the average execution time as elapsed time since start divided
/// by the number of completed tasks.
void ThreadPool::updateStatistics() {
    std::lock_guard lock(statsMutex_);

    const auto now = std::chrono::steady_clock::now();
    // NOTE(review): the duration unit and the cast target type were lost in
    // extraction; milliseconds / double are assumed - confirm against the
    // declaration of ThreadPoolStats::averageExecutionTime.
    const auto totalTime =
        std::chrono::duration_cast<std::chrono::milliseconds>(now - stats_.startTime).count();

    if (totalCompletedTasks_.load() > 0) {
        stats_.averageExecutionTime =
            static_cast<double>(totalTime) / totalCompletedTasks_.load();
    }
}

/// Caps the completed-task history at 1000 entries by erasing from the
/// beginning of the container.
void ThreadPool::cleanupCompletedTasks() {
    std::lock_guard lock(tasksMutex_);

    const size_t maxCompletedTasks = 1000;
    if (completedTasks_.size() > maxCompletedTasks) {
        auto it = completedTasks_.begin();
        std::advance(it, completedTasks_.size() - maxCompletedTasks);
        completedTasks_.erase(completedTasks_.begin(), it);
    }
}

/// @return true when the pool is below `maxThreads` and more tasks are
///         pending than there are idle workers.
bool ThreadPool::shouldCreateNewThread() const {
    const size_t currentThreads = workers_.size();
    const size_t pendingTasks = getPendingTaskCount();
    const size_t idleThreads = idleThreads_.load();

    return currentThreads < config_.maxThreads &&
           pendingTasks > idleThreads &&
           pendingTasks > 0;
}

/// @return true when the pool is above `minThreads` and has more than one
///         idle worker beyond the pending load.
bool ThreadPool::shouldDestroyThread() const {
    const size_t currentThreads = workers_.size();
    const size_t pendingTasks = getPendingTaskCount();
    const size_t idleThreads = idleThreads_.load();

    return currentThreads > config_.minThreads &&
           idleThreads > pendingTasks + 1;
}

/// Names the calling thread on Windows and Linux; a no-op elsewhere.
void ThreadPool::setThreadName(const std::string& name) {
#ifdef _WIN32
    // Windows: widen character by character (adequate for ASCII names).
    std::wstring wname(name.begin(), name.end());
    SetThreadDescription(GetCurrentThread(), wname.c_str());
#elif defined(__linux__)
    // Linux thread naming.
    prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
#else
    // Not supported on other platforms yet.
#endif
}

// ---------------------------------------------------------------------------
// WorkerThread
// ---------------------------------------------------------------------------

/// Creates a worker wrapper that has not been started.
WorkerThread::WorkerThread(ThreadPool* pool, size_t index)
    : pool_(pool)
    , index_(index)
    , running_(false)
    , shouldStop_(false) {
}

/// Requests a stop and joins the underlying thread.
WorkerThread::~WorkerThread() {
    stop();
    join();
}

/// Launches the thread unless it is already marked as running.
void WorkerThread::start() {
    if (running_.load()) {
        return;
    }

    shouldStop_.store(false);
    thread_ = std::thread(&WorkerThread::run, this);
    running_.store(true);
}

/// Raises the stop flag.
void WorkerThread::stop() {
    shouldStop_.store(true);
}

/// Joins the thread if it is joinable and clears the running flag.
void WorkerThread::join() {
    if (thread_.joinable()) {
        thread_.join();
    }
    running_.store(false);
}

/// @return true between start() and join().
bool WorkerThread::isRunning() const {
    return running_.load();
}

/// @return the id of the underlying std::thread.
std::thread::id WorkerThread::getId() const {
    return thread_.get_id();
}

/// @return the index this worker was constructed with.
size_t WorkerThread::getIndex() const {
    return index_;
}

/// Thread entry point. Intentionally empty: the worker main loop lives in
/// ThreadPool; thread-specific logic can be added here.
void WorkerThread::run() {
}

// ---------------------------------------------------------------------------
// ThreadPoolFactory
// ---------------------------------------------------------------------------

/// General-purpose pool: 2..hardware_concurrency threads, priority queue.
std::unique_ptr<ThreadPool> ThreadPoolFactory::createStandardPool() {
    ThreadPoolConfig config;
    config.minThreads = 2;
    config.maxThreads = std::thread::hardware_concurrency();
    config.queueCapacity = 1000;
    config.keepAliveTime = std::chrono::milliseconds(60000);
    config.enablePriority = true;
    config.enableStatistics = true;
    config.threadNamePrefix = "StandardPool";

    return std::make_unique<ThreadPool>(config);
}

/// Throughput-oriented pool: hardware_concurrency..2x threads, priority queue.
std::unique_ptr<ThreadPool> ThreadPoolFactory::createHighPerformancePool() {
    ThreadPoolConfig config;
    config.minThreads = std::thread::hardware_concurrency();
    config.maxThreads = std::thread::hardware_concurrency() * 2;
    config.queueCapacity = 5000;
    config.keepAliveTime = std::chrono::milliseconds(30000);
    config.enablePriority = true;
    config.enableStatistics = true;
    config.threadNamePrefix = "HighPerfPool";

    return std::make_unique<ThreadPool>(config);
}

/// Small pool: 1..4 threads, FIFO queue, statistics disabled.
std::unique_ptr<ThreadPool> ThreadPoolFactory::createLightweightPool() {
    ThreadPoolConfig config;
    config.minThreads = 1;
    config.maxThreads = 4;
    config.queueCapacity = 100;
    config.keepAliveTime = std::chrono::milliseconds(120000);
    config.enablePriority = false;
    config.enableStatistics = false;
    config.threadNamePrefix = "LightPool";

    return std::make_unique<ThreadPool>(config);
}

/// Fixed-size pool with exactly hardware_concurrency threads, priority queue.
std::unique_ptr<ThreadPool> ThreadPoolFactory::createRealtimePool() {
    ThreadPoolConfig config;
    config.minThreads = std::thread::hardware_concurrency();
    config.maxThreads = std::thread::hardware_concurrency();
    config.queueCapacity = 500;
    config.keepAliveTime = std::chrono::milliseconds(10000);
    config.enablePriority = true;
    config.enableStatistics = true;
    config.threadNamePrefix = "RealtimePool";

    return std::make_unique<ThreadPool>(config);
}

/// Pool built from a caller-supplied configuration.
std::unique_ptr<ThreadPool> ThreadPoolFactory::createCustomPool(const ThreadPoolConfig& config) {
    return std::make_unique<ThreadPool>(config);
}

/// @return hardware_concurrency(), but never less than 2.
size_t ThreadPoolFactory::getRecommendedThreadCount() {
    // Parenthesised to sidestep the max() macro from <windows.h>.
    return (std::max)(2u, std::thread::hardware_concurrency());
}

/// @return hardware_concurrency() unchanged (0 when it cannot be determined).
size_t ThreadPoolFactory::getCpuCoreCount() {
    return std::thread::hardware_concurrency();
}

// ---------------------------------------------------------------------------
// ThreadPoolManager
// ---------------------------------------------------------------------------

/// @return the process-wide manager instance.
ThreadPoolManager& ThreadPoolManager::getInstance() {
    static ThreadPoolManager instance;
    return instance;
}

/// Creates, initialises and starts the default pool.
ThreadPoolManager::ThreadPoolManager() {
    defaultPool_ = ThreadPoolFactory::createStandardPool();
    const ErrorCode initResult = defaultPool_->initialize();
    const ErrorCode startResult = defaultPool_->start();
    if (initResult != ErrorCode::SUCCESS || startResult != ErrorCode::SUCCESS) {
        // Surface the failure instead of silently handing out a dead pool.
        Logger::instance().error("Failed to start default thread pool");
    }
}

/// Shuts down every pool owned by the manager.
ThreadPoolManager::~ThreadPoolManager() {
    shutdownAll();
}

/// @return the default pool (nullptr after shutdownAll()).
ThreadPool* ThreadPoolManager::getDefaultPool() {
    return defaultPool_.get();
}

/// Creates, initialises and starts a named pool.
///
/// @return SUCCESS, INVALID_PARAMS for an empty name, ALREADY_EXISTS for a
///         duplicate name, or the error reported by initialize()/start().
ErrorCode ThreadPoolManager::createPool(const std::string& name, const ThreadPoolConfig& config) {
    if (name.empty()) {
        return ErrorCode::INVALID_PARAMS;
    }

    std::lock_guard lock(poolsMutex_);

    if (namedPools_.find(name) != namedPools_.end()) {
        return ErrorCode::ALREADY_EXISTS;
    }

    auto pool = std::make_unique<ThreadPool>(config);
    ErrorCode result = pool->initialize();
    if (result != ErrorCode::SUCCESS) {
        return result;
    }

    result = pool->start();
    if (result != ErrorCode::SUCCESS) {
        return result;
    }

    namedPools_[name] = std::move(pool);

    Logger::instance().infof("Created thread pool: {}", name);
    return ErrorCode::SUCCESS;
}

/// @return the pool registered under `name`, or nullptr if there is none.
ThreadPool* ThreadPoolManager::getPool(const std::string& name) {
    std::lock_guard lock(poolsMutex_);

    auto it = namedPools_.find(name);
    if (it != namedPools_.end()) {
        return it->second.get();
    }

    return nullptr;
}

/// Shuts down and removes a named pool.
///
/// @return SUCCESS, or NOT_FOUND if no pool has that name.
ErrorCode ThreadPoolManager::destroyPool(const std::string& name) {
    std::lock_guard lock(poolsMutex_);

    auto it = namedPools_.find(name);
    if (it == namedPools_.end()) {
        return ErrorCode::NOT_FOUND;
    }

    it->second->shutdown();
    namedPools_.erase(it);

    Logger::instance().infof("Destroyed thread pool: {}", name);
    return ErrorCode::SUCCESS;
}

/// @return the names of all registered pools.
std::vector<std::string> ThreadPoolManager::getPoolNames() const {
    std::lock_guard lock(poolsMutex_);

    std::vector<std::string> names;
    names.reserve(namedPools_.size());

    for (const auto& pair : namedPools_) {
        names.push_back(pair.first);
    }

    return names;
}

/// Shuts down and releases all named pools, then the default pool.
void ThreadPoolManager::shutdownAll() {
    std::lock_guard lock(poolsMutex_);

    // Named pools first.
    for (auto& pair : namedPools_) {
        pair.second->shutdown();
    }
    namedPools_.clear();

    // Then the default pool.
    if (defaultPool_) {
        defaultPool_->shutdown();
        defaultPool_.reset();
    }

    Logger::instance().info("All thread pools shutdown");
}

}  // namespace utils
}  // namespace av