Commit cc2b1485 authored by David Reid's avatar David Reid

Add support for configuring the size of the job queue.

This commit also decouples the job queue from the resource manager
which will make it easier to use in other areas if it ever comes up.
parent 0dc3e013
......@@ -1337,18 +1337,34 @@ This flag should always be used for platforms that do not support multithreading
*/
#define MA_RESOURCE_MANAGER_JOB_QUEUE_FLAG_NON_BLOCKING 0x00000001
typedef struct
{
ma_uint32 flags;
ma_uint32 capacity; /* The maximum number of jobs that can fit in the queue at a time. */
} ma_resource_manager_job_queue_config;
MA_API ma_resource_manager_job_queue_config ma_resource_manager_job_queue_config_init(ma_uint32 flags, ma_uint32 capacity);
typedef struct
{
ma_uint32 flags; /* Flags passed in at initialization time. */
ma_uint32 capacity; /* The maximum number of jobs that can fit in the queue at a time. Set by the config. */
ma_uint64 head; /* The first item in the list. Required for removing from the top of the list. */
ma_uint64 tail; /* The last item in the list. Required for appending to the end of the list. */
ma_semaphore sem; /* Only used when MA_RESOURCE_MANAGER_JOB_QUEUE_FLAG_NON_BLOCKING is unset. */
ma_slot_allocator allocator;
ma_resource_manager_job jobs[MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY];
ma_resource_manager_job* pJobs;
/* Memory management. */
void* _pHeap;
ma_bool32 _ownsHeap;
} ma_resource_manager_job_queue;
MA_API ma_result ma_resource_manager_job_queue_init(ma_uint32 flags, const ma_allocation_callbacks* pAllocationCallbacks, ma_resource_manager_job_queue* pQueue);
MA_API ma_result ma_resource_manager_job_queue_uninit(ma_resource_manager_job_queue* pQueue, const ma_allocation_callbacks* pAllocationCallbacks);
MA_API ma_result ma_resource_manager_job_queue_get_heap_size(const ma_resource_manager_job_queue_config* pConfig, size_t* pHeapSizeInBytes);
MA_API ma_result ma_resource_manager_job_queue_init_preallocated(const ma_resource_manager_job_queue_config* pConfig, void* pHeap, ma_resource_manager_job_queue* pQueue);
MA_API ma_result ma_resource_manager_job_queue_init(const ma_resource_manager_job_queue_config* pConfig, const ma_allocation_callbacks* pAllocationCallbacks, ma_resource_manager_job_queue* pQueue);
MA_API void ma_resource_manager_job_queue_uninit(ma_resource_manager_job_queue* pQueue, const ma_allocation_callbacks* pAllocationCallbacks);
MA_API ma_result ma_resource_manager_job_queue_post(ma_resource_manager_job_queue* pQueue, const ma_resource_manager_job* pJob);
MA_API ma_result ma_resource_manager_job_queue_next(ma_resource_manager_job_queue* pQueue, ma_resource_manager_job* pJob); /* Returns MA_CANCELLED if the next job is a quit job. */
......@@ -1485,6 +1501,7 @@ typedef struct
ma_uint32 decodedChannels; /* The decoded channel count to use. Set to 0 (default) to use the file's native channel count. */
ma_uint32 decodedSampleRate; /* the decoded sample rate to use. Set to 0 (default) to use the file's native sample rate. */
ma_uint32 jobThreadCount; /* Set to 0 if you want to self-manage your job threads. Defaults to 1. */
ma_uint32 jobQueueCapacity; /* The maximum number of jobs that can fit in the queue at a time. Defaults to MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY. Cannot be zero. */
ma_uint32 flags;
ma_vfs* pVFS; /* Can be NULL in which case defaults will be used. */
ma_decoding_backend_vtable** ppCustomDecodingBackendVTables;
......@@ -4906,11 +4923,11 @@ static ma_result ma_slot_allocator_get_heap_layout(const ma_slot_allocator_confi
/* Groups. */
pHeapLayout->groupsOffset = pHeapLayout->sizeInBytes;
pHeapLayout->sizeInBytes += ma_slot_allocator_calculate_group_capacity(pConfig->capacity) * sizeof(ma_slot_allocator_group);
pHeapLayout->sizeInBytes += ma_align_64(ma_slot_allocator_calculate_group_capacity(pConfig->capacity) * sizeof(ma_slot_allocator_group));
/* Slots. */
pHeapLayout->slotsOffset = pHeapLayout->sizeInBytes;
pHeapLayout->sizeInBytes += pConfig->capacity * sizeof(ma_uint32);
pHeapLayout->sizeInBytes += ma_align_64(pConfig->capacity * sizeof(ma_uint32));
return MA_SUCCESS;
}
......@@ -4986,6 +5003,7 @@ MA_API ma_result ma_slot_allocator_init(const ma_slot_allocator_config* pConfig,
result = ma_slot_allocator_init_preallocated(pConfig, pHeap, pAllocator);
if (result != MA_SUCCESS) {
ma_free(pHeap, pAllocationCallbacks);
return result;
}
......@@ -4999,7 +5017,7 @@ MA_API void ma_slot_allocator_uninit(ma_slot_allocator* pAllocator, const ma_all
return;
}
if (pAllocator->_ownsHeap && pAllocator->_pHeap != NULL) {
if (pAllocator->_ownsHeap) {
ma_free(pAllocator->_pHeap, pAllocationCallbacks);
}
}
......@@ -5407,12 +5425,90 @@ MA_API ma_resource_manager_job ma_resource_manager_job_init(ma_uint16 code)
}
/*
Lock free queue implementation based on the paper by Michael and Scott: Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors
*/
MA_API ma_result ma_resource_manager_job_queue_init(ma_uint32 flags, const ma_allocation_callbacks* pAllocationCallbacks, ma_resource_manager_job_queue* pQueue)
MA_API ma_resource_manager_job_queue_config ma_resource_manager_job_queue_config_init(ma_uint32 flags, ma_uint32 capacity)
{
ma_resource_manager_job_queue_config config;
config.flags = flags;
config.capacity = capacity;
return config;
}
typedef struct
{
size_t sizeInBytes;
size_t allocatorOffset;
size_t jobsOffset;
} ma_resource_manager_job_queue_heap_layout;
static ma_result ma_resource_manager_job_queue_get_heap_layout(const ma_resource_manager_job_queue_config* pConfig, ma_resource_manager_job_queue_heap_layout* pHeapLayout)
{
ma_result result;
MA_ASSERT(pHeapLayout != NULL);
MA_ZERO_OBJECT(pHeapLayout);
if (pConfig == NULL) {
return MA_INVALID_ARGS;
}
if (pConfig->capacity == 0) {
return MA_INVALID_ARGS;
}
pHeapLayout->sizeInBytes = 0;
/* Allocator. */
{
ma_slot_allocator_config allocatorConfig;
size_t allocatorHeapSizeInBytes;
allocatorConfig = ma_slot_allocator_config_init(pConfig->capacity);
result = ma_slot_allocator_get_heap_size(&allocatorConfig, &allocatorHeapSizeInBytes);
if (result != MA_SUCCESS) {
return result;
}
pHeapLayout->allocatorOffset = pHeapLayout->sizeInBytes;
pHeapLayout->sizeInBytes += allocatorHeapSizeInBytes;
}
/* Jobs. */
pHeapLayout->jobsOffset = pHeapLayout->sizeInBytes;
pHeapLayout->sizeInBytes += ma_align_64(pConfig->capacity * sizeof(ma_resource_manager_job));
return MA_SUCCESS;
}
MA_API ma_result ma_resource_manager_job_queue_get_heap_size(const ma_resource_manager_job_queue_config* pConfig, size_t* pHeapSizeInBytes)
{
ma_result result;
ma_resource_manager_job_queue_heap_layout layout;
if (pHeapSizeInBytes == NULL) {
return MA_INVALID_ARGS;
}
*pHeapSizeInBytes = 0;
result = ma_resource_manager_job_queue_get_heap_layout(pConfig, &layout);
if (result != MA_SUCCESS) {
return result;
}
*pHeapSizeInBytes = layout.sizeInBytes;
return MA_SUCCESS;
}
MA_API ma_result ma_resource_manager_job_queue_init_preallocated(const ma_resource_manager_job_queue_config* pConfig, void* pHeap, ma_resource_manager_job_queue* pQueue)
{
ma_result result;
ma_resource_manager_job_queue_heap_layout heapLayout;
ma_slot_allocator_config allocatorConfig;
if (pQueue == NULL) {
......@@ -5420,10 +5516,19 @@ MA_API ma_result ma_resource_manager_job_queue_init(ma_uint32 flags, const ma_al
}
MA_ZERO_OBJECT(pQueue);
pQueue->flags = flags;
allocatorConfig = ma_slot_allocator_config_init(MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY);
result = ma_slot_allocator_init(&allocatorConfig, pAllocationCallbacks, &pQueue->allocator);
result = ma_resource_manager_job_queue_get_heap_layout(pConfig, &heapLayout);
if (result != MA_SUCCESS) {
return result;
}
pQueue->_pHeap = pHeap;
pQueue->flags = pConfig->flags;
pQueue->capacity = pConfig->capacity;
pQueue->pJobs = (ma_resource_manager_job*)ma_offset_ptr(pHeap, heapLayout.jobsOffset);
allocatorConfig = ma_slot_allocator_config_init(pConfig->capacity);
result = ma_slot_allocator_init_preallocated(&allocatorConfig, ma_offset_ptr(pHeap, heapLayout.allocatorOffset), &pQueue->allocator);
if (result != MA_SUCCESS) {
return result;
}
......@@ -5438,16 +5543,46 @@ MA_API ma_result ma_resource_manager_job_queue_init(ma_uint32 flags, const ma_al
just a dummy item for giving us the first item in the list which is stored in the "next" member.
*/
ma_slot_allocator_alloc(&pQueue->allocator, &pQueue->head); /* Will never fail. */
pQueue->jobs[ma_resource_manager_job_extract_slot(pQueue->head)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE;
pQueue->pJobs[ma_resource_manager_job_extract_slot(pQueue->head)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE;
pQueue->tail = pQueue->head;
return MA_SUCCESS;
}
MA_API ma_result ma_resource_manager_job_queue_uninit(ma_resource_manager_job_queue* pQueue, const ma_allocation_callbacks* pAllocationCallbacks)
MA_API ma_result ma_resource_manager_job_queue_init(const ma_resource_manager_job_queue_config* pConfig, const ma_allocation_callbacks* pAllocationCallbacks, ma_resource_manager_job_queue* pQueue)
{
ma_result result;
size_t heapSizeInBytes;
void* pHeap;
result = ma_resource_manager_job_queue_get_heap_size(pConfig, &heapSizeInBytes);
if (result != MA_SUCCESS) {
return result;
}
if (heapSizeInBytes > 0) {
pHeap = ma_malloc(heapSizeInBytes, pAllocationCallbacks);
if (pHeap == NULL) {
return MA_OUT_OF_MEMORY;
}
} else {
pHeap = NULL;
}
result = ma_resource_manager_job_queue_init_preallocated(pConfig, pHeap, pQueue);
if (result != MA_SUCCESS) {
ma_free(pHeap, pAllocationCallbacks);
return result;
}
pQueue->_ownsHeap = MA_TRUE;
return MA_SUCCESS;
}
MA_API void ma_resource_manager_job_queue_uninit(ma_resource_manager_job_queue* pQueue, const ma_allocation_callbacks* pAllocationCallbacks)
{
if (pQueue == NULL) {
return MA_INVALID_ARGS;
return;
}
/* All we need to do is uninitialize the semaphore. */
......@@ -5457,11 +5592,16 @@ MA_API ma_result ma_resource_manager_job_queue_uninit(ma_resource_manager_job_qu
ma_slot_allocator_uninit(&pQueue->allocator, pAllocationCallbacks);
return MA_SUCCESS;
if (pQueue->_ownsHeap) {
ma_free(pQueue->_pHeap, pAllocationCallbacks);
}
}
MA_API ma_result ma_resource_manager_job_queue_post(ma_resource_manager_job_queue* pQueue, const ma_resource_manager_job* pJob)
{
/*
Lock free queue implementation based on the paper by Michael and Scott: Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors
*/
ma_result result;
ma_uint64 slot;
ma_uint64 tail;
......@@ -5478,22 +5618,22 @@ MA_API ma_result ma_resource_manager_job_queue_post(ma_resource_manager_job_queu
}
/* At this point we should have a slot to place the job. */
MA_ASSERT(ma_resource_manager_job_extract_slot(slot) < MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY);
MA_ASSERT(ma_resource_manager_job_extract_slot(slot) < pQueue->capacity);
/* We need to put the job into memory before we do anything. */
pQueue->jobs[ma_resource_manager_job_extract_slot(slot)] = *pJob;
pQueue->jobs[ma_resource_manager_job_extract_slot(slot)].toc.allocation = slot; /* This will overwrite the job code. */
pQueue->jobs[ma_resource_manager_job_extract_slot(slot)].toc.breakup.code = pJob->toc.breakup.code; /* The job code needs to be applied again because the line above overwrote it. */
pQueue->jobs[ma_resource_manager_job_extract_slot(slot)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE; /* Reset for safety. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)] = *pJob;
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.allocation = slot; /* This will overwrite the job code. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.breakup.code = pJob->toc.breakup.code; /* The job code needs to be applied again because the line above overwrote it. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE; /* Reset for safety. */
/* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */
for (;;) {
tail = pQueue->tail;
next = pQueue->jobs[ma_resource_manager_job_extract_slot(tail)].next;
next = pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next;
if (ma_resource_manager_job_toc_to_allocation(tail) == ma_resource_manager_job_toc_to_allocation(pQueue->tail)) {
if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) {
if (c89atomic_compare_and_swap_64(&pQueue->jobs[ma_resource_manager_job_extract_slot(tail)].next, next, slot) == next) {
if (c89atomic_compare_and_swap_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next, next, slot) == next) {
break;
}
} else {
......@@ -5531,7 +5671,7 @@ MA_API ma_result ma_resource_manager_job_queue_next(ma_resource_manager_job_queu
for (;;) {
head = pQueue->head;
tail = pQueue->tail;
next = pQueue->jobs[ma_resource_manager_job_extract_slot(head)].next;
next = pQueue->pJobs[ma_resource_manager_job_extract_slot(head)].next;
if (ma_resource_manager_job_toc_to_allocation(head) == ma_resource_manager_job_toc_to_allocation(pQueue->head)) {
if (ma_resource_manager_job_toc_to_allocation(head) == ma_resource_manager_job_toc_to_allocation(tail)) {
......@@ -5540,7 +5680,7 @@ MA_API ma_result ma_resource_manager_job_queue_next(ma_resource_manager_job_queu
}
c89atomic_compare_and_swap_64(&pQueue->tail, tail, next);
} else {
*pJob = pQueue->jobs[ma_resource_manager_job_extract_slot(next)];
*pJob = pQueue->pJobs[ma_resource_manager_job_extract_slot(next)];
if (c89atomic_compare_and_swap_64(&pQueue->head, head, next) == head) {
break;
}
......@@ -6166,6 +6306,7 @@ MA_API ma_resource_manager_config ma_resource_manager_config_init(void)
config.decodedChannels = 0;
config.decodedSampleRate = 0;
config.jobThreadCount = 1; /* A single miniaudio-managed job thread by default. */
config.jobQueueCapacity = MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY;
return config;
}
......@@ -6174,7 +6315,7 @@ MA_API ma_resource_manager_config ma_resource_manager_config_init(void)
MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pConfig, ma_resource_manager* pResourceManager)
{
ma_result result;
ma_uint32 jobQueueFlags;
ma_resource_manager_job_queue_config jobQueueConfig;
ma_uint32 iJobThread;
if (pResourceManager == NULL) {
......@@ -6224,16 +6365,17 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon
}
/* Job queue. */
jobQueueFlags = 0;
jobQueueConfig.capacity = pResourceManager->config.jobQueueCapacity;
jobQueueConfig.flags = 0;
if ((pResourceManager->config.flags & MA_RESOURCE_MANAGER_FLAG_NON_BLOCKING) != 0) {
if (pResourceManager->config.jobThreadCount > 0) {
return MA_INVALID_ARGS; /* Non-blocking mode is only valid for self-managed job threads. */
}
jobQueueFlags |= MA_RESOURCE_MANAGER_JOB_QUEUE_FLAG_NON_BLOCKING;
jobQueueConfig.flags |= MA_RESOURCE_MANAGER_JOB_QUEUE_FLAG_NON_BLOCKING;
}
result = ma_resource_manager_job_queue_init(jobQueueFlags, &pResourceManager->config.allocationCallbacks, &pResourceManager->jobQueue);
result = ma_resource_manager_job_queue_init(&jobQueueConfig, &pResourceManager->config.allocationCallbacks, &pResourceManager->jobQueue);
if (result != MA_SUCCESS) {
return result;
}
......@@ -9381,6 +9523,7 @@ MA_API ma_result ma_gainer_init(const ma_gainer_config* pConfig, const ma_alloca
result = ma_gainer_init_preallocated(pConfig, pHeap, pGainer);
if (result != MA_SUCCESS) {
ma_free(pHeap, pAllocationCallbacks);
return result;
}
......@@ -9394,7 +9537,7 @@ MA_API void ma_gainer_uninit(ma_gainer* pGainer, const ma_allocation_callbacks*
return;
}
if (pGainer->_pHeap != NULL && pGainer->_ownsHeap) {
if (pGainer->_ownsHeap) {
ma_free(pGainer->_pHeap, pAllocationCallbacks);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment