ma_uint16 slot; /* Index into a ma_slot_allocator. */
ma_uint32 refcount;
} breakup;
ma_uint64 allocation;
} toc; /* 8 bytes. We encode the job code into the slot allocation data to save space. */
MA_ATOMIC(8, ma_uint64) next; /* refcount + slot for the next item. Does not include the job code. */
ma_uint32 order; /* Execution order. Used to create a data dependency and ensure a job is executed in order. Usage is contextual depending on the job type. */
ma_bool32 decode; /* When set to true, the data buffer will be decoded. Otherwise it'll be encoded and will use a decoder for the connector. */
ma_async_notification* pInitNotification; /* Signalled when the data buffer has been initialized and the format/channels/rate can be retrieved. */
ma_async_notification* pDoneNotification; /* Signalled when the data buffer has been fully decoded. Will be passed through to MA_RESOURCE_MANAGER_JOB_PAGE_DATA_BUFFER_NODE when decoding. */
ma_fence* pInitFence; /* Released when initialization of the decoder is complete. */
ma_fence* pDoneFence; /* Released if initialization of the decoder fails. Passed through to PAGE_DATA_BUFFER_NODE untouched if init is successful. */
MA_RESOURCE_MANAGER_DATA_SOURCE_FLAG_WAIT_INIT = 0x00000008 /* When set, waits for initialization of the underlying data source before returning from ma_resource_manager_data_source_init(). */
ma_uint16 slot; /* Index into a ma_slot_allocator. */
ma_uint32 refcount;
} breakup;
ma_uint64 allocation;
} toc; /* 8 bytes. We encode the job code into the slot allocation data to save space. */
MA_ATOMIC(8, ma_uint64) next; /* refcount + slot for the next item. Does not include the job code. */
ma_uint32 order; /* Execution order. Used to create a data dependency and ensure a job is executed in order. Usage is contextual depending on the job type. */
ma_bool32 decode; /* When set to true, the data buffer will be decoded. Otherwise it'll be encoded and will use a decoder for the connector. */
ma_async_notification* pInitNotification; /* Signalled when the data buffer has been initialized and the format/channels/rate can be retrieved. */
ma_async_notification* pDoneNotification; /* Signalled when the data buffer has been fully decoded. Will be passed through to MA_RESOURCE_MANAGER_JOB_PAGE_DATA_BUFFER_NODE when decoding. */
ma_fence* pInitFence; /* Released when initialization of the decoder is complete. */
ma_fence* pDoneFence; /* Released if initialization of the decoder fails. Passed through to PAGE_DATA_BUFFER_NODE untouched if init is successful. */
result = ma_slot_allocator_init_preallocated(&allocatorConfig, ma_offset_ptr(pHeap, heapLayout.allocatorOffset), &pQueue->allocator);
if (result != MA_SUCCESS) {
return result;
}
/* We need a semaphore if we're running in non-blocking mode. If threading is disabled we need to return an error. */
if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) {
#ifndef MA_NO_THREADING
{
ma_semaphore_init(0, &pQueue->sem);
}
#else
{
/* Threading is disabled and we've requested non-blocking mode. */
return MA_INVALID_OPERATION;
}
#endif
}
/*
Our queue needs to be initialized with a free standing node. This should always be slot 0. Required for the lock free algorithm. The first job in the queue is
just a dummy item for giving us the first item in the list which is stored in the "next" member.
*/
ma_slot_allocator_alloc(&pQueue->allocator, &pQueue->head); /* Will never fail. */
Lock free queue implementation based on the paper by Michael and Scott: Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors
*/
ma_result result;
ma_uint64 slot;
ma_uint64 tail;
ma_uint64 next;
if (pQueue == NULL || pJob == NULL) {
return MA_INVALID_ARGS;
}
/* We need a new slot. */
result = ma_slot_allocator_alloc(&pQueue->allocator, &slot);
if (result != MA_SUCCESS) {
return result; /* Probably ran out of slots. If so, MA_OUT_OF_MEMORY will be returned. */
}
/* At this point we should have a slot to place the job. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.allocation = slot; /* This will overwrite the job code. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.breakup.code = pJob->toc.breakup.code; /* The job code needs to be applied again because the line above overwrote it. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE; /* Reset for safety. */
#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
ma_spinlock_lock(&pQueue->lock);
#endif
{
/* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */
for (;;) {
tail = c89atomic_load_64(&pQueue->tail);
next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next);
if (ma_resource_manager_job_toc_to_allocation(tail) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->tail))) {
if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) {
if (ma_resource_manager_job_queue_cas(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next, next, slot)) {
result = ma_slot_allocator_init_preallocated(&allocatorConfig, ma_offset_ptr(pHeap, heapLayout.allocatorOffset), &pQueue->allocator);
if (result != MA_SUCCESS) {
return result;
}
/* We need a semaphore if we're running in non-blocking mode. If threading is disabled we need to return an error. */
if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) {
#ifndef MA_NO_THREADING
{
ma_semaphore_init(0, &pQueue->sem);
}
#else
{
/* Threading is disabled and we've requested non-blocking mode. */
return MA_INVALID_OPERATION;
}
#endif
}
/*
Our queue needs to be initialized with a free standing node. This should always be slot 0. Required for the lock free algorithm. The first job in the queue is
just a dummy item for giving us the first item in the list which is stored in the "next" member.
*/
ma_slot_allocator_alloc(&pQueue->allocator, &pQueue->head); /* Will never fail. */
Lock free queue implementation based on the paper by Michael and Scott: Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors
*/
ma_result result;
ma_uint64 slot;
ma_uint64 tail;
ma_uint64 next;
if (pQueue == NULL || pJob == NULL) {
return MA_INVALID_ARGS;
}
/* We need a new slot. */
result = ma_slot_allocator_alloc(&pQueue->allocator, &slot);
if (result != MA_SUCCESS) {
return result; /* Probably ran out of slots. If so, MA_OUT_OF_MEMORY will be returned. */
}
/* At this point we should have a slot to place the job. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.allocation = slot; /* This will overwrite the job code. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.breakup.code = pJob->toc.breakup.code; /* The job code needs to be applied again because the line above overwrote it. */
pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE; /* Reset for safety. */
#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
ma_spinlock_lock(&pQueue->lock);
#endif
{
/* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */
for (;;) {
tail = c89atomic_load_64(&pQueue->tail);
next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next);
if (ma_resource_manager_job_toc_to_allocation(tail) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->tail))) {
if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) {
if (ma_resource_manager_job_queue_cas(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next, next, slot)) {
result = ma_resource_manager_data_buffer_node_decode_next_page(pResourceManager, pDataBufferNode, pJob->data.resourceManager.pageDataBufferNode.pDecoder);
result = ma_resource_manager_data_buffer_node_decode_next_page(pResourceManager, pDataBufferNode, (ma_decoder*)pJob->data.resourceManager.pageDataBufferNode.pDecoder);
/*
If we have a success code by this point, we want to post another job. We're going to set the