// read_thread.cpp
  1. #include "read_thread.h"
  2. #include "playercontroller.h"
  3. #include <QLoggingCategory>
  4. #include <QString>
  5. Q_LOGGING_CATEGORY(playerControllerReadThread, "player.controller.ReadThread")
  6. extern int infinite_buffer;
  7. extern int64_t start_time;
  8. static int64_t duration = AV_NOPTS_VALUE;
  9. ReadThread::ReadThread(VideoState* pState)
  10. : m_pPlayData(pState)
  11. {
  12. qCDebug(playerControllerReadThread) << "[ReadThread] constructed, pState:" << (void*) pState;
  13. }
  14. ReadThread::~ReadThread()
  15. {
  16. qCDebug(playerControllerReadThread) << "[ReadThread] destructed.";
  17. stop();
  18. join();
  19. }
  20. void ReadThread::set_video_state(VideoState* pState)
  21. {
  22. assert(pState);
  23. m_pPlayData = pState;
  24. }
  25. int ReadThread::loop_read()
  26. {
  27. int ret = -1;
  28. VideoState* is = m_pPlayData;
  29. AVPacket* pkt = nullptr;
  30. int pkt_in_play_range = 0;
  31. int64_t stream_start_time = 0;
  32. int64_t pkt_ts = 0;
  33. // 重连计数器和配置
  34. static int reconnect_count = 0;
  35. static const int max_fast_reconnect_attempts = 3; // 快速重连次数
  36. static const int max_normal_reconnect_attempts = 10; // 正常重连次数
  37. static bool infinite_reconnect_mode = false; // 无限重连模式
  38. if (!is)
  39. return ret;
  40. pkt = av_packet_alloc();
  41. if (!pkt) {
  42. av_log(nullptr, AV_LOG_FATAL, "Could not allocate packet.\n");
  43. ret = AVERROR(ENOMEM);
  44. return ret;
  45. }
  46. qCDebug(playerControllerReadThread) << "[ReadThread] loop_read start, file:"
  47. << (is && is->ic ? is->ic->url : "null");
  48. is->read_thread_exit = 0;
  49. for (;;) {
  50. if (isExit()) {
  51. qCDebug(playerControllerReadThread) << "[ReadThread] m_exit set, break.";
  52. break;
  53. }
  54. if (is->abort_request) {
  55. qCDebug(playerControllerReadThread) << "[ReadThread] abort_request set, break.";
  56. break;
  57. }
  58. if (is->paused != is->last_paused) {
  59. is->last_paused = is->paused;
  60. if (is->paused)
  61. is->read_pause_return = av_read_pause(is->ic);
  62. else
  63. av_read_play(is->ic);
  64. }
  65. if (is->seek_req) {
  66. qCDebug(playerControllerReadThread)
  67. << "[ReadThread] seek_req, seek_pos:" << is->seek_pos
  68. << ", seek_rel:" << is->seek_rel << ", seek_flags:" << is->seek_flags;
  69. int64_t seek_target = is->seek_pos;
  70. int64_t seek_min = is->seek_rel > 0 ? seek_target - is->seek_rel + 2 : INT64_MIN;
  71. int64_t seek_max = is->seek_rel < 0 ? seek_target - is->seek_rel - 2 : INT64_MAX;
  72. ret = avformat_seek_file(is->ic, -1, seek_min, seek_target, seek_max, is->seek_flags);
  73. if (ret < 0) {
  74. av_log(nullptr, AV_LOG_ERROR, "%s: error while seeking\n", is->ic->url);
  75. qCWarning(playerControllerReadThread)
  76. << "[ReadThread] avformat_seek_file failed, ret:" << ret;
  77. } else {
  78. qCDebug(playerControllerReadThread)
  79. << "[ReadThread] avformat_seek_file success, flush queues.";
  80. if (is->audio_stream >= 0)
  81. packet_queue_flush(&is->audioq);
  82. if (is->subtitle_stream >= 0)
  83. packet_queue_flush(&is->subtitleq);
  84. if (is->video_stream >= 0)
  85. packet_queue_flush(&is->videoq);
  86. if (is->seek_flags & AVSEEK_FLAG_BYTE) {
  87. set_clock(&is->extclk, NAN, 0);
  88. } else {
  89. set_clock(&is->extclk, seek_target / (double) AV_TIME_BASE, 0);
  90. }
  91. }
  92. is->seek_req = 0;
  93. is->queue_attachments_req = 1;
  94. is->eof = 0;
  95. if (is->paused)
  96. step_to_next_frame(is);
  97. }
  98. if (is->queue_attachments_req) {
  99. qCDebug(playerControllerReadThread)
  100. << "[ReadThread] queue_attachments_req, video_st:" << (void*) is->video_st;
  101. if (is->video_st && is->video_st->disposition & AV_DISPOSITION_ATTACHED_PIC) {
  102. ret = av_packet_ref(pkt, &is->video_st->attached_pic);
  103. if (ret < 0)
  104. break;
  105. packet_queue_put(&is->videoq, pkt);
  106. packet_queue_put_nullpacket(&is->videoq, pkt, is->video_stream);
  107. }
  108. is->queue_attachments_req = 0;
  109. }
  110. /* if the queue are full, no need to read more */
  111. if (infinite_buffer < 1
  112. && (is->audioq.size + is->videoq.size + is->subtitleq.size > MAX_QUEUE_SIZE
  113. || (stream_has_enough_packets(is->audio_st, is->audio_stream, &is->audioq)
  114. && stream_has_enough_packets(is->video_st, is->video_stream, &is->videoq)
  115. && stream_has_enough_packets(is->subtitle_st,
  116. is->subtitle_stream,
  117. &is->subtitleq)))) {
  118. qCDebug(playerControllerReadThread)
  119. << "[ReadThread] queues full, waiting... audioq:" << is->audioq.size
  120. << ", videoq:" << is->videoq.size << ", subtitleq:" << is->subtitleq.size;
  121. std::unique_lock<std::mutex> lock(m_mutex);
  122. m_cv.wait_for(lock, std::chrono::milliseconds(10), [this, is] { return isExit() || is->abort_request; });
  123. continue;
  124. }
  125. if (!is->paused
  126. && (!is->audio_st
  127. || (is->auddec.finished == is->audioq.serial
  128. && frame_queue_nb_remaining(&is->sampq) == 0))
  129. && (!is->video_st
  130. || (is->viddec.finished == is->videoq.serial
  131. && frame_queue_nb_remaining(&is->pictq) == 0))) {
  132. if (is->loop) {
  133. stream_seek(is, 0, 0, 0);
  134. } else {
  135. // autoexit
  136. break;
  137. }
  138. }
  139. ret = av_read_frame(is->ic, pkt);
  140. if (ret < 0) {
  141. char buf[256] = {0};
  142. av_strerror(ret, buf, 256);
  143. qCWarning(playerControllerReadThread)
  144. << "[ReadThread] av_read_frame failed, ret:" << ret << ", eof:" << is->eof
  145. << ", pb_error:" << (is->ic->pb ? is->ic->pb->error : 0)
  146. << "error:" << QString::fromUtf8(buf);
  147. // 统一处理EOF和I/O错误 - 无感重连策略
  148. bool shouldAttemptReconnect = false;
  149. bool isEOFError = (ret == AVERROR_EOF || avio_feof(is->ic->pb)) && !is->eof;
  150. bool isIOError = is->ic->pb && is->ic->pb->error;
  151. // 增强错误检测:检查更多可能的网络错误
  152. bool isNetworkError = (ret == AVERROR(EAGAIN) || ret == AVERROR(ETIMEDOUT) ||
  153. ret == AVERROR(ECONNRESET) || ret == AVERROR(EPIPE) ||
  154. ret == AVERROR(ECONNABORTED) || ret == -5); // -5 是常见的I/O错误
  155. if (isEOFError || isIOError || isNetworkError) {
  156. if (isEOFError) {
  157. qCDebug(playerControllerReadThread) << "[ReadThread] EOF reached for stream:" << is->filename;
  158. }
  159. if (isIOError) {
  160. qCWarning(playerControllerReadThread) << "[ReadThread] IO error detected, error code:" << is->ic->pb->error;
  161. }
  162. if (isNetworkError) {
  163. qCWarning(playerControllerReadThread) << "[ReadThread] Network error detected, return code:" << ret;
  164. }
  165. // 检查是否为实时流(RTMP、RTSP等)
  166. bool isRealTimeStream = false;
  167. if (is->filename) {
  168. QString filename = QString::fromUtf8(is->filename);
  169. isRealTimeStream = filename.startsWith("rtmp://") ||
  170. filename.startsWith("rtsp://") ||
  171. filename.startsWith("rtp://") ||
  172. filename.startsWith("udp://") ||
  173. filename.startsWith("tcp://") ||
  174. filename.startsWith("http://") ||
  175. filename.startsWith("https://");
  176. }
  177. if (isRealTimeStream) {
  178. reconnect_count++;
  179. QString errorType = isEOFError ? "EOF" : (isIOError ? "I/O error" : "Network error");
  180. // 分层重连策略
  181. bool shouldContinueReconnect = false;
  182. int wait_time_ms = 0;
  183. if (reconnect_count <= max_fast_reconnect_attempts) {
  184. // 快速重连阶段:短间隔,快速恢复
  185. wait_time_ms = 500 * reconnect_count; // 0.5s, 1s, 1.5s
  186. shouldContinueReconnect = true;
  187. qCWarning(playerControllerReadThread) << "[ReadThread] Fast reconnection" << errorType
  188. << "attempt" << reconnect_count << "/" << max_fast_reconnect_attempts;
  189. } else if (reconnect_count <= max_normal_reconnect_attempts) {
  190. // 正常重连阶段:较长间隔
  191. wait_time_ms = 2000 + (reconnect_count - max_fast_reconnect_attempts) * 1000; // 2s, 3s, 4s...
  192. shouldContinueReconnect = true;
  193. qCWarning(playerControllerReadThread) << "[ReadThread] Normal reconnection" << errorType
  194. << "attempt" << reconnect_count << "/" << max_normal_reconnect_attempts;
  195. } else {
  196. // 无限重连模式:长间隔,但永不放弃
  197. infinite_reconnect_mode = true;
  198. wait_time_ms = 10000; // 固定10秒间隔
  199. shouldContinueReconnect = true;
  200. qCWarning(playerControllerReadThread) << "[ReadThread] Infinite reconnection mode" << errorType
  201. << "attempt" << reconnect_count << "(never give up)";
  202. }
  203. if (shouldContinueReconnect) {
  204. // 清空当前的错误状态
  205. if (is->ic && is->ic->pb) {
  206. is->ic->pb->error = 0;
  207. is->ic->pb->eof_reached = 0;
  208. }
  209. // 无感重连:不设置EOF状态,保持播放器运行
  210. // 这样即使网络中断,播放器界面也不会退出
  211. qCDebug(playerControllerReadThread) << "[ReadThread] Seamless reconnection: waiting" << wait_time_ms << "ms...";
  212. std::unique_lock<std::mutex> lock(m_mutex);
  213. m_cv.wait_for(lock, std::chrono::milliseconds(wait_time_ms), [this, is] { return isExit() || is->abort_request; });
  214. if (!isExit() && !is->abort_request) {
  215. qCDebug(playerControllerReadThread) << "[ReadThread] Attempting seamless reconnection...";
  216. continue; // 继续循环,永不放弃,真正的无感重连
  217. }
  218. }
  219. } else {
  220. // 非实时流:传统处理方式
  221. qCDebug(playerControllerReadThread) << "[ReadThread] File stream error, send null packets.";
  222. // 发送null packets并设置EOF
  223. if (isEOFError) {
  224. if (is->video_stream >= 0)
  225. packet_queue_put_nullpacket(&is->videoq, pkt, is->video_stream);
  226. if (is->audio_stream >= 0)
  227. packet_queue_put_nullpacket(&is->audioq, pkt, is->audio_stream);
  228. if (is->subtitle_stream >= 0)
  229. packet_queue_put_nullpacket(&is->subtitleq, pkt, is->subtitle_stream);
  230. is->eof = 1;
  231. }
  232. // 对于I/O错误,直接退出循环
  233. if (isIOError) {
  234. break;
  235. }
  236. }
  237. }
  238. // 错误处理已经统一到上面的逻辑中
  239. std::unique_lock<std::mutex> lock(m_mutex);
  240. m_cv.wait_for(lock, std::chrono::milliseconds(10), [this, is] { return isExit() || is->abort_request; });
  241. continue;
  242. } else {
  243. is->eof = 0;
  244. // 成功读取数据时重置重连计数器和状态
  245. if (reconnect_count > 0) {
  246. if (infinite_reconnect_mode) {
  247. qCInfo(playerControllerReadThread) << "[ReadThread] Stream recovered from infinite reconnection mode after"
  248. << reconnect_count << "attempts";
  249. infinite_reconnect_mode = false;
  250. } else {
  251. qCDebug(playerControllerReadThread) << "[ReadThread] Stream recovered, resetting reconnection counter";
  252. }
  253. reconnect_count = 0;
  254. }
  255. qCDebug(playerControllerReadThread)
  256. << "[ReadThread] av_read_frame success, stream_index:" << pkt->stream_index
  257. << ", pts:" << pkt->pts;
  258. }
  259. /* check if packet is in play range specified by user, then queue, otherwise discard */
  260. stream_start_time = is->ic->streams[pkt->stream_index]->start_time;
  261. pkt_ts = pkt->pts == AV_NOPTS_VALUE ? pkt->dts : pkt->pts;
  262. pkt_in_play_range = duration == AV_NOPTS_VALUE
  263. || (pkt_ts
  264. - (stream_start_time != AV_NOPTS_VALUE ? stream_start_time : 0))
  265. * av_q2d(is->ic->streams[pkt->stream_index]->time_base)
  266. - (double) (start_time != AV_NOPTS_VALUE ? start_time : 0)
  267. / 1000000
  268. <= ((double) duration / 1000000);
  269. if (pkt->stream_index == is->audio_stream && pkt_in_play_range) {
  270. qCDebug(playerControllerReadThread) << "[ReadThread] put audio packet, pts:" << pkt->pts;
  271. packet_queue_put(&is->audioq, pkt);
  272. } else if (pkt->stream_index == is->video_stream && pkt_in_play_range
  273. && !(is->video_st->disposition & AV_DISPOSITION_ATTACHED_PIC)) {
  274. qCDebug(playerControllerReadThread) << "[ReadThread] put video packet, pts:" << pkt->pts;
  275. packet_queue_put(&is->videoq, pkt);
  276. } else if (pkt->stream_index == is->subtitle_stream && pkt_in_play_range) {
  277. qCDebug(playerControllerReadThread) << "[ReadThread] put subtitle packet, pts:" << pkt->pts;
  278. packet_queue_put(&is->subtitleq, pkt);
  279. } else {
  280. qCDebug(playerControllerReadThread)
  281. << "[ReadThread] drop packet, stream_index:" << pkt->stream_index;
  282. av_packet_unref(pkt);
  283. }
  284. }
  285. qCWarning(playerControllerReadThread)
  286. << "[ReadThread] loop_read about to exit, eof:" << is->eof
  287. << ", audioq.size:" << is->audioq.size << ", videoq.size:" << is->videoq.size
  288. << ", subtitleq.size:" << is->subtitleq.size << ", abort_request:" << is->abort_request
  289. << ", m_exit:" << (m_exit ? m_exit->load() : -1);
  290. qCDebug(playerControllerReadThread) << "[ReadThread] loop_read exit, set read_thread_exit = -1";
  291. is->read_thread_exit = -1;
  292. if (!is->ic)
  293. avformat_close_input(&is->ic);
  294. av_packet_free(&pkt);
  295. return 0;
  296. }
  297. void ReadThread::run()
  298. {
  299. qCDebug(playerControllerReadThread) << "[ReadThread] run start";
  300. int ret = loop_read();
  301. qCDebug(playerControllerReadThread) << "[ReadThread] run end, loop_read ret:" << ret;
  302. // 可加日志输出
  303. }