Improve the alffplay queue for FFmpeg's send/receive API

The packet handling thread now calls avcodec_send_packet to give compressed
data to libavcodec, while the audio/video threads call avcodec_receive_frame to
handle decoded frames. The packet thread still maintains local queues for each
stream to avoid starving an A/V thread when the other doesn't want another
frame yet.
This commit is contained in:
Chris Robinson 2017-12-30 20:34:33 -08:00
parent 98731bb574
commit 0136df1e62

View File

@ -69,24 +69,44 @@ enum {
};
struct PacketQueue {
class PacketQueue {
std::deque<AVPacket> mPackets;
std::atomic<int> mTotalSize;
std::atomic<bool> mFinished;
std::mutex mMutex;
std::condition_variable mCond;
size_t mTotalSize;
PacketQueue() : mTotalSize(0), mFinished(false)
{ }
~PacketQueue()
{ clear(); }
public:
PacketQueue() : mTotalSize(0) { }
~PacketQueue() { clear(); }
int put(const AVPacket *pkt);
int peek(AVPacket *pkt, std::atomic<bool> &quit_var);
void pop();
bool empty() const noexcept { return mPackets.empty(); }
size_t totalSize() const noexcept { return mTotalSize; }
void clear();
void finish();
void put(const AVPacket *pkt)
{
mPackets.push_back(AVPacket{});
if(av_packet_ref(&mPackets.back(), pkt) != 0)
mPackets.pop_back();
else
mTotalSize += mPackets.back().size;
}
AVPacket *front() noexcept
{ return &mPackets.front(); }
void pop()
{
AVPacket *pkt = &mPackets.front();
mTotalSize -= pkt->size;
av_packet_unref(pkt);
mPackets.pop_front();
}
void clear()
{
for(AVPacket &pkt : mPackets)
av_packet_unref(&pkt);
mPackets.clear();
mTotalSize = 0;
}
};
@ -98,7 +118,8 @@ struct AudioState {
AVStream *mStream;
AVCodecContext *mCodecCtx;
PacketQueue mQueue;
std::mutex mQueueMtx;
std::condition_variable mQueueCond;
/* Used for clock difference average computation */
struct {
@ -174,7 +195,8 @@ struct VideoState {
AVStream *mStream;
AVCodecContext *mCodecCtx;
PacketQueue mQueue;
std::mutex mQueueMtx;
std::condition_variable mQueueCond;
double mClock;
double mFrameTimer;
@ -241,7 +263,6 @@ struct VideoState {
struct MovieState {
AVFormatContext *mFormatCtx;
int mVideoStream, mAudioStream;
int mAVSyncType;
@ -259,9 +280,9 @@ struct MovieState {
std::string mFilename;
MovieState(std::string fname)
: mFormatCtx(nullptr), mVideoStream(0), mAudioStream(0)
, mAVSyncType(DEFAULT_AV_SYNC_TYPE), mExternalClockBase(0), mQuit(false)
, mAudio(this), mVideo(this), mFilename(std::move(fname))
: mFormatCtx(nullptr), mAVSyncType(DEFAULT_AV_SYNC_TYPE)
, mExternalClockBase(0), mQuit(false), mAudio(this), mVideo(this)
, mFilename(std::move(fname))
{ }
~MovieState()
{
@ -284,68 +305,6 @@ struct MovieState {
};
int PacketQueue::put(const AVPacket *pkt)
{
std::unique_lock<std::mutex> lock(mMutex);
mPackets.push_back(AVPacket{});
if(av_packet_ref(&mPackets.back(), pkt) != 0)
{
mPackets.pop_back();
return -1;
}
mTotalSize += mPackets.back().size;
lock.unlock();
mCond.notify_one();
return 0;
}
int PacketQueue::peek(AVPacket *pkt, std::atomic<bool> &quit_var)
{
std::unique_lock<std::mutex> lock(mMutex);
while(!quit_var.load())
{
if(!mPackets.empty())
{
if(av_packet_ref(pkt, &mPackets.front()) != 0)
return -1;
return 1;
}
if(mFinished.load())
return 0;
mCond.wait(lock);
}
return -1;
}
void PacketQueue::pop()
{
std::unique_lock<std::mutex> lock(mMutex);
AVPacket *pkt = &mPackets.front();
mTotalSize -= pkt->size;
av_packet_unref(pkt);
mPackets.pop_front();
}
void PacketQueue::clear()
{
std::unique_lock<std::mutex> lock(mMutex);
std::for_each(mPackets.begin(), mPackets.end(),
[](AVPacket &pkt) { av_packet_unref(&pkt); }
);
mPackets.clear();
mTotalSize = 0;
}
void PacketQueue::finish()
{
std::unique_lock<std::mutex> lock(mMutex);
mFinished = true;
lock.unlock();
mCond.notify_all();
}
double AudioState::getClock()
{
double pts;
@ -440,29 +399,18 @@ int AudioState::decodeFrame()
{
while(!mMovie->mQuit.load())
{
while(!mMovie->mQuit.load())
{
/* Get the next packet */
AVPacket pkt{};
if(mQueue.peek(&pkt, mMovie->mQuit) <= 0)
return -1;
int ret = avcodec_send_packet(mCodecCtx, &pkt);
if(ret != AVERROR(EAGAIN))
{
if(ret < 0)
std::cerr<< "Failed to send encoded packet: 0x"<<std::hex<<ret<<std::dec <<std::endl;
mQueue.pop();
}
av_packet_unref(&pkt);
if(ret == 0 || ret == AVERROR(EAGAIN))
break;
}
std::unique_lock<std::mutex> lock(mQueueMtx);
int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
if(ret == AVERROR(EAGAIN))
continue;
if(ret == AVERROR_EOF || ret < 0)
{
do {
mQueueCond.wait(lock);
ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
} while(ret == AVERROR(EAGAIN));
}
lock.unlock();
if(ret == AVERROR_EOF) break;
if(ret < 0)
{
std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
return 0;
@ -1110,28 +1058,19 @@ int VideoState::handler()
mDecodedFrame = av_frame_alloc();
while(!mMovie->mQuit)
{
while(!mMovie->mQuit)
{
AVPacket packet{};
if(mQueue.peek(&packet, mMovie->mQuit) <= 0)
goto finish;
int ret = avcodec_send_packet(mCodecCtx, &packet);
if(ret != AVERROR(EAGAIN))
{
if(ret < 0)
std::cerr<< "Failed to send encoded packet: 0x"<<std::hex<<ret<<std::dec <<std::endl;
mQueue.pop();
}
av_packet_unref(&packet);
if(ret == 0 || ret == AVERROR(EAGAIN))
break;
}
std::unique_lock<std::mutex> lock(mQueueMtx);
/* Decode video frame */
int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
if(ret == AVERROR(EAGAIN))
continue;
{
do {
mQueueCond.wait(lock);
ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
} while(ret == AVERROR(EAGAIN));
}
lock.unlock();
if(ret == AVERROR_EOF)
break;
if(ret < 0)
{
std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
@ -1145,7 +1084,6 @@ int VideoState::handler()
break;
av_frame_unref(mDecodedFrame);
}
finish:
mEOS = true;
av_frame_free(&mDecodedFrame);
@ -1254,7 +1192,6 @@ int MovieState::streamComponentOpen(int stream_index)
switch(avctx->codec_type)
{
case AVMEDIA_TYPE_AUDIO:
mAudioStream = stream_index;
mAudio.mStream = mFormatCtx->streams[stream_index];
mAudio.mCodecCtx = avctx;
@ -1267,7 +1204,6 @@ int MovieState::streamComponentOpen(int stream_index)
break;
case AVMEDIA_TYPE_VIDEO:
mVideoStream = stream_index;
mVideo.mStream = mFormatCtx->streams[stream_index];
mVideo.mCodecCtx = avctx;
@ -1280,10 +1216,10 @@ int MovieState::streamComponentOpen(int stream_index)
default:
avcodec_free_context(&avctx);
break;
return -1;
}
return 0;
return stream_index;
}
int MovieState::parse_handler()
@ -1291,9 +1227,6 @@ int MovieState::parse_handler()
int video_index = -1;
int audio_index = -1;
mVideoStream = -1;
mAudioStream = -1;
/* Dump information about file onto standard error */
av_dump_format(mFormatCtx, 0, mFilename.c_str(), 0);
@ -1309,39 +1242,93 @@ int MovieState::parse_handler()
* components time to start without needing to skip ahead.
*/
mExternalClockBase = av_gettime() + 50000;
if(audio_index >= 0)
streamComponentOpen(audio_index);
if(video_index >= 0)
streamComponentOpen(video_index);
if(audio_index >= 0) audio_index = streamComponentOpen(audio_index);
if(video_index >= 0) video_index = streamComponentOpen(video_index);
if(mVideoStream < 0 && mAudioStream < 0)
if(video_index < 0 && audio_index < 0)
{
std::cerr<< mFilename<<": could not open codecs" <<std::endl;
mQuit = true;
}
/* Main packet handling loop */
while(!mQuit.load())
{
if(mAudio.mQueue.mTotalSize + mVideo.mQueue.mTotalSize >= MAX_QUEUE_SIZE)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
PacketQueue audio_queue, video_queue;
bool input_finished = false;
/* Main packet reading/dispatching loop */
while(!mQuit.load(std::memory_order_relaxed) && !input_finished)
{
AVPacket packet;
if(av_read_frame(mFormatCtx, &packet) < 0)
break;
input_finished = true;
else
{
/* Copy the packet into the queue it's meant for. */
if(packet.stream_index == video_index)
video_queue.put(&packet);
else if(packet.stream_index == audio_index)
audio_queue.put(&packet);
av_packet_unref(&packet);
}
/* Copy the packet in the queue it's meant for. */
if(packet.stream_index == mVideoStream)
mVideo.mQueue.put(&packet);
else if(packet.stream_index == mAudioStream)
mAudio.mQueue.put(&packet);
av_packet_unref(&packet);
do {
/* Send whatever queued packets we have. */
bool sent;
do {
sent = false;
if(!audio_queue.empty())
{
std::unique_lock<std::mutex> lock(mAudio.mQueueMtx);
int ret = avcodec_send_packet(mAudio.mCodecCtx, audio_queue.front());
if(ret != AVERROR(EAGAIN))
{
lock.unlock();
mAudio.mQueueCond.notify_one();
audio_queue.pop();
sent = true;
}
}
if(!video_queue.empty())
{
std::unique_lock<std::mutex> lock(mVideo.mQueueMtx);
int ret = avcodec_send_packet(mVideo.mCodecCtx, video_queue.front());
if(ret != AVERROR(EAGAIN))
{
lock.unlock();
mVideo.mQueueCond.notify_one();
video_queue.pop();
sent = true;
}
}
} while(sent);
/* If the queues are completely empty, or it's not full and there's
* more input to read, go get more.
*/
size_t queue_size = audio_queue.totalSize() + video_queue.totalSize();
if(queue_size == 0 || (queue_size < MAX_QUEUE_SIZE && !input_finished))
break;
/* Nothing to send or get for now, wait a bit and try again. */
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!mQuit.load(std::memory_order_relaxed));
}
mVideo.mQueue.finish();
mAudio.mQueue.finish();
/* Pass a null packet to finish the send buffers (the receive functions
* will get AVERROR_EOF when emptied).
*/
if(mVideo.mCodecCtx != nullptr)
{
{ std::lock_guard<std::mutex> lock(mVideo.mQueueMtx);
avcodec_send_packet(mVideo.mCodecCtx, nullptr);
}
mVideo.mQueueCond.notify_one();
}
if(mAudio.mCodecCtx != nullptr)
{
{ std::lock_guard<std::mutex> lock(mAudio.mQueueMtx);
avcodec_send_packet(mAudio.mCodecCtx, nullptr);
}
mAudio.mQueueCond.notify_one();
}
video_queue.clear();
audio_queue.clear();
/* all done - wait for it */
if(mVideoThread.joinable())