| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849 |
- #include "utils_thread_pool.h"
- #include "../base/base_logger.h"
- #include <algorithm>
- #include <sstream>
- #include <thread>
- #include <chrono>
- #ifdef _WIN32
- #include <windows.h>
- #else
- #include <pthread.h>
- #include <sys/prctl.h>
- #endif
- namespace av {
- namespace utils {
- using namespace av::base;
// ThreadPool implementation
- ThreadPool::ThreadPool(const ThreadPoolConfig& config)
- : config_(config)
- , state_(ThreadPoolState::STOPPED)
- , shutdown_(false)
- , activeThreads_(0)
- , idleThreads_(0)
- , totalSubmittedTasks_(0)
- , totalCompletedTasks_(0)
- , totalFailedTasks_(0)
- , totalCancelledTasks_(0)
- , startTime_(std::chrono::steady_clock::now()) {
-
- // 验证配置
- if (config_.minThreads > config_.maxThreads) {
- config_.minThreads = config_.maxThreads;
- }
-
- if (config_.minThreads == 0) {
- config_.minThreads = 1;
- }
-
- stats_.startTime = startTime_;
- }
- ThreadPool::~ThreadPool() {
- shutdown(std::chrono::milliseconds(1000));
- }
- ErrorCode ThreadPool::initialize() {
- std::lock_guard<std::mutex> lock(configMutex_);
-
- if (state_.load() != ThreadPoolState::STOPPED) {
- return ErrorCode::INVALID_STATE;
- }
-
- try {
- // 预分配线程容器
- workers_.reserve(config_.maxThreads);
-
- LOG_INFO("Thread pool initialized with {} min threads, {} max threads",
- config_.minThreads, config_.maxThreads);
-
- return ErrorCode::SUCCESS;
- } catch (const std::exception& e) {
- LOG_ERROR("Failed to initialize thread pool: {}", e.what());
- return ErrorCode::INITIALIZATION_FAILED;
- }
- }
- ErrorCode ThreadPool::start() {
- std::lock_guard<std::mutex> lock(configMutex_);
-
- if (state_.load() == ThreadPoolState::RUNNING) {
- return ErrorCode::SUCCESS;
- }
-
- if (state_.load() != ThreadPoolState::STOPPED) {
- return ErrorCode::INVALID_STATE;
- }
-
- state_.store(ThreadPoolState::STARTING);
- shutdown_.store(false);
-
- try {
- // 创建最小数量的工作线程
- for (size_t i = 0; i < config_.minThreads; ++i) {
- workers_.emplace_back(&ThreadPool::workerThread, this, i);
- }
-
- activeThreads_.store(config_.minThreads);
- idleThreads_.store(config_.minThreads);
-
- state_.store(ThreadPoolState::RUNNING);
- startTime_ = std::chrono::steady_clock::now();
- stats_.startTime = startTime_;
-
- LOG_INFO("Thread pool started with {} threads", config_.minThreads);
-
- return ErrorCode::SUCCESS;
- } catch (const std::exception& e) {
- LOG_ERROR("Failed to start thread pool: {}", e.what());
- state_.store(ThreadPoolState::STOPPED);
- return ErrorCode::OPERATION_FAILED;
- }
- }
- ErrorCode ThreadPool::stop(bool waitForCompletion) {
- if (state_.load() == ThreadPoolState::STOPPED) {
- return ErrorCode::SUCCESS;
- }
-
- state_.store(ThreadPoolState::STOPPING);
-
- if (waitForCompletion) {
- waitForAllTasks();
- } else {
- clearPendingTasks();
- }
-
- // 通知所有工作线程停止
- {
- std::lock_guard<std::mutex> lock(queueMutex_);
- shutdown_.store(true);
- }
- queueCondition_.notify_all();
-
- // 等待所有线程结束
- for (auto& worker : workers_) {
- if (worker.joinable()) {
- worker.join();
- }
- }
-
- workers_.clear();
- activeThreads_.store(0);
- idleThreads_.store(0);
- state_.store(ThreadPoolState::STOPPED);
-
- LOG_INFO("Thread pool stopped");
- return ErrorCode::SUCCESS;
- }
- ErrorCode ThreadPool::pause() {
- if (state_.load() != ThreadPoolState::RUNNING) {
- return ErrorCode::INVALID_STATE;
- }
-
- state_.store(ThreadPoolState::PAUSED);
- LOG_INFO("Thread pool paused");
-
- return ErrorCode::SUCCESS;
- }
- ErrorCode ThreadPool::resume() {
- if (state_.load() != ThreadPoolState::PAUSED) {
- return ErrorCode::INVALID_STATE;
- }
-
- state_.store(ThreadPoolState::RUNNING);
- pauseCondition_.notify_all();
-
- LOG_INFO("Thread pool resumed");
- return ErrorCode::SUCCESS;
- }
- ErrorCode ThreadPool::shutdown(std::chrono::milliseconds timeout) {
- auto startTime = std::chrono::steady_clock::now();
-
- // 首先尝试正常停止
- ErrorCode result = stop(false);
-
- // 等待指定时间
- while (state_.load() != ThreadPoolState::STOPPED &&
- std::chrono::steady_clock::now() - startTime < timeout) {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
-
- if (state_.load() != ThreadPoolState::STOPPED) {
- LOG_WARNING("Thread pool shutdown timeout, forcing termination");
- // 强制终止(在实际实现中可能需要更复杂的逻辑)
- state_.store(ThreadPoolState::STOPPED);
- }
-
- return result;
- }
- void ThreadPool::submitTask(std::shared_ptr<Task> task) {
- if (!task || state_.load() == ThreadPoolState::STOPPED) {
- return;
- }
-
- {
- std::lock_guard<std::mutex> lock(queueMutex_);
-
- if (config_.enablePriority) {
- taskQueue_.push(task);
- } else {
- normalQueue_.push(task);
- }
-
- // 记录任务
- {
- std::lock_guard<std::mutex> tasksLock(tasksMutex_);
- activeTasks_[task->getId()] = task;
- }
-
- totalSubmittedTasks_++;
- }
-
- // 检查是否需要创建新线程
- if (shouldCreateNewThread()) {
- adjustThreadPoolSize();
- }
-
- queueCondition_.notify_one();
- }
- bool ThreadPool::cancelTask(uint64_t taskId) {
- std::lock_guard<std::mutex> lock(tasksMutex_);
-
- auto it = activeTasks_.find(taskId);
- if (it != activeTasks_.end()) {
- it->second->cancel();
- activeTasks_.erase(it);
- totalCancelledTasks_++;
- return true;
- }
-
- return false;
- }
- bool ThreadPool::isTaskCompleted(uint64_t taskId) const {
- std::lock_guard<std::mutex> lock(tasksMutex_);
-
- auto it = completedTasks_.find(taskId);
- return it != completedTasks_.end();
- }
- TaskStatus ThreadPool::getTaskStatus(uint64_t taskId) const {
- std::lock_guard<std::mutex> lock(tasksMutex_);
-
- // 检查活跃任务
- auto activeIt = activeTasks_.find(taskId);
- if (activeIt != activeTasks_.end()) {
- return activeIt->second->getStatus();
- }
-
- // 检查已完成任务
- auto completedIt = completedTasks_.find(taskId);
- if (completedIt != completedTasks_.end()) {
- return completedIt->second->status;
- }
-
- return TaskStatus::PENDING;
- }
- std::shared_ptr<TaskInfo> ThreadPool::getTaskInfo(uint64_t taskId) const {
- std::lock_guard<std::mutex> lock(tasksMutex_);
-
- // 检查活跃任务
- auto activeIt = activeTasks_.find(taskId);
- if (activeIt != activeTasks_.end()) {
- return activeIt->second->getInfo();
- }
-
- // 检查已完成任务
- auto completedIt = completedTasks_.find(taskId);
- if (completedIt != completedTasks_.end()) {
- return completedIt->second;
- }
-
- return nullptr;
- }
- std::vector<std::shared_ptr<TaskInfo>> ThreadPool::getAllTaskInfo() const {
- std::lock_guard<std::mutex> lock(tasksMutex_);
-
- std::vector<std::shared_ptr<TaskInfo>> result;
-
- // 添加活跃任务
- for (const auto& pair : activeTasks_) {
- result.push_back(pair.second->getInfo());
- }
-
- // 添加已完成任务
- for (const auto& pair : completedTasks_) {
- result.push_back(pair.second);
- }
-
- return result;
- }
- ThreadPoolState ThreadPool::getState() const {
- return state_.load();
- }
- bool ThreadPool::isRunning() const {
- return state_.load() == ThreadPoolState::RUNNING;
- }
- bool ThreadPool::isPaused() const {
- return state_.load() == ThreadPoolState::PAUSED;
- }
- bool ThreadPool::isStopped() const {
- return state_.load() == ThreadPoolState::STOPPED;
- }
- ThreadPoolStats ThreadPool::getStats() const {
- std::lock_guard<std::mutex> lock(statsMutex_);
-
- ThreadPoolStats currentStats = stats_;
- currentStats.totalThreads = workers_.size();
- currentStats.activeThreads = activeThreads_.load();
- currentStats.idleThreads = idleThreads_.load();
-
- {
- std::lock_guard<std::mutex> queueLock(queueMutex_);
- if (config_.enablePriority) {
- currentStats.pendingTasks = taskQueue_.size();
- } else {
- currentStats.pendingTasks = normalQueue_.size();
- }
- }
-
- {
- std::lock_guard<std::mutex> tasksLock(tasksMutex_);
- currentStats.runningTasks = activeTasks_.size();
- }
-
- currentStats.completedTasks = totalCompletedTasks_.load();
- currentStats.failedTasks = totalFailedTasks_.load();
- currentStats.cancelledTasks = totalCancelledTasks_.load();
-
- return currentStats;
- }
- size_t ThreadPool::getActiveThreadCount() const {
- return activeThreads_.load();
- }
- size_t ThreadPool::getIdleThreadCount() const {
- return idleThreads_.load();
- }
- size_t ThreadPool::getPendingTaskCount() const {
- std::lock_guard<std::mutex> lock(queueMutex_);
-
- if (config_.enablePriority) {
- return taskQueue_.size();
- } else {
- return normalQueue_.size();
- }
- }
- size_t ThreadPool::getCompletedTaskCount() const {
- return totalCompletedTasks_.load();
- }
- void ThreadPool::setConfig(const ThreadPoolConfig& config) {
- std::lock_guard<std::mutex> lock(configMutex_);
- config_ = config;
-
- // 验证配置
- if (config_.minThreads > config_.maxThreads) {
- config_.minThreads = config_.maxThreads;
- }
-
- if (config_.minThreads == 0) {
- config_.minThreads = 1;
- }
- }
- ThreadPoolConfig ThreadPool::getConfig() const {
- std::lock_guard<std::mutex> lock(configMutex_);
- return config_;
- }
- ErrorCode ThreadPool::resizeThreadPool(size_t newSize) {
- if (newSize == 0) {
- return ErrorCode::INVALID_PARAMETER;
- }
-
- std::lock_guard<std::mutex> lock(configMutex_);
-
- if (newSize < config_.minThreads) {
- config_.minThreads = newSize;
- }
-
- if (newSize > config_.maxThreads) {
- config_.maxThreads = newSize;
- }
-
- adjustThreadPoolSize();
-
- LOG_INFO("Thread pool resized to {} threads", newSize);
- return ErrorCode::SUCCESS;
- }
- void ThreadPool::setThreadNamePrefix(const std::string& prefix) {
- std::lock_guard<std::mutex> lock(configMutex_);
- config_.threadNamePrefix = prefix;
- }
- void ThreadPool::waitForAllTasks() {
- std::unique_lock<std::mutex> lock(queueMutex_);
-
- completionCondition_.wait(lock, [this] {
- bool queueEmpty = config_.enablePriority ? taskQueue_.empty() : normalQueue_.empty();
- bool noActiveTasks;
- {
- std::lock_guard<std::mutex> tasksLock(tasksMutex_);
- noActiveTasks = activeTasks_.empty();
- }
- return queueEmpty && noActiveTasks;
- });
- }
- bool ThreadPool::waitForAllTasks(std::chrono::milliseconds timeout) {
- std::unique_lock<std::mutex> lock(queueMutex_);
-
- return completionCondition_.wait_for(lock, timeout, [this] {
- bool queueEmpty = config_.enablePriority ? taskQueue_.empty() : normalQueue_.empty();
- bool noActiveTasks;
- {
- std::lock_guard<std::mutex> tasksLock(tasksMutex_);
- noActiveTasks = activeTasks_.empty();
- }
- return queueEmpty && noActiveTasks;
- });
- }
- void ThreadPool::clearPendingTasks() {
- std::lock_guard<std::mutex> lock(queueMutex_);
-
- size_t cancelledCount = 0;
-
- if (config_.enablePriority) {
- while (!taskQueue_.empty()) {
- auto task = taskQueue_.top();
- taskQueue_.pop();
- task->cancel();
- cancelledCount++;
- }
- } else {
- while (!normalQueue_.empty()) {
- auto task = normalQueue_.front();
- normalQueue_.pop();
- task->cancel();
- cancelledCount++;
- }
- }
-
- totalCancelledTasks_ += cancelledCount;
-
- LOG_INFO("Cleared {} pending tasks", cancelledCount);
- }
// Private method implementations
- void ThreadPool::workerThread(size_t threadIndex) {
- // 设置线程名称
- std::ostringstream oss;
- oss << config_.threadNamePrefix << "-" << threadIndex;
- setThreadName(oss.str());
-
- LOG_DEBUG("Worker thread {} started", threadIndex);
-
- while (!shutdown_.load()) {
- // 处理暂停状态
- if (state_.load() == ThreadPoolState::PAUSED) {
- std::unique_lock<std::mutex> pauseLock(queueMutex_);
- pauseCondition_.wait(pauseLock, [this] {
- return state_.load() != ThreadPoolState::PAUSED || shutdown_.load();
- });
- continue;
- }
-
- // 获取任务
- auto task = getNextTask();
- if (!task) {
- continue;
- }
-
- // 更新线程状态
- idleThreads_--;
-
- // 执行任务
- try {
- task->execute();
- totalCompletedTasks_++;
- } catch (const std::exception& e) {
- LOG_ERROR("Task execution failed: {}", e.what());
- totalFailedTasks_++;
- } catch (...) {
- LOG_ERROR("Task execution failed with unknown exception");
- totalFailedTasks_++;
- }
-
- // 移动任务到已完成列表
- {
- std::lock_guard<std::mutex> tasksLock(tasksMutex_);
- auto it = activeTasks_.find(task->getId());
- if (it != activeTasks_.end()) {
- completedTasks_[task->getId()] = task->getInfo();
- activeTasks_.erase(it);
- }
- }
-
- // 更新线程状态
- idleThreads_++;
-
- // 通知等待的线程
- completionCondition_.notify_all();
-
- // 清理旧的已完成任务
- cleanupCompletedTasks();
- }
-
- LOG_DEBUG("Worker thread {} stopped", threadIndex);
- }
- std::shared_ptr<Task> ThreadPool::getNextTask() {
- std::unique_lock<std::mutex> lock(queueMutex_);
-
- // 等待任务或停止信号
- queueCondition_.wait_for(lock, config_.keepAliveTime, [this] {
- bool hasTask = config_.enablePriority ? !taskQueue_.empty() : !normalQueue_.empty();
- return hasTask || shutdown_.load();
- });
-
- if (shutdown_.load()) {
- return nullptr;
- }
-
- std::shared_ptr<Task> task;
-
- if (config_.enablePriority && !taskQueue_.empty()) {
- task = taskQueue_.top();
- taskQueue_.pop();
- } else if (!config_.enablePriority && !normalQueue_.empty()) {
- task = normalQueue_.front();
- normalQueue_.pop();
- }
-
- return task;
- }
- void ThreadPool::adjustThreadPoolSize() {
- size_t currentThreads = workers_.size();
- size_t pendingTasks = getPendingTaskCount();
- size_t idleThreads = idleThreads_.load();
-
- // 决定是否需要创建新线程
- if (shouldCreateNewThread() && currentThreads < config_.maxThreads) {
- try {
- workers_.emplace_back(&ThreadPool::workerThread, this, currentThreads);
- activeThreads_++;
- idleThreads_++;
-
- LOG_DEBUG("Created new worker thread, total: {}", workers_.size());
- } catch (const std::exception& e) {
- LOG_ERROR("Failed to create new worker thread: {}", e.what());
- }
- }
-
- // 注意:线程销毁通常在线程自然退出时处理,这里不实现动态销毁
- }
- void ThreadPool::updateStatistics() {
- std::lock_guard<std::mutex> lock(statsMutex_);
-
- // 更新统计信息
- auto now = std::chrono::steady_clock::now();
- auto totalTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - stats_.startTime).count();
-
- if (totalCompletedTasks_.load() > 0) {
- stats_.averageExecutionTime = static_cast<double>(totalTime) / totalCompletedTasks_.load();
- }
- }
- void ThreadPool::cleanupCompletedTasks() {
- std::lock_guard<std::mutex> lock(tasksMutex_);
-
- // 保持最近的1000个已完成任务
- const size_t maxCompletedTasks = 1000;
-
- if (completedTasks_.size() > maxCompletedTasks) {
- auto it = completedTasks_.begin();
- std::advance(it, completedTasks_.size() - maxCompletedTasks);
- completedTasks_.erase(completedTasks_.begin(), it);
- }
- }
- bool ThreadPool::shouldCreateNewThread() const {
- size_t currentThreads = workers_.size();
- size_t pendingTasks = getPendingTaskCount();
- size_t idleThreads = idleThreads_.load();
-
- return currentThreads < config_.maxThreads &&
- pendingTasks > idleThreads &&
- pendingTasks > 0;
- }
- bool ThreadPool::shouldDestroyThread() const {
- size_t currentThreads = workers_.size();
- size_t pendingTasks = getPendingTaskCount();
- size_t idleThreads = idleThreads_.load();
-
- return currentThreads > config_.minThreads &&
- idleThreads > pendingTasks + 1;
- }
- void ThreadPool::setThreadName(const std::string& name) {
- #ifdef _WIN32
- // Windows 线程命名
- std::wstring wname(name.begin(), name.end());
- SetThreadDescription(GetCurrentThread(), wname.c_str());
- #elif defined(__linux__)
- // Linux 线程命名
- prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
- #else
- // 其他平台暂不支持
- #endif
- }
// WorkerThread implementation
- WorkerThread::WorkerThread(ThreadPool* pool, size_t index)
- : pool_(pool)
- , index_(index)
- , running_(false)
- , shouldStop_(false) {
- }
- WorkerThread::~WorkerThread() {
- stop();
- join();
- }
- void WorkerThread::start() {
- if (running_.load()) {
- return;
- }
-
- shouldStop_.store(false);
- thread_ = std::thread(&WorkerThread::run, this);
- running_.store(true);
- }
- void WorkerThread::stop() {
- shouldStop_.store(true);
- }
- void WorkerThread::join() {
- if (thread_.joinable()) {
- thread_.join();
- }
- running_.store(false);
- }
- bool WorkerThread::isRunning() const {
- return running_.load();
- }
- std::thread::id WorkerThread::getId() const {
- return thread_.get_id();
- }
- size_t WorkerThread::getIndex() const {
- return index_;
- }
- void WorkerThread::run() {
- // 工作线程的主循环由ThreadPool管理
- // 这里可以添加额外的线程特定逻辑
- }
// ThreadPoolFactory implementation
- std::unique_ptr<ThreadPool> ThreadPoolFactory::createStandardPool() {
- ThreadPoolConfig config;
- config.minThreads = 2;
- config.maxThreads = std::thread::hardware_concurrency();
- config.queueCapacity = 1000;
- config.keepAliveTime = std::chrono::milliseconds(60000);
- config.enablePriority = true;
- config.enableStatistics = true;
- config.threadNamePrefix = "StandardPool";
-
- return std::make_unique<ThreadPool>(config);
- }
- std::unique_ptr<ThreadPool> ThreadPoolFactory::createHighPerformancePool() {
- ThreadPoolConfig config;
- config.minThreads = std::thread::hardware_concurrency();
- config.maxThreads = std::thread::hardware_concurrency() * 2;
- config.queueCapacity = 5000;
- config.keepAliveTime = std::chrono::milliseconds(30000);
- config.enablePriority = true;
- config.enableStatistics = true;
- config.threadNamePrefix = "HighPerfPool";
-
- return std::make_unique<ThreadPool>(config);
- }
- std::unique_ptr<ThreadPool> ThreadPoolFactory::createLightweightPool() {
- ThreadPoolConfig config;
- config.minThreads = 1;
- config.maxThreads = 4;
- config.queueCapacity = 100;
- config.keepAliveTime = std::chrono::milliseconds(120000);
- config.enablePriority = false;
- config.enableStatistics = false;
- config.threadNamePrefix = "LightPool";
-
- return std::make_unique<ThreadPool>(config);
- }
- std::unique_ptr<ThreadPool> ThreadPoolFactory::createRealtimePool() {
- ThreadPoolConfig config;
- config.minThreads = std::thread::hardware_concurrency();
- config.maxThreads = std::thread::hardware_concurrency();
- config.queueCapacity = 500;
- config.keepAliveTime = std::chrono::milliseconds(10000);
- config.enablePriority = true;
- config.enableStatistics = true;
- config.threadNamePrefix = "RealtimePool";
-
- return std::make_unique<ThreadPool>(config);
- }
- std::unique_ptr<ThreadPool> ThreadPoolFactory::createCustomPool(const ThreadPoolConfig& config) {
- return std::make_unique<ThreadPool>(config);
- }
- size_t ThreadPoolFactory::getRecommendedThreadCount() {
- return std::max(2u, std::thread::hardware_concurrency());
- }
- size_t ThreadPoolFactory::getCpuCoreCount() {
- return std::thread::hardware_concurrency();
- }
// ThreadPoolManager implementation
- ThreadPoolManager& ThreadPoolManager::getInstance() {
- static ThreadPoolManager instance;
- return instance;
- }
- ThreadPoolManager::ThreadPoolManager() {
- // 创建默认线程池
- defaultPool_ = ThreadPoolFactory::createStandardPool();
- defaultPool_->initialize();
- defaultPool_->start();
- }
- ThreadPoolManager::~ThreadPoolManager() {
- shutdownAll();
- }
- ThreadPool* ThreadPoolManager::getDefaultPool() {
- return defaultPool_.get();
- }
- ErrorCode ThreadPoolManager::createPool(const std::string& name, const ThreadPoolConfig& config) {
- if (name.empty()) {
- return ErrorCode::INVALID_PARAMETER;
- }
-
- std::lock_guard<std::mutex> lock(poolsMutex_);
-
- if (namedPools_.find(name) != namedPools_.end()) {
- return ErrorCode::ALREADY_EXISTS;
- }
-
- auto pool = std::make_unique<ThreadPool>(config);
- ErrorCode result = pool->initialize();
- if (result != ErrorCode::SUCCESS) {
- return result;
- }
-
- result = pool->start();
- if (result != ErrorCode::SUCCESS) {
- return result;
- }
-
- namedPools_[name] = std::move(pool);
-
- LOG_INFO("Created thread pool: {}", name);
- return ErrorCode::SUCCESS;
- }
- ThreadPool* ThreadPoolManager::getPool(const std::string& name) {
- std::lock_guard<std::mutex> lock(poolsMutex_);
-
- auto it = namedPools_.find(name);
- if (it != namedPools_.end()) {
- return it->second.get();
- }
-
- return nullptr;
- }
- ErrorCode ThreadPoolManager::destroyPool(const std::string& name) {
- std::lock_guard<std::mutex> lock(poolsMutex_);
-
- auto it = namedPools_.find(name);
- if (it == namedPools_.end()) {
- return ErrorCode::NOT_FOUND;
- }
-
- it->second->shutdown();
- namedPools_.erase(it);
-
- LOG_INFO("Destroyed thread pool: {}", name);
- return ErrorCode::SUCCESS;
- }
- std::vector<std::string> ThreadPoolManager::getPoolNames() const {
- std::lock_guard<std::mutex> lock(poolsMutex_);
-
- std::vector<std::string> names;
- names.reserve(namedPools_.size());
-
- for (const auto& pair : namedPools_) {
- names.push_back(pair.first);
- }
-
- return names;
- }
- void ThreadPoolManager::shutdownAll() {
- std::lock_guard<std::mutex> lock(poolsMutex_);
-
- // 关闭所有命名线程池
- for (auto& pair : namedPools_) {
- pair.second->shutdown();
- }
- namedPools_.clear();
-
- // 关闭默认线程池
- if (defaultPool_) {
- defaultPool_->shutdown();
- defaultPool_.reset();
- }
-
- LOG_INFO("All thread pools shutdown");
- }
- } // namespace utils
- } // namespace av
|