#include "utils_synchronizer.h" #include "../base/logger.h" #include #include #include namespace av { namespace utils { using namespace av; Synchronizer::Synchronizer(const SyncConfig& config) : config_(config) , masterClockType_(ClockType::AUDIO) , state_(SyncState::IDLE) , initialized_(false) , running_(false) , paused_(false) { // 初始化时钟 audioClock_ = ClockInfo(); videoClock_ = ClockInfo(); externalClock_ = ClockInfo(); // 初始化历史记录 audioClockHistory_.reserve(config_.smoothingWindow); videoClockHistory_.reserve(config_.smoothingWindow); syncErrorHistory_.reserve(config_.smoothingWindow); Logger::instance().debugf("Synchronizer created with strategy: {}", static_cast(config_.strategy)); } Synchronizer::~Synchronizer() { close(); Logger::instance().debug("Synchronizer destroyed"); } ErrorCode Synchronizer::initialize() { if (initialized_) { return ErrorCode::ALREADY_INITIALIZED; } std::lock_guard lock(clockMutex_); // 重置所有时钟 resetClock(ClockType::AUDIO); resetClock(ClockType::VIDEO); resetClock(ClockType::EXTERNAL); // 选择主时钟 selectMasterClock(); // 重置统计信息 resetStats(); // 设置开始时间 startTime_ = std::chrono::steady_clock::now(); lastClockUpdate_ = startTime_; state_ = SyncState::IDLE; initialized_ = true; Logger::instance().info("Synchronizer initialized"); return ErrorCode::SUCCESS; } ErrorCode Synchronizer::start() { if (!initialized_) { return ErrorCode::NOT_INITIALIZED; } if (running_) { return ErrorCode::ALREADY_STARTED; } running_ = true; paused_ = false; state_ = SyncState::SYNCING; startTime_ = std::chrono::steady_clock::now(); Logger::instance().info("Synchronizer started"); return ErrorCode::SUCCESS; } ErrorCode Synchronizer::stop() { if (!running_) { return ErrorCode::NOT_STARTED; } running_ = false; paused_ = false; state_ = SyncState::IDLE; Logger::instance().info("Synchronizer stopped"); return ErrorCode::SUCCESS; } ErrorCode Synchronizer::pause() { if (!running_) { return ErrorCode::NOT_STARTED; } if (paused_) { return ErrorCode::ALREADY_PAUSED; } std::lock_guard lock(clockMutex_); paused_ = true; // 暂停所有时钟 pauseClock(ClockType::AUDIO, true); pauseClock(ClockType::VIDEO, true); pauseClock(ClockType::EXTERNAL, true); Logger::instance().info("Synchronizer paused"); return ErrorCode::SUCCESS; } ErrorCode Synchronizer::resume() { if (!running_) { return ErrorCode::NOT_STARTED; } if (!paused_) { return ErrorCode::NOT_PAUSED; } std::lock_guard lock(clockMutex_); paused_ = false; // 恢复所有时钟 pauseClock(ClockType::AUDIO, false); pauseClock(ClockType::VIDEO, false); pauseClock(ClockType::EXTERNAL, false); state_ = SyncState::SYNCING; Logger::instance().info("Synchronizer resumed"); return ErrorCode::SUCCESS; } ErrorCode Synchronizer::reset() { std::lock_guard clockLock(clockMutex_); std::lock_guard statsLock(statsMutex_); // 重置时钟 resetClock(ClockType::AUDIO); resetClock(ClockType::VIDEO); resetClock(ClockType::EXTERNAL); // 重置统计信息 stats_ = SyncStats(); // 清空历史记录 audioClockHistory_.clear(); videoClockHistory_.clear(); syncErrorHistory_.clear(); // 重置状态 state_ = running_ ? SyncState::SYNCING : SyncState::IDLE; // 重新选择主时钟 selectMasterClock(); Logger::instance().info("Synchronizer reset"); return ErrorCode::SUCCESS; } ErrorCode Synchronizer::close() { if (running_) { stop(); } initialized_ = false; Logger::instance().info("Synchronizer closed"); return ErrorCode::SUCCESS; } ErrorCode Synchronizer::setAudioClock(double pts, double time) { return updateClock(ClockType::AUDIO, pts, time); } ErrorCode Synchronizer::setVideoClock(double pts, double time) { return updateClock(ClockType::VIDEO, pts, time); } ErrorCode Synchronizer::setExternalClock(double time) { if (time < 0) { time = getCurrentTime(); } std::lock_guard lock(clockMutex_); externalClock_.pts = time; externalClock_.time = time; externalClock_.lastUpdate = std::chrono::steady_clock::now(); if (config_.enableStats) { updateStats(); } return ErrorCode::SUCCESS; } ErrorCode Synchronizer::updateClock(ClockType type, double pts, double time) { if (!running_) { return ErrorCode::NOT_STARTED; } if (time < 0) { time = getCurrentTime(); } std::lock_guard lock(clockMutex_); ClockInfo* clock = nullptr; switch (type) { case ClockType::AUDIO: clock = &audioClock_; break; case ClockType::VIDEO: clock = &videoClock_; break; case ClockType::EXTERNAL: clock = &externalClock_; break; } if (!clock) { return ErrorCode::INVALID_PARAMS; } // 平滑处理 double smoothedPts = config_.enableAdaptiveSync ? smoothClock(type, pts) : pts; clock->pts = smoothedPts; clock->time = time; clock->lastUpdate = std::chrono::steady_clock::now(); // 更新主时钟 updateMasterClock(); // 更新同步状态 updateSyncState(); // 更新统计信息 if (config_.enableStats) { updateStats(); } return ErrorCode::SUCCESS; } double Synchronizer::getAudioClock() const { return getClockValue(ClockType::AUDIO); } double Synchronizer::getVideoClock() const { return getClockValue(ClockType::VIDEO); } double Synchronizer::getExternalClock() const { return getClockValue(ClockType::EXTERNAL); } double Synchronizer::getMasterClock() const { return getClockValue(masterClockType_); } ClockType Synchronizer::getMasterClockType() const { return masterClockType_; } ErrorCode Synchronizer::synchronize() { if (!running_) { return ErrorCode::NOT_STARTED; } std::lock_guard lock(clockMutex_); // 更新主时钟 updateMasterClock(); // 计算同步误差 double syncError = calculateSyncError(); // 检查是否需要调整 if (std::abs(syncError) > config_.syncThreshold) { // 调整播放速度 if (config_.enableAdaptiveSync) { adjustPlaybackSpeed(); } // 更新状态 if (std::abs(syncError) > config_.maxSyncError) { state_ = SyncState::ERROR; } else { state_ = SyncState::DRIFT; } } else { state_ = SyncState::SYNCHRONIZED; } // 更新统计信息 if (config_.enableStats) { updateSyncError(syncError); } // 通知状态变化 if (syncEventCallback_) { syncEventCallback_(state_, syncError); } return ErrorCode::SUCCESS; } ErrorCode Synchronizer::synchronizeAudio(double audioPts, double& delay) { if (!running_) { return ErrorCode::NOT_STARTED; } // 更新音频时钟 updateClock(ClockType::AUDIO, audioPts); // 计算延迟 delay = calculateAudioDelay(audioPts); // 检查是否需要丢帧 if (config_.enableFrameDrop && shouldDropFrame(ClockType::AUDIO, audioPts)) { if (frameDropCallback_) { frameDropCallback_(ClockType::AUDIO, audioPts); } updateDropCount(); } return ErrorCode::SUCCESS; } ErrorCode Synchronizer::synchronizeVideo(double videoPts, double& delay) { if (!running_) { return ErrorCode::NOT_STARTED; } // 更新视频时钟 updateClock(ClockType::VIDEO, videoPts); // 计算延迟 delay = calculateVideoDelay(videoPts); // 检查是否需要丢帧或重复帧 if (config_.enableFrameDrop && shouldDropFrame(ClockType::VIDEO, videoPts)) { if (frameDropCallback_) { frameDropCallback_(ClockType::VIDEO, videoPts); } updateDropCount(); } else if (config_.enableFrameDuplicate && shouldDuplicateFrame(ClockType::VIDEO, videoPts)) { if (frameDuplicateCallback_) { frameDuplicateCallback_(ClockType::VIDEO, videoPts); } updateDuplicateCount(); } return ErrorCode::SUCCESS; } bool Synchronizer::shouldDropFrame(ClockType type, double pts) { return needFrameDrop(type, pts); } bool Synchronizer::shouldDuplicateFrame(ClockType type, double pts) { return needFrameDuplicate(type, pts); } double Synchronizer::calculateAudioDelay(double audioPts) const { double masterClock = getMasterClock(); double delay = audioPts - masterClock; // 限制延迟范围 delay = std::max(-config_.maxAudioDelay, std::min(config_.maxAudioDelay, delay)); return delay; } double Synchronizer::calculateVideoDelay(double videoPts) const { double masterClock = getMasterClock(); double delay = videoPts - masterClock; // 限制延迟范围 delay = std::max(-config_.maxVideoDelay, std::min(config_.maxVideoDelay, delay)); return delay; } double Synchronizer::calculateSyncError() const { double audioClock = getAudioClock(); double videoClock = getVideoClock(); if (audioClock <= 0 || videoClock <= 0) { return 0.0; } return std::abs(audioClock - videoClock); } void Synchronizer::setConfig(const SyncConfig& config) { std::lock_guard lock(configMutex_); config_ = config; // 重新选择主时钟 selectMasterClock(); Logger::instance().debug("Synchronizer config updated"); } SyncConfig Synchronizer::getConfig() const { std::lock_guard lock(configMutex_); return config_; } void Synchronizer::setSyncStrategy(SyncStrategy strategy) { std::lock_guard lock(configMutex_); config_.strategy = strategy; selectMasterClock(); Logger::instance().debugf("Sync strategy set to: {}", static_cast(strategy)); } SyncStrategy Synchronizer::getSyncStrategy() const { std::lock_guard lock(configMutex_); return config_.strategy; } void Synchronizer::setSyncThreshold(double threshold) { std::lock_guard lock(configMutex_); config_.syncThreshold = threshold; Logger::instance().debugf("Sync threshold set to: {}ms", threshold * 1000); } void Synchronizer::setMaxSyncError(double maxError) { std::lock_guard lock(configMutex_); config_.maxSyncError = maxError; Logger::instance().debugf("Max sync error set to: {}ms", maxError * 1000); } void Synchronizer::setPlaybackSpeed(double speed) { std::lock_guard lock(clockMutex_); audioClock_.speed = speed; videoClock_.speed = speed; externalClock_.speed = speed; Logger::instance().debugf("Playback speed set to: {}x", speed); } double Synchronizer::getPlaybackSpeed() const { std::lock_guard lock(clockMutex_); return audioClock_.speed; } void Synchronizer::setPaused(bool paused) { if (paused) { pause(); } else { resume(); } } bool Synchronizer::isPaused() const { return paused_; } SyncState Synchronizer::getState() const { return state_; } bool Synchronizer::isSynchronized() const { return state_ == SyncState::SYNCHRONIZED; } SyncStats Synchronizer::getStats() const { std::lock_guard lock(statsMutex_); SyncStats stats = stats_; // 更新当前时钟值 stats.audioClock = getAudioClock(); stats.videoClock = getVideoClock(); stats.externalClock = getExternalClock(); stats.masterClock = getMasterClock(); stats.state = state_; return stats; } void Synchronizer::resetStats() { std::lock_guard lock(statsMutex_); stats_ = SyncStats(); stats_.lastUpdateTime = std::chrono::steady_clock::now(); Logger::instance().debug("Synchronizer stats reset"); } void Synchronizer::setSyncEventCallback(SyncEventCallback callback) { syncEventCallback_ = callback; } void Synchronizer::setFrameDropCallback(FrameDropCallback callback) { frameDropCallback_ = callback; } void Synchronizer::setFrameDuplicateCallback(FrameDuplicateCallback callback) { frameDuplicateCallback_ = callback; } void Synchronizer::enableAdaptiveSync(bool enable) { std::lock_guard lock(configMutex_); config_.enableAdaptiveSync = enable; Logger::instance().debugf("Adaptive sync {}", enable ? "enabled" : "disabled"); } void Synchronizer::setClockUpdateInterval(double interval) { std::lock_guard lock(configMutex_); config_.clockUpdateInterval = interval; Logger::instance().debugf("Clock update interval set to: {}ms", interval * 1000); } void Synchronizer::setSmoothingWindow(int window) { std::lock_guard lock(configMutex_); config_.smoothingWindow = window; // 调整历史记录大小 audioClockHistory_.reserve(window); videoClockHistory_.reserve(window); syncErrorHistory_.reserve(window); Logger::instance().debugf("Smoothing window set to: {}", window); } // 内部方法实现 void Synchronizer::updateMasterClock() { // 根据策略选择主时钟 switch (config_.strategy) { case SyncStrategy::AUDIO_MASTER: masterClockType_ = ClockType::AUDIO; break; case SyncStrategy::VIDEO_MASTER: masterClockType_ = ClockType::VIDEO; break; case SyncStrategy::EXTERNAL_CLOCK: masterClockType_ = ClockType::EXTERNAL; break; case SyncStrategy::AUTO_SELECT: selectMasterClock(); break; } } void Synchronizer::updateSyncState() { if (!running_) { state_ = SyncState::IDLE; return; } if (paused_) { return; } double syncError = calculateSyncError(); if (syncError <= config_.syncThreshold) { state_ = SyncState::SYNCHRONIZED; } else if (syncError <= config_.maxSyncError) { state_ = SyncState::DRIFT; } else { state_ = SyncState::ERROR; } } void Synchronizer::updateStats() { std::lock_guard lock(statsMutex_); stats_.audioClock = getAudioClock(); stats_.videoClock = getVideoClock(); stats_.externalClock = getExternalClock(); stats_.masterClock = getMasterClock(); stats_.audioDelay = calculateAudioDelay(stats_.audioClock); stats_.videoDelay = calculateVideoDelay(stats_.videoClock); stats_.syncError = calculateSyncError(); stats_.state = state_; stats_.lastUpdateTime = std::chrono::steady_clock::now(); // 更新最大同步误差 stats_.maxSyncError = std::max(stats_.maxSyncError, stats_.syncError); // 更新同步次数 stats_.syncCount++; // 更新平均同步误差 if (stats_.syncCount == 1) { stats_.avgSyncError = stats_.syncError; } else { stats_.avgSyncError = (stats_.avgSyncError * (stats_.syncCount - 1) + stats_.syncError) / stats_.syncCount; } } double Synchronizer::getCurrentTime() const { auto now = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(now - startTime_); return duration.count() / 1000000.0; // 转换为秒 } double Synchronizer::smoothClock(ClockType type, double newValue) { std::vector* history = nullptr; switch (type) { case ClockType::AUDIO: history = &audioClockHistory_; break; case ClockType::VIDEO: history = &videoClockHistory_; break; default: return newValue; } // 添加新值到历史记录 history->push_back(newValue); // 保持历史记录大小 if (history->size() > static_cast(config_.smoothingWindow)) { history->erase(history->begin()); } // 计算平均值 double sum = 0.0; for (double value : *history) { sum += value; } return sum / history->size(); } bool Synchronizer::isClockValid(ClockType type) const { const ClockInfo* clock = nullptr; switch (type) { case ClockType::AUDIO: clock = &audioClock_; break; case ClockType::VIDEO: clock = &videoClock_; break; case ClockType::EXTERNAL: clock = &externalClock_; break; } if (!clock) { return false; } // 检查时钟是否有效(最近更新过) auto now = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration_cast(now - clock->lastUpdate); return elapsed.count() < 1000; // 1秒内更新过认为有效 } void Synchronizer::selectMasterClock() { // 自动选择最稳定的时钟作为主时钟 if (isClockValid(ClockType::AUDIO)) { masterClockType_ = ClockType::AUDIO; } else if (isClockValid(ClockType::VIDEO)) { masterClockType_ = ClockType::VIDEO; } else { masterClockType_ = ClockType::EXTERNAL; } } void Synchronizer::handleSyncDrift() { // 处理时钟漂移 double syncError = calculateSyncError(); if (syncError > config_.maxSyncError) { // 重置时钟 Logger::instance().warning("Sync drift detected, resetting clocks"); reset(); } } void Synchronizer::notifyStateChange(SyncState newState) { if (state_ != newState) { SyncState oldState = state_; state_ = newState; if (syncEventCallback_) { syncEventCallback_(newState, calculateSyncError()); } Logger::instance().debugf("Sync state changed from {} to {}", static_cast(oldState), static_cast(newState)); } } double Synchronizer::getClockValue(ClockType type) const { std::lock_guard lock(clockMutex_); const ClockInfo* clock = nullptr; switch (type) { case ClockType::AUDIO: clock = &audioClock_; break; case ClockType::VIDEO: clock = &videoClock_; break; case ClockType::EXTERNAL: clock = &externalClock_; break; } if (!clock || clock->paused) { return clock ? clock->pts : 0.0; } // 计算当前时钟值 auto now = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration_cast(now - clock->lastUpdate); double elapsedSeconds = elapsed.count() / 1000000.0; return clock->pts + elapsedSeconds * clock->speed; } void Synchronizer::setClockValue(ClockType type, double value, double time) { ClockInfo* clock = nullptr; switch (type) { case ClockType::AUDIO: clock = &audioClock_; break; case ClockType::VIDEO: clock = &videoClock_; break; case ClockType::EXTERNAL: clock = &externalClock_; break; } if (clock) { clock->pts = value; clock->time = (time >= 0) ? time : getCurrentTime(); clock->lastUpdate = std::chrono::steady_clock::now(); } } void Synchronizer::resetClock(ClockType type) { ClockInfo* clock = nullptr; switch (type) { case ClockType::AUDIO: clock = &audioClock_; break; case ClockType::VIDEO: clock = &videoClock_; break; case ClockType::EXTERNAL: clock = &externalClock_; break; } if (clock) { *clock = ClockInfo(); clock->lastUpdate = std::chrono::steady_clock::now(); } } void Synchronizer::pauseClock(ClockType type, bool pause) { ClockInfo* clock = nullptr; switch (type) { case ClockType::AUDIO: clock = &audioClock_; break; case ClockType::VIDEO: clock = &videoClock_; break; case ClockType::EXTERNAL: clock = &externalClock_; break; } if (clock) { clock->paused = pause; if (!pause) { clock->lastUpdate = std::chrono::steady_clock::now(); } } } double Synchronizer::calculateOptimalDelay(ClockType type, double pts) { double masterClock = getMasterClock(); double delay = pts - masterClock; // 根据类型调整延迟 if (type == ClockType::AUDIO) { delay = std::max(-config_.maxAudioDelay, std::min(config_.maxAudioDelay, delay)); } else if (type == ClockType::VIDEO) { delay = std::max(-config_.maxVideoDelay, std::min(config_.maxVideoDelay, delay)); } return delay; } bool Synchronizer::needFrameDrop(ClockType type, double pts) { double delay = calculateOptimalDelay(type, pts); // 如果延迟太大(帧太晚),需要丢帧 if (type == ClockType::AUDIO) { return delay < -config_.maxAudioDelay / 2; } else if (type == ClockType::VIDEO) { return delay < -config_.maxVideoDelay / 2; } return false; } bool Synchronizer::needFrameDuplicate(ClockType type, double pts) { double delay = calculateOptimalDelay(type, pts); // 如果延迟太小(帧太早),需要重复帧 if (type == ClockType::VIDEO) { return delay > config_.maxVideoDelay / 2; } return false; } void Synchronizer::adjustPlaybackSpeed() { double syncError = calculateSyncError(); if (syncError > config_.syncThreshold) { // 根据同步误差调整播放速度 double speedAdjustment = std::min(0.1, syncError / 10.0); double newSpeed = 1.0; if (masterClockType_ == ClockType::AUDIO) { // 音频为主,调整视频速度 newSpeed = videoClock_.speed + speedAdjustment; } else if (masterClockType_ == ClockType::VIDEO) { // 视频为主,调整音频速度 newSpeed = audioClock_.speed + speedAdjustment; } // 限制速度范围 newSpeed = std::max(0.5, std::min(2.0, newSpeed)); if (masterClockType_ == ClockType::AUDIO) { videoClock_.speed = newSpeed; } else if (masterClockType_ == ClockType::VIDEO) { audioClock_.speed = newSpeed; } Logger::instance().debugf("Adjusted playback speed to: {}x", newSpeed); } } void Synchronizer::updateSyncError(double error) { syncErrorHistory_.push_back(error); if (syncErrorHistory_.size() > static_cast(config_.smoothingWindow)) { syncErrorHistory_.erase(syncErrorHistory_.begin()); } } void Synchronizer::updateDropCount() { std::lock_guard lock(statsMutex_); stats_.dropCount++; } void Synchronizer::updateDuplicateCount() { std::lock_guard lock(statsMutex_); stats_.duplicateCount++; } // 多流同步器实现 MultiStreamSynchronizer::MultiStreamSynchronizer(const SyncConfig& config) : defaultConfig_(config) , masterStreamIndex_(-1) , globalState_(SyncState::IDLE) { Logger::instance().debug("MultiStreamSynchronizer created"); } MultiStreamSynchronizer::~MultiStreamSynchronizer() { streamSynchronizers_.clear(); Logger::instance().debug("MultiStreamSynchronizer destroyed"); } ErrorCode MultiStreamSynchronizer::addStream(int streamIndex, ClockType type) { std::unique_lock lock(streamsMutex_); if (streamSynchronizers_.find(streamIndex) != streamSynchronizers_.end()) { Logger::instance().warningf("Stream {} already exists", streamIndex); return ErrorCode::ALREADY_EXISTS; } auto synchronizer = std::make_unique(defaultConfig_); synchronizer->initialize(); streamSynchronizers_[streamIndex] = std::move(synchronizer); streamTypes_[streamIndex] = type; // 如果是第一个流,设为主流 if (masterStreamIndex_ == -1) { masterStreamIndex_ = streamIndex; } Logger::instance().debugf("Added stream: {} with type: {}", streamIndex, static_cast(type)); return ErrorCode::SUCCESS; } ErrorCode MultiStreamSynchronizer::removeStream(int streamIndex) { std::unique_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it == streamSynchronizers_.end()) { Logger::instance().warningf("Stream {} not found", streamIndex); return ErrorCode::NOT_FOUND; } streamSynchronizers_.erase(it); streamTypes_.erase(streamIndex); // 如果删除的是主流,重新选择主流 if (masterStreamIndex_ == streamIndex) { updateMasterStream(); } Logger::instance().debugf("Removed stream: {}", streamIndex); return ErrorCode::SUCCESS; } bool MultiStreamSynchronizer::hasStream(int streamIndex) const { std::shared_lock lock(streamsMutex_); return streamSynchronizers_.find(streamIndex) != streamSynchronizers_.end(); } std::vector MultiStreamSynchronizer::getStreamIndices() const { std::shared_lock lock(streamsMutex_); std::vector indices; for (const auto& pair : streamSynchronizers_) { indices.push_back(pair.first); } return indices; } ErrorCode MultiStreamSynchronizer::synchronizeStream(int streamIndex, double pts, double& delay) { std::shared_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it == streamSynchronizers_.end()) { return ErrorCode::NOT_FOUND; } auto typeIt = streamTypes_.find(streamIndex); if (typeIt == streamTypes_.end()) { return ErrorCode::NOT_FOUND; } ClockType type = typeIt->second; if (type == ClockType::AUDIO) { return it->second->synchronizeAudio(pts, delay); } else if (type == ClockType::VIDEO) { return it->second->synchronizeVideo(pts, delay); } return ErrorCode::INVALID_PARAMS; } ErrorCode MultiStreamSynchronizer::synchronizeAllStreams() { std::shared_lock lock(streamsMutex_); ErrorCode result = ErrorCode::SUCCESS; for (auto& pair : streamSynchronizers_) { ErrorCode syncResult = pair.second->synchronize(); if (syncResult != ErrorCode::SUCCESS) { result = syncResult; } } // 同步到主流 syncToMaster(); return result; } bool MultiStreamSynchronizer::shouldDropFrame(int streamIndex, double pts) { std::shared_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it == streamSynchronizers_.end()) { return false; } auto typeIt = streamTypes_.find(streamIndex); if (typeIt == streamTypes_.end()) { return false; } return it->second->shouldDropFrame(typeIt->second, pts); } bool MultiStreamSynchronizer::shouldDuplicateFrame(int streamIndex, double pts) { std::shared_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it == streamSynchronizers_.end()) { return false; } auto typeIt = streamTypes_.find(streamIndex); if (typeIt == streamTypes_.end()) { return false; } return it->second->shouldDuplicateFrame(typeIt->second, pts); } ErrorCode MultiStreamSynchronizer::updateStreamClock(int streamIndex, double pts, double time) { std::shared_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it == streamSynchronizers_.end()) { return ErrorCode::NOT_FOUND; } auto typeIt = streamTypes_.find(streamIndex); if (typeIt == streamTypes_.end()) { return ErrorCode::NOT_FOUND; } return it->second->updateClock(typeIt->second, pts, time); } double MultiStreamSynchronizer::getStreamClock(int streamIndex) const { std::shared_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it == streamSynchronizers_.end()) { return 0.0; } return it->second->getMasterClock(); } double MultiStreamSynchronizer::getMasterClock() const { std::lock_guard lock(masterMutex_); if (masterStreamIndex_ == -1) { return 0.0; } return getStreamClock(masterStreamIndex_); } int MultiStreamSynchronizer::getMasterStreamIndex() const { std::lock_guard lock(masterMutex_); return masterStreamIndex_; } void MultiStreamSynchronizer::setConfig(const SyncConfig& config) { defaultConfig_ = config; std::shared_lock lock(streamsMutex_); for (auto& pair : streamSynchronizers_) { pair.second->setConfig(config); } Logger::instance().debug("MultiStreamSynchronizer config updated"); } SyncConfig MultiStreamSynchronizer::getConfig() const { return defaultConfig_; } SyncState MultiStreamSynchronizer::getState() const { return globalState_; } std::map MultiStreamSynchronizer::getAllStats() const { std::shared_lock lock(streamsMutex_); std::map allStats; for (const auto& pair : streamSynchronizers_) { allStats[pair.first] = pair.second->getStats(); } return allStats; } SyncStats MultiStreamSynchronizer::getStats(int streamIndex) const { std::shared_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it != streamSynchronizers_.end()) { return it->second->getStats(); } return SyncStats(); } void MultiStreamSynchronizer::resetStats() { std::shared_lock lock(streamsMutex_); for (auto& pair : streamSynchronizers_) { pair.second->resetStats(); } Logger::instance().debug("All stream synchronizer stats reset"); } void MultiStreamSynchronizer::resetStats(int streamIndex) { std::shared_lock lock(streamsMutex_); auto it = streamSynchronizers_.find(streamIndex); if (it != streamSynchronizers_.end()) { it->second->resetStats(); Logger::instance().debugf("Stream {} synchronizer stats reset", streamIndex); } } ErrorCode MultiStreamSynchronizer::start() { std::shared_lock lock(streamsMutex_); ErrorCode result = ErrorCode::SUCCESS; for (auto& pair : streamSynchronizers_) { ErrorCode startResult = pair.second->start(); if (startResult != ErrorCode::SUCCESS) { result = startResult; } } globalState_ = SyncState::SYNCING; Logger::instance().info("MultiStreamSynchronizer started"); return result; } ErrorCode MultiStreamSynchronizer::stop() { std::shared_lock lock(streamsMutex_); ErrorCode result = ErrorCode::SUCCESS; for (auto& pair : streamSynchronizers_) { ErrorCode stopResult = pair.second->stop(); if (stopResult != ErrorCode::SUCCESS) { result = stopResult; } } globalState_ = SyncState::IDLE; Logger::instance().info("MultiStreamSynchronizer stopped"); return result; } ErrorCode MultiStreamSynchronizer::pause() { std::shared_lock lock(streamsMutex_); ErrorCode result = ErrorCode::SUCCESS; for (auto& pair : streamSynchronizers_) { ErrorCode pauseResult = pair.second->pause(); if (pauseResult != ErrorCode::SUCCESS) { result = pauseResult; } } Logger::instance().info("MultiStreamSynchronizer paused"); return result; } ErrorCode MultiStreamSynchronizer::resume() { std::shared_lock lock(streamsMutex_); ErrorCode result = ErrorCode::SUCCESS; for (auto& pair : streamSynchronizers_) { ErrorCode resumeResult = pair.second->resume(); if (resumeResult != ErrorCode::SUCCESS) { result = resumeResult; } } globalState_ = SyncState::SYNCING; Logger::instance().info("MultiStreamSynchronizer resumed"); return result; } ErrorCode MultiStreamSynchronizer::reset() { std::shared_lock lock(streamsMutex_); ErrorCode result = ErrorCode::SUCCESS; for (auto& pair : streamSynchronizers_) { ErrorCode resetResult = pair.second->reset(); if (resetResult != ErrorCode::SUCCESS) { result = resetResult; } } Logger::instance().info("MultiStreamSynchronizer reset"); return result; } void MultiStreamSynchronizer::updateMasterStream() { std::lock_guard lock(masterMutex_); if (streamSynchronizers_.empty()) { masterStreamIndex_ = -1; return; } // 选择第一个音频流作为主流,如果没有音频流则选择第一个视频流 for (const auto& pair : streamTypes_) { if (pair.second == ClockType::AUDIO) { masterStreamIndex_ = pair.first; return; } } // 如果没有音频流,选择第一个流 masterStreamIndex_ = streamSynchronizers_.begin()->first; } void MultiStreamSynchronizer::syncToMaster() { if (masterStreamIndex_ == -1) { return; } double masterClock = getMasterClock(); // 将其他流同步到主流 for (auto& pair : streamSynchronizers_) { if (pair.first != masterStreamIndex_) { // 这里可以实现更复杂的同步逻辑 // 暂时简化处理 } } } // 工厂类实现 std::unique_ptr SynchronizerFactory::createStandardSynchronizer() { SyncConfig config; config.strategy = SyncStrategy::AUDIO_MASTER; config.maxAudioDelay = 0.1; config.maxVideoDelay = 0.04; config.syncThreshold = 0.01; config.enableFrameDrop = true; config.enableFrameDuplicate = true; config.enableAdaptiveSync = true; return std::make_unique(config); } std::unique_ptr SynchronizerFactory::createLowLatencySynchronizer() { SyncConfig config; config.strategy = SyncStrategy::AUTO_SELECT; config.maxAudioDelay = 0.05; config.maxVideoDelay = 0.02; config.syncThreshold = 0.005; config.maxSyncError = 0.1; config.enableFrameDrop = true; config.enableFrameDuplicate = false; config.enableAdaptiveSync = true; config.smoothingWindow = 5; return std::make_unique(config); } std::unique_ptr SynchronizerFactory::createHighPrecisionSynchronizer() { SyncConfig config; config.strategy = SyncStrategy::EXTERNAL_CLOCK; config.maxAudioDelay = 0.2; config.maxVideoDelay = 0.08; config.syncThreshold = 0.001; config.maxSyncError = 0.01; config.enableFrameDrop = true; config.enableFrameDuplicate = true; config.enableAdaptiveSync = true; config.smoothingWindow = 20; config.clockUpdateInterval = 0.01; return std::make_unique(config); } std::unique_ptr SynchronizerFactory::createRealtimeSynchronizer() { SyncConfig config; config.strategy = SyncStrategy::VIDEO_MASTER; config.maxAudioDelay = 0.03; config.maxVideoDelay = 0.016; config.syncThreshold = 0.008; config.maxSyncError = 0.05; config.enableFrameDrop = true; config.enableFrameDuplicate = false; config.enableAdaptiveSync = true; config.smoothingWindow = 3; config.clockUpdateInterval = 0.016; return std::make_unique(config); } std::unique_ptr SynchronizerFactory::createMultiStreamSynchronizer(const SyncConfig& config) { return std::make_unique(config); } } // namespace utils } // namespace av