解复用
解复用,读取视频文件,生成数据包(packet),同时,实现数据包队列,存储数据包,用来解码生成数据帧(frame)
解复用线程
read_thread:
- 创建上下文结构体:
avformat_alloc_context - 打开文件
avformat_open_input - 获取流信息
avformat_find_stream_info - 区分视频流和音频流
av_find_best_stream - stream_component_open(自实现):
打开对应的解码器并做初始化
创建和启动解码线程
初始化⾳频或视频输出设备 - 循环读取数据包,插入数据包队列,释放包
av_read_frame
packet_queue_put
av_packet_unref
int FFPlayer::read_thread()
{
int err, i, ret;
int st_index[AVMEDIA_TYPE_NB]; // AVMEDIA_TYPE_VIDEO/ AVMEDIA_TYPE_AUDIO 等,用来保存stream index
AVPacket pkt1;
AVPacket *pkt = &pkt1; //
// 初始化为-1,如果一直为-1说明没相应steam
memset(st_index, -1, sizeof(st_index));
video_stream = -1;
audio_stream = -1;
eof = 0;
// 1. 创建上下文结构体,这个结构体是最上层的结构体,表示输入上下文
ic = avformat_alloc_context();
if (!ic) {
av_log(NULL, AV_LOG_FATAL, "Could not allocate context.\n");
ret = AVERROR(ENOMEM);
goto fail;
}
/* 3.打开文件,主要是探测协议类型,如果是网络文件则创建网络链接等 */
err = avformat_open_input(&ic, input_filename_ , NULL, NULL);
if (err < 0) {
print_error(input_filename_, err);
ret = -1;
goto fail;
}
ffp_notify_msg1(this, FFP_MSG_OPEN_INPUT);
std::cout << "read_thread FFP_MSG_OPEN_INPUT " << this << std::endl;
/*
* 4.探测媒体类型,可得到当前文件的封装格式,音视频编码参数等信息
* 调用该函数后得多的参数信息会比只调用avformat_open_input更为详细,
* 其本质上是去做了decdoe packet获取信息的工作
* codecpar, filled by libavformat on stream creation or
* in avformat_find_stream_info()
*/
err = avformat_find_stream_info(ic, NULL);
if (err < 0) {
av_log(NULL, AV_LOG_WARNING,
"%s: could not find codec parameters\n", input_filename_);
ret = -1;
goto fail;
}
ffp_notify_msg1(this, FFP_MSG_FIND_STREAM_INFO);
std::cout << "read_thread FFP_MSG_FIND_STREAM_INFO " << this << std::endl;
// 6.2 利用av_find_best_stream选择流,
st_index[AVMEDIA_TYPE_VIDEO] =
av_find_best_stream(ic, AVMEDIA_TYPE_VIDEO,
st_index[AVMEDIA_TYPE_VIDEO], -1, NULL, 0);
st_index[AVMEDIA_TYPE_AUDIO] =
av_find_best_stream(ic, AVMEDIA_TYPE_AUDIO,
st_index[AVMEDIA_TYPE_AUDIO],
st_index[AVMEDIA_TYPE_VIDEO],
NULL, 0);
/* open the streams */
/* 8. 打开视频、音频解码器。在此会打开相应解码器,并创建相应的解码线程。 */
if (st_index[AVMEDIA_TYPE_AUDIO] >= 0) {// 如果有音频流则打开音频流
stream_component_open(st_index[AVMEDIA_TYPE_AUDIO]);
}
ret = -1;
if (st_index[AVMEDIA_TYPE_VIDEO] >= 0) { // 如果有视频流则打开视频流
ret = stream_component_open( st_index[AVMEDIA_TYPE_VIDEO]);
}
ffp_notify_msg1(this, FFP_MSG_COMPONENT_OPEN);
std::cout << "read_thread FFP_MSG_COMPONENT_OPEN " << this << std::endl;
if (video_stream < 0 && audio_stream < 0) {
av_log(NULL, AV_LOG_FATAL, "Failed to open file '%s' or configure filtergraph\n",
input_filename_);
ret = -1;
goto fail;
}
ffp_notify_msg1(this, FFP_MSG_PREPARED);
std::cout << "read_thread FFP_MSG_PREPARED " << this << std::endl;
while (1) {
// std::cout << "read_thread sleep, mp:" << this << std::endl;
// 先模拟线程运行
// std::this_thread::sleep_for(std::chrono::milliseconds(10));
if(abort_request) {
break;
}
// 7.读取媒体数据,得到的是音视频分离后、解码前的数据
ret = av_read_frame(ic, pkt); // 调用不会释放pkt的数据,需要我们自己去释放packet的数据
if(ret < 0) { // 出错或者已经读取完毕了
if ((ret == AVERROR_EOF || avio_feof(ic->pb)) && !eof) { // 读取完毕了
eof = 1;
}
if (ic->pb && ic->pb->error) // io异常 // 退出循环
break;
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 读取完数据了,这里可以使用timeout的方式休眠等待下一步的检测
continue; // 继续循环
} else {
eof = 0;
}
// 插入队列 先只处理音频包
if (pkt->stream_index == audio_stream) {
printf("audio ===== pkt pts:%ld, dts:%ld\n", pkt->pts/48, pkt->dts);
packet_queue_put(&audioq, pkt);
} else if (pkt->stream_index == video_stream) {
printf("video ===== pkt pts:%ld, dts:%ld\n", pkt->pts/48, pkt->dts);
packet_queue_put(&videoq, pkt);
} else {
av_packet_unref(pkt);// // 不入队列则直接释放数据
}
}
std::cout << __FUNCTION__ << " leave" << std::endl;
fail:
return 0;
}
线程调用
int FFPlayer::stream_open(const char *file_name)
{
read_thread_ = new std::thread(&FFPlayer::read_thread, this);
}
线程退出
void FFPlayer::stream_close()
{
if(read_thread_ && read_thread_->joinable()) {
read_thread_->join(); // 等待线程退出
}
}
数据包队列
类型定义
封装数据包,packet
typedef struct MyAVPacketList {
AVPacket pkt; //解封装后的数据
struct MyAVPacketList *next; //下一个节点
int serial; //播放序列
} MyAVPacketList;
数据包队列
typedef struct PacketQueue {
MyAVPacketList *first_pkt, *last_pkt; // 队首,队尾指针
int nb_packets; // 包数量,也就是队列元素数量
int size; // 队列所有元素的数据大小总和
int64_t duration; // 队列所有元素的数据播放持续时间
int abort_request; // 用户退出请求标志
int serial; // 播放序列号,和MyAVPacketList的serial作用相同,但改变的时序稍微有点不同
SDL_mutex *mutex; // 用于维持PacketQueue的多线程安全(SDL_mutex可以按pthread_mutex_t理解)
SDL_cond *cond; // 用于读、写线程相互通知(SDL_cond可以按pthread_cond_t理解)
} PacketQueue;
数据包队列api实现
- 插入packet
static AVPacket flush_pkt;
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
MyAVPacketList *pkt1;
if (q->abort_request) //如果已中止,则放入失败
return -1;
pkt1 = (MyAVPacketList *)av_malloc(sizeof(MyAVPacketList)); //分配节点内存
if (!pkt1) //内存不足,则放入失败
return -1;
// 没有做引用计数,那这里也说明av_read_frame不会释放替用户释放buffer。
pkt1->pkt = *pkt; //拷贝AVPacket(浅拷贝,AVPacket.data等内存并没有拷贝)
pkt1->next = NULL;
if (pkt == &flush_pkt)//如果放入的是flush_pkt,需要增加队列的播放序列号,以区分不连续的两段数据
{
q->serial++;
printf("q->serial = %d\n", q->serial);
}
pkt1->serial = q->serial; //用队列序列号标记节点
/* 队列操作:如果last_pkt为空,说明队列是空的,新增节点为队头;
* 否则,队列有数据,则让原队尾的next为新增节点。 最后将队尾指向新增节点
*/
if (!q->last_pkt)
q->first_pkt = pkt1;
else
q->last_pkt->next = pkt1;
q->last_pkt = pkt1;
//队列属性操作:增加节点数、cache大小、cache总时长, 用来控制队列的大小
q->nb_packets++;
q->size += pkt1->pkt.size + sizeof(*pkt1);
q->duration += pkt1->pkt.duration;
/* XXX: should duplicate packet data in DV case */
//发出信号,表明当前队列中有数据了,通知等待中的读线程可以取数据了
SDL_CondSignal(q->cond);
return 0;
}
- 封装插入packet
int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
int ret;
SDL_LockMutex(q->mutex);
ret = packet_queue_put_private(q, pkt);//主要实现
SDL_UnlockMutex(q->mutex);
if (pkt != &flush_pkt && ret < 0)
av_packet_unref(pkt); //放入失败,释放AVPacket
return ret;
}
- 获取packet
int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
MyAVPacketList *pkt1;
int ret;
SDL_LockMutex(q->mutex); // 加锁
for (;;) {
if (q->abort_request) {
ret = -1;
break;
}
pkt1 = q->first_pkt; //MyAVPacketList *pkt1; 从队头拿数据
if (pkt1) { //队列中有数据
q->first_pkt = pkt1->next; //队头移到第二个节点
if (!q->first_pkt)
q->last_pkt = NULL;
q->nb_packets--; //节点数减1
q->size -= pkt1->pkt.size + sizeof(*pkt1); //cache大小扣除一个节点
q->duration -= pkt1->pkt.duration; //总时长扣除一个节点
//返回AVPacket,这里发生一次AVPacket结构体拷贝,AVPacket的data只拷贝了指针
*pkt = pkt1->pkt;
if (serial) //如果需要输出serial,把serial输出
*serial = pkt1->serial;
av_free(pkt1); //释放节点内存,只是释放节点,而不是释放AVPacket
ret = 1;
break;
} else if (!block) { //队列中没有数据,且非阻塞调用
ret = 0;
break;
} else { //队列中没有数据,且阻塞调用
//这里没有break。for循环的另一个作用是在条件变量满足后重复上述代码取出节点
SDL_CondWait(q->cond, q->mutex);
}
}
SDL_UnlockMutex(q->mutex); // 释放锁
return ret;
}
- 初始化
int packet_queue_init(PacketQueue *q)
{
memset(q, 0, sizeof(PacketQueue));
q->mutex = SDL_CreateMutex();
if (!q->mutex) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->cond = SDL_CreateCond();
if (!q->cond) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->abort_request = 1;
return 0;
}
- 队列刷新
void packet_queue_flush(PacketQueue *q)
{
MyAVPacketList *pkt, *pkt1;
SDL_LockMutex(q->mutex);
for (pkt = q->first_pkt; pkt; pkt = pkt1) {
pkt1 = pkt->next;
av_packet_unref(&pkt->pkt);
av_freep(&pkt);
}
q->last_pkt = NULL;
q->first_pkt = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
SDL_UnlockMutex(q->mutex);
}
- 销毁队列
void packet_queue_destroy(PacketQueue *q)
{
packet_queue_flush(q); //先清除所有的节点
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}
- 停止队列
void packet_queue_abort(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 1; // 请求退出
SDL_CondSignal(q->cond); //释放一个条件信号
SDL_UnlockMutex(q->mutex);
}
- 启动队列
void packet_queue_start(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 0;
packet_queue_put_private(q, &flush_pkt); //这里放入了一个flush_pkt
SDL_UnlockMutex(q->mutex);
}