Commit 77057895 authored by David Reid's avatar David Reid

Set up some infrastructure for enabling multiple job threads.

parent 861f6a23
......@@ -238,7 +238,7 @@ typedef struct
MA_API ma_job ma_job_init(ma_uint16 code);
#define MA_JOB_QUEUE_NON_BLOCKING 0x00000001 /* When set, ma_job_queue_next() will not wait and no semaphore will be signaled in ma_job_queue_post(). ma_job_queue_next() will return MA_NO_DATA_AVAILABLE if nothing is available. */
#define MA_JOB_QUEUE_FLAG_NON_BLOCKING 0x00000001 /* When set, ma_job_queue_next() will not wait and no semaphore will be signaled in ma_job_queue_post(). ma_job_queue_next() will return MA_NO_DATA_AVAILABLE if nothing is available. */
typedef struct
{
......@@ -253,11 +253,15 @@ typedef struct
MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue);
MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue);
MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob);
MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob);
MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob); /* Returns MA_CANCELLED if the next job is a quit job. */
#define MA_RESOURCE_MANAGER_NO_JOB_THREAD 0x00000001 /* Enable this to manage the job thread yourself. */
#define MA_RESOURCE_MANAGER_NON_BLOCKING 0x00000002 /* Indicates ma_resource_manager_next_job() should not block. Only valid with MA_RESOURCE_MANAGER_NO_JOB_THREAD. */
/* Maximum job thread count will be restricted to this, but this may be removed later and replaced with a heap allocation thereby removing any limitation. */
#ifndef MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT
#define MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT 64
#endif
#define MA_RESOURCE_MANAGER_FLAG_NON_BLOCKING 0x00000001 /* Indicates ma_resource_manager_next_job() should not block. Only valid with MA_RESOURCE_MANAGER_NO_JOB_THREAD. */
typedef struct
{
......@@ -358,6 +362,7 @@ typedef struct
ma_format decodedFormat;
ma_uint32 decodedChannels;
ma_uint32 decodedSampleRate;
ma_uint32 jobThreadCount; /* Set to 0 if you want to self-manage your job threads. Defaults to 1. */
ma_uint32 flags;
ma_vfs* pVFS; /* Can be NULL in which case defaults will be used. */
} ma_resource_manager_config;
......@@ -369,11 +374,12 @@ struct ma_resource_manager
ma_resource_manager_config config;
ma_resource_manager_data_buffer* pRootDataBuffer; /* The root buffer in the binary tree. */
ma_mutex dataBufferLock; /* For synchronizing access to the data buffer binary tree. */
ma_thread jobThread; /* The thread for executing jobs. Will probably turn this into an array. */
ma_thread jobThreads[MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT]; /* The thread for executing jobs. Will probably turn this into an array. */
ma_job_queue jobQueue; /* Lock-free multi-consumer, multi-producer job queue for managing jobs for asynchronous decoding and streaming. */
ma_default_vfs defaultVFS; /* Only used if a custom VFS is not specified. */
};
/* Init. */
MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pConfig, ma_resource_manager* pResourceManager);
MA_API void ma_resource_manager_uninit(ma_resource_manager* pResourceManager);
......@@ -833,7 +839,7 @@ MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue)
ma_slot_allocator_init(&pQueue->allocator); /* Will not fail. */
/* We need a semaphore if we're running in synchronous mode. */
if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) {
if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) {
ma_semaphore_init(0, &pQueue->sem);
}
......@@ -856,7 +862,7 @@ MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue)
}
/* All we need to do is uninitialize the semaphore. */
if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) {
if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) {
ma_semaphore_uninit(&pQueue->sem);
}
......@@ -908,7 +914,7 @@ MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob)
/* Signal the semaphore as the last step if we're using synchronous mode. */
if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) {
if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) {
ma_semaphore_release(&pQueue->sem);
}
......@@ -926,7 +932,7 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob)
}
/* If we're running in synchronous mode we'll need to wait on a semaphore. */
if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) {
if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) {
ma_semaphore_wait(&pQueue->sem);
}
......@@ -953,6 +959,16 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob)
ma_slot_allocator_free(&pQueue->allocator, head);
/*
If it's a quit job make sure it's put back on the queue to ensure other threads have an opportunity to detect it and terminate naturally. We
could instead just leave it on the queue, but that would involve fiddling with the lock-free code above and I want to keep that as simple as
possible.
*/
if (pJob->toc.code == MA_JOB_QUIT) {
ma_job_queue_post(pQueue, pJob);
return MA_CANCELLED; /* Return a cancelled status just in case the thread is checking return codes and not properly checking for a quit job. */
}
return MA_SUCCESS;
}
......@@ -1433,6 +1449,7 @@ MA_API ma_resource_manager_config ma_resource_manager_config_init(ma_format deco
config.decodedFormat = decodedFormat;
config.decodedChannels = decodedChannels;
config.decodedSampleRate = decodedSampleRate;
config.jobThreadCount = 1; /* A single miniaudio-managed job thread by default. */
if (pAllocationCallbacks != NULL) {
config.allocationCallbacks = *pAllocationCallbacks;
......@@ -1446,6 +1463,7 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon
{
ma_result result;
ma_uint32 jobQueueFlags;
ma_uint32 iJobThread;
if (pResourceManager == NULL) {
return MA_INVALID_ARGS;
......@@ -1457,6 +1475,10 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon
return MA_INVALID_ARGS;
}
if (pConfig->jobThreadCount > ma_countof(pResourceManager->jobThreads)) {
return MA_INVALID_ARGS; /* Requesting too many job threads. */
}
pResourceManager->config = *pConfig;
ma_allocation_callbacks_init_copy(&pResourceManager->config.allocationCallbacks, &pConfig->allocationCallbacks);
......@@ -1471,12 +1493,12 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon
/* Job queue. */
jobQueueFlags = 0;
if ((pConfig->flags & MA_RESOURCE_MANAGER_NON_BLOCKING) != 0) {
if ((pConfig->flags & MA_RESOURCE_MANAGER_NO_JOB_THREAD) != 0) {
if ((pConfig->flags & MA_RESOURCE_MANAGER_FLAG_NON_BLOCKING) != 0) {
if (pConfig->jobThreadCount > 0) {
return MA_INVALID_ARGS; /* Non-blocking mode is only valid for self-managed job threads. */
}
jobQueueFlags |= MA_JOB_QUEUE_NON_BLOCKING;
jobQueueFlags |= MA_JOB_QUEUE_FLAG_NON_BLOCKING;
}
result = ma_job_queue_init(jobQueueFlags, &pResourceManager->jobQueue);
......@@ -1485,22 +1507,22 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon
return result;
}
/* Data buffer lock. */
result = ma_mutex_init(&pResourceManager->dataBufferLock);
if (result != MA_SUCCESS) {
return result;
}
/* Create the job thread last to ensure the new thread has access to valid data. */
if ((pConfig->flags & MA_RESOURCE_MANAGER_NO_JOB_THREAD) == 0) {
result = ma_thread_create(&pResourceManager->jobThread, ma_thread_priority_normal, 0, ma_resource_manager_job_thread, pResourceManager);
/* Create the job threads last to ensure the threads has access to valid data. */
for (iJobThread = 0; iJobThread < pConfig->jobThreadCount; iJobThread += 1) {
result = ma_thread_create(&pResourceManager->jobThreads[iJobThread], ma_thread_priority_normal, 0, ma_resource_manager_job_thread, pResourceManager);
if (result != MA_SUCCESS) {
ma_mutex_uninit(&pResourceManager->dataBufferLock);
ma_job_queue_uninit(&pResourceManager->jobQueue);
return result;
}
} else {
pResourceManager->jobThread = NULL;
}
return MA_SUCCESS;
......@@ -1524,15 +1546,23 @@ static void ma_resource_manager_delete_all_data_buffers(ma_resource_manager* pRe
MA_API void ma_resource_manager_uninit(ma_resource_manager* pResourceManager)
{
ma_job quitJob;
ma_uint32 iJobThread;
if (pResourceManager == NULL) {
return;
}
/* The job thread need to be killed first. To do this we need to post a quit message to the message queue and then wait for the thread. */
/*
Job threads need to be killed first. To do this we need to post a quit message to the message queue and then wait for the thread. The quit message will never be removed from the
queue which means it will never not be returned after being encounted for the first time which means all threads will eventually receive it.
*/
quitJob = ma_job_init(MA_JOB_QUIT);
ma_resource_manager_post_job(pResourceManager, &quitJob);
ma_thread_wait(&pResourceManager->jobThread);
/* Wait for every job to finish before continuing to ensure nothing is sill trying to access any of our objects below. */
for (iJobThread = 0; iJobThread < pResourceManager->config.jobThreadCount; iJobThread += 1) {
ma_thread_wait(&pResourceManager->jobThreads[iJobThread]);
}
/* At this point the thread should have returned and no other thread should be accessing our data. We can now delete all data buffers. */
ma_resource_manager_delete_all_data_buffers(pResourceManager);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment