| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 |
- // 工具类模块测试程序
- #include "code/base/logger.h"
- #include "code/base/types.h"
- #include "code/utils/utils_thread_pool.h"
- #include "code/utils/utils_frame_queue.h"
- #include "code/utils/utils_packet_queue.h"
- #include "code/utils/utils_performance_monitor.h"
- #include "code/utils/utils_synchronizer.h"
- #include <iostream>
- #include <chrono>
- #include <thread>
- #include <vector>
- #include <atomic>
- #include <random>
- #include <future>
- #include <mutex>
- extern "C" {
- #include <libavformat/avformat.h>
- #include <libavcodec/avcodec.h>
- #include <libavutil/frame.h>
- }
- using namespace av;
- using namespace av::utils;
- class UtilsTester {
- public:
- UtilsTester() {
- Logger::instance().setLevel(LogLevel::DEBUG);
- Logger::instance().info("=== AV工具类模块测试套件 ===");
- }
-
- // 测试线程池基本功能
- bool testThreadPoolBasic() {
- Logger::instance().info("[测试] 线程池基本功能...");
-
- try {
- // 创建线程池配置
- ThreadPoolConfig config;
- config.minThreads = 2;
- config.maxThreads = 4;
- config.queueCapacity = 100;
- config.enablePriority = true;
- config.enableStatistics = true;
- config.threadNamePrefix = "TestPool";
-
- // 创建线程池
- auto pool = std::make_unique<ThreadPool>(config);
-
- // 初始化和启动
- ErrorCode result = pool->initialize();
- if (result != ErrorCode::SUCCESS) {
- Logger::instance().errorf("[失败] 线程池初始化失败: {}", static_cast<int>(result));
- return false;
- }
-
- result = pool->start();
- if (result != ErrorCode::SUCCESS) {
- Logger::instance().errorf("[失败] 线程池启动失败: {}", static_cast<int>(result));
- return false;
- }
-
- Logger::instance().info("线程池启动成功");
-
- // 提交简单任务
- std::atomic<int> counter{0};
- std::vector<std::future<void>> futures;
-
- for (int i = 0; i < 10; ++i) {
- auto future = pool->submit([&counter, i]() {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- counter.fetch_add(1);
- Logger::instance().infof("任务 {} 完成", i);
- });
- futures.push_back(std::move(future));
- }
-
- // 等待所有任务完成
- for (auto& future : futures) {
- future.wait();
- }
-
- if (counter.load() != 10) {
- Logger::instance().errorf("[失败] 任务计数错误: 期望10,实际{}", counter.load());
- return false;
- }
-
- // 获取统计信息
- auto stats = pool->getStats();
- Logger::instance().infof("线程池统计: 总线程数={}, 完成任务数={}",
- stats.totalThreads, stats.completedTasks);
-
- // 停止线程池
- pool->stop(true);
-
- Logger::instance().info("[成功] 线程池基本功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 线程池测试异常: {}", e.what());
- return false;
- }
- }
-
- // 测试线程池优先级功能
- bool testThreadPoolPriority() {
- Logger::instance().info("[测试] 线程池优先级功能...");
-
- try {
- ThreadPoolConfig config;
- config.minThreads = 1;
- config.maxThreads = 1; // 单线程确保优先级顺序
- config.enablePriority = true;
-
- auto pool = std::make_unique<ThreadPool>(config);
- pool->initialize();
- pool->start();
-
- std::vector<int> executionOrder;
- std::mutex orderMutex;
-
- // 提交不同优先级的任务
- pool->submitWithPriority(TaskPriority::LOW, [&]() {
- std::lock_guard<std::mutex> lock(orderMutex);
- executionOrder.push_back(1);
- Logger::instance().info("低优先级任务执行");
- });
-
- pool->submitWithPriority(TaskPriority::HIGH, [&]() {
- std::lock_guard<std::mutex> lock(orderMutex);
- executionOrder.push_back(3);
- Logger::instance().info("高优先级任务执行");
- });
-
- pool->submitWithPriority(TaskPriority::NORMAL, [&]() {
- std::lock_guard<std::mutex> lock(orderMutex);
- executionOrder.push_back(2);
- Logger::instance().info("普通优先级任务执行");
- });
-
- pool->submitWithPriority(TaskPriority::CRITICAL, [&]() {
- std::lock_guard<std::mutex> lock(orderMutex);
- executionOrder.push_back(4);
- Logger::instance().info("关键优先级任务执行");
- });
-
- // 等待所有任务完成
- pool->waitForAllTasks();
-
- // 验证执行顺序(应该是:CRITICAL > HIGH > NORMAL > LOW)
- if (executionOrder.size() != 4) {
- Logger::instance().errorf("[失败] 任务数量错误: {}", executionOrder.size());
- return false;
- }
-
- Logger::instance().info("任务执行顺序:");
- for (size_t i = 0; i < executionOrder.size(); ++i) {
- Logger::instance().infof(" 第{}个执行的任务ID: {}", i + 1, executionOrder[i]);
- }
-
- pool->stop(true);
-
- Logger::instance().info("[成功] 线程池优先级功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 线程池优先级测试异常: {}", e.what());
- return false;
- }
- }
-
- // 测试帧队列功能
- bool testFrameQueue() {
- Logger::instance().info("[测试] 帧队列功能...");
-
- try {
- // 创建帧队列
- FrameQueueConfig config;
- config.maxSize = 10;
- FrameQueue frameQueue(config);
-
- // 测试基本的入队出队
- for (int i = 0; i < 5; ++i) {
- AVFrame* frame = av_frame_alloc();
- if (!frame) {
- Logger::instance().error("[失败] 分配帧失败");
- return false;
- }
-
- frame->pts = i;
- frame->width = 1920;
- frame->height = 1080;
-
- if (frameQueue.enqueue(frame) != ErrorCode::SUCCESS) {
- Logger::instance().errorf("[失败] 帧入队失败: {}", i);
- av_frame_free(&frame);
- return false;
- }
- }
-
- Logger::instance().infof("队列大小: {}", frameQueue.size());
-
- // 测试出队
- for (int i = 0; i < 5; ++i) {
- AVFrame* frame = frameQueue.dequeueFrame();
- if (!frame) {
- Logger::instance().errorf("[失败] 帧出队失败: {}", i);
- return false;
- }
-
- if (frame->pts != i) {
- Logger::instance().errorf("[失败] 帧PTS错误: 期望{}, 实际{}", i, frame->pts);
- av_frame_free(&frame);
- return false;
- }
- av_frame_free(&frame);
- }
-
- if (!frameQueue.empty()) {
- Logger::instance().error("[失败] 队列应该为空");
- return false;
- }
-
- Logger::instance().info("[成功] 帧队列功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 帧队列测试异常: {}", e.what());
- return false;
- }
- }
-
- // 测试数据包队列功能
- bool testPacketQueue() {
- Logger::instance().info("[测试] 数据包队列功能...");
-
- try {
- // 创建数据包队列
- PacketQueueConfig config;
- config.maxSize = 20;
- PacketQueue packetQueue(config);
-
- // 测试基本的入队出队
- for (int i = 0; i < 10; ++i) {
- AVPacket* packet = av_packet_alloc();
- if (!packet) {
- Logger::instance().error("[失败] 分配数据包失败");
- return false;
- }
-
- // av_packet_alloc已经初始化了packet,只需要设置字段
- packet->pts = i;
- packet->dts = i;
- packet->size = 1024;
- // 不设置data,保持为nullptr
-
- if (packetQueue.enqueue(packet) != ErrorCode::SUCCESS) {
- Logger::instance().errorf("[失败] 数据包入队失败: {}", i);
- av_packet_free(&packet);
- return false;
- }
- }
-
- Logger::instance().infof("队列大小: {}", packetQueue.size());
-
- // 测试出队
- for (int i = 0; i < 10; ++i) {
- AVPacket* packet = packetQueue.dequeuePacket();
- if (!packet) {
- Logger::instance().errorf("[失败] 数据包出队失败: {}", i);
- return false;
- }
-
- if (packet->pts != i) {
- Logger::instance().errorf("[失败] 数据包PTS错误: 期望{}, 实际{}", i, packet->pts);
- av_packet_free(&packet);
- return false;
- }
- av_packet_free(&packet);
- }
-
- if (!packetQueue.empty()) {
- Logger::instance().error("[失败] 队列应该为空");
- return false;
- }
-
- Logger::instance().info("[成功] 数据包队列功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 数据包队列测试异常: {}", e.what());
- return false;
- }
- }
-
- // 测试性能监控功能
- bool testPerformanceMonitor() {
- Logger::instance().info("[测试] 性能监控功能...");
-
- try {
- // 创建性能监控器
- PerformanceMonitor monitor;
-
- // 初始化监控器
- monitor.initialize();
-
- // 注册测试指标
- monitor.registerMetric("frames_processed", MetricType::COUNTER);
- monitor.registerMetric("bytes_processed", MetricType::COUNTER);
-
- // 开始监控
- monitor.start();
-
- // 模拟一些工作负载
- for (int i = 0; i < 100; ++i) {
- monitor.incrementCounter("frames_processed");
- monitor.incrementCounter("bytes_processed", 1024);
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
-
- // 停止监控
- monitor.stop();
-
- // 获取统计信息
- auto stats = monitor.getAllStats();
-
- Logger::instance().infof("性能统计:");
- if (stats.find("frames_processed") != stats.end()) {
- Logger::instance().infof(" 处理帧数: {}", stats["frames_processed"].count);
- }
- if (stats.find("bytes_processed") != stats.end()) {
- Logger::instance().infof(" 处理字节数: {}", stats["bytes_processed"].value);
- }
-
- // 简化验证逻辑
- if (stats.find("frames_processed") == stats.end() || stats["frames_processed"].count < 90) {
- Logger::instance().error("[失败] 帧数统计不足");
- return false;
- }
-
- Logger::instance().info("[成功] 性能监控功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 性能监控测试异常: {}", e.what());
- return false;
- }
- }
-
- // 测试同步器功能
- bool testSynchronizer() {
- Logger::instance().info("[测试] 同步器功能...");
-
- try {
- // 创建同步器
- Synchronizer sync;
-
- // 启动同步器
- sync.start();
-
- // 测试音视频同步
- double videoPts = 1.0; // 1秒
- double audioPts = 1.1; // 1.1秒
-
- // 设置时钟
- sync.setVideoClock(videoPts);
- sync.setAudioClock(audioPts);
-
- auto videoDelay = sync.calculateVideoDelay(videoPts);
- auto audioDelay = sync.calculateAudioDelay(audioPts);
-
- Logger::instance().infof("视频延迟: {:.2f} ms", videoDelay);
- Logger::instance().infof("音频延迟: {:.2f} ms", audioDelay);
-
- // 测试同步状态
- bool inSync = sync.isSynchronized();
- Logger::instance().infof("音视频同步状态: {}", inSync ? "同步" : "不同步");
-
- // 获取同步误差
- double syncError = sync.calculateSyncError();
- Logger::instance().infof("同步误差: {:.2f} ms", syncError);
-
- sync.stop();
-
- Logger::instance().info("[成功] 同步器功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 同步器测试异常: {}", e.what());
- return false;
- }
- }
-
- // 测试线程池工厂
- bool testThreadPoolFactory() {
- Logger::instance().info("[测试] 线程池工厂功能...");
-
- try {
- // 测试标准线程池
- auto standardPool = ThreadPoolFactory::createStandardPool();
- if (!standardPool) {
- Logger::instance().error("[失败] 创建标准线程池失败");
- return false;
- }
-
- standardPool->initialize();
- standardPool->start();
-
- // 提交测试任务
- auto future = standardPool->submit([]() {
- return 42;
- });
-
- int result = future.get();
- if (result != 42) {
- Logger::instance().errorf("[失败] 任务结果错误: 期望42, 实际{}", result);
- return false;
- }
-
- standardPool->stop(true);
-
- // 测试高性能线程池
- auto highPerfPool = ThreadPoolFactory::createHighPerformancePool();
- if (!highPerfPool) {
- Logger::instance().error("[失败] 创建高性能线程池失败");
- return false;
- }
-
- // 测试轻量级线程池
- auto lightPool = ThreadPoolFactory::createLightweightPool();
- if (!lightPool) {
- Logger::instance().error("[失败] 创建轻量级线程池失败");
- return false;
- }
-
- // 测试系统信息
- size_t coreCount = ThreadPoolFactory::getCpuCoreCount();
- size_t recommendedThreads = ThreadPoolFactory::getRecommendedThreadCount();
-
- Logger::instance().infof("CPU核心数: {}", coreCount);
- Logger::instance().infof("推荐线程数: {}", recommendedThreads);
-
- Logger::instance().info("[成功] 线程池工厂功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 线程池工厂测试异常: {}", e.what());
- return false;
- }
- }
-
- // 测试线程池管理器
- bool testThreadPoolManager() {
- Logger::instance().info("[测试] 线程池管理器功能...");
-
- try {
- auto& manager = ThreadPoolManager::getInstance();
-
- // 获取默认线程池
- auto* defaultPool = manager.getDefaultPool();
- if (!defaultPool) {
- Logger::instance().error("[失败] 获取默认线程池失败");
- return false;
- }
-
- // 创建命名线程池
- ThreadPoolConfig config;
- config.minThreads = 2;
- config.maxThreads = 4;
-
- ErrorCode result = manager.createPool("test_pool", config);
- if (result != ErrorCode::SUCCESS) {
- Logger::instance().errorf("[失败] 创建命名线程池失败: {}", static_cast<int>(result));
- return false;
- }
-
- // 获取命名线程池
- auto* namedPool = manager.getPool("test_pool");
- if (!namedPool) {
- Logger::instance().error("[失败] 获取命名线程池失败");
- return false;
- }
-
- // 获取所有线程池名称
- auto poolNames = manager.getPoolNames();
- Logger::instance().infof("线程池数量: {}", poolNames.size());
- for (const auto& name : poolNames) {
- Logger::instance().infof(" - {}", name);
- }
-
- // 销毁线程池
- result = manager.destroyPool("test_pool");
- if (result != ErrorCode::SUCCESS) {
- Logger::instance().errorf("[失败] 销毁线程池失败: {}", static_cast<int>(result));
- return false;
- }
-
- Logger::instance().info("[成功] 线程池管理器功能测试通过");
- return true;
-
- } catch (const std::exception& e) {
- Logger::instance().errorf("[失败] 线程池管理器测试异常: {}", e.what());
- return false;
- }
- }
-
- // 运行所有测试
- bool runAllTests() {
- Logger::instance().info("开始运行所有工具类测试...");
- std::vector<std::pair<std::string, std::function<bool()>>> tests
- = {{"线程池基本功能", [this]() { return testThreadPoolBasic(); }},
- {"线程池优先级功能", [this]() { return testThreadPoolPriority(); }},
- {"帧队列功能", [this]() { return testFrameQueue(); }},
- {"数据包队列功能", [this]() { return testPacketQueue(); }}, // 临时注释掉
- {"性能监控功能", [this]() { return testPerformanceMonitor(); }},
- {"同步器功能", [this]() { return testSynchronizer(); }},
- {"线程池工厂功能", [this]() { return testThreadPoolFactory(); }},
- {"线程池管理器功能", [this]() { return testThreadPoolManager(); }}};
- int passedTests = 0;
- int totalTests = tests.size();
-
- for (const auto& test : tests) {
- Logger::instance().infof("\n=== 运行测试: {} ===", test.first);
-
- try {
- if (test.second()) {
- passedTests++;
- Logger::instance().infof("✓ {} 测试通过", test.first);
- } else {
- Logger::instance().errorf("✗ {} 测试失败", test.first);
- }
- } catch (const std::exception& e) {
- Logger::instance().errorf("✗ {} 测试异常: {}", test.first, e.what());
- }
- }
-
- Logger::instance().infof("\n=== 测试结果汇总 ===");
- Logger::instance().infof("通过测试: {}/{}", passedTests, totalTests);
- Logger::instance().infof("成功率: {}%", (double) passedTests / totalTests * 100.0);
- return passedTests == totalTests;
- }
- };
- int main() {
- std::cout << "=== AV工具类模块测试程序 ===" << std::endl;
-
- // 初始化日志系统
- Logger::initialize("utils_test_log.txt", LogLevel::DEBUG, false, true);
-
- try {
- UtilsTester tester;
-
- bool allTestsPassed = tester.runAllTests();
-
- if (allTestsPassed) {
- std::cout << "\n🎉 所有测试通过!" << std::endl;
- Logger::instance().info("所有工具类测试通过");
- return 0;
- } else {
- std::cout << "\n❌ 部分测试失败!" << std::endl;
- Logger::instance().error("部分工具类测试失败");
- return 1;
- }
-
- } catch (const std::exception& e) {
- std::cerr << "测试过程中发生异常: " << e.what() << std::endl;
- Logger::instance().errorf("测试过程中发生异常: {}", e.what());
- return 1;
- }
- }
|