Commit 0d9d4813 authored by David Reid's avatar David Reid

Disable lock free job queues.

This is hopefully a temporary measure to address a bug in the lock-free
algorithm.
parent c3c961e9
@@ -1604,8 +1604,8 @@ Another major feature of the resource manager is the ability to asynchronously d
This relieves the audio thread of time-consuming decoding which can negatively affect scalability
due to the audio thread needing to complete its work extremely quickly to avoid glitching.
Asynchronous decoding is achieved through a job system. There is a central multi-producer,
multi-consumer, fixed-capacity job queue. When some asynchronous work needs to be done, a job is
posted to the queue which is then read by a job thread. The number of job threads can be
configured for improved scalability, and job threads can all run in parallel without needing to
worry about the order of execution (how this is achieved is explained below).
@@ -1695,7 +1695,7 @@ returned when trying to read frames.
When `ma_resource_manager_data_source_read_pcm_frames()` results in a page getting fully consumed
a job is posted to load the next page. This will be posted from the same thread that called
`ma_resource_manager_data_source_read_pcm_frames()`.

Data streams are uninitialized by posting a job to the queue, but the function won't return until
that job has been processed. The reason for this is that the caller owns the data stream object and
@@ -1706,13 +1706,11 @@ complete before destroying any underlying object and the job system handles this

6.1.3. Job Queue
----------------
The resource manager uses a job queue which is multi-producer, multi-consumer, and fixed-capacity.
This job queue is not currently lock-free, and instead uses a spinlock to achieve thread-safety.
Only a fixed number of jobs can be allocated and inserted into the queue which is done through a
lock-free data structure for allocating an index into a fixed sized array, with reference counting
for mitigation of the ABA problem. The reference count is 32-bit.

For many types of jobs it's important that they execute in a specific order. In these cases, jobs
are executed serially. For the resource manager, serial execution of jobs is only required on a
@@ -9030,6 +9028,9 @@ typedef struct
#endif
    ma_slot_allocator allocator;
    ma_resource_manager_job* pJobs;
#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
    ma_spinlock lock;
#endif

    /* Memory management. */
    void* _pHeap;
@@ -9211,7 +9212,7 @@ struct ma_resource_manager
    ma_mutex dataBufferBSTLock;                                      /* For synchronizing access to the data buffer binary tree. */
    ma_thread jobThreads[MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT];  /* The threads for executing jobs. */
#endif
    ma_resource_manager_job_queue jobQueue;                          /* Multi-consumer, multi-producer job queue for managing jobs for asynchronous decoding and streaming. */
    ma_default_vfs defaultVFS;                                       /* Only used if a custom VFS is not specified. */
    ma_log log;                                                      /* Only used if no log was specified in the config. */
};
@@ -61829,22 +61830,30 @@ MA_API ma_result ma_resource_manager_job_queue_post(ma_resource_manager_job_queu
    pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.breakup.code = pJob->toc.breakup.code;  /* The job code needs to be applied again because the line above overwrote it. */
    pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE;     /* Reset for safety. */

#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
    ma_spinlock_lock(&pQueue->lock);
#endif
    {
        /* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */
        for (;;) {
            tail = c89atomic_load_64(&pQueue->tail);
            next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next);

            if (ma_resource_manager_job_toc_to_allocation(tail) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->tail))) {
                if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) {
                    if (ma_resource_manager_job_queue_cas(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next, next, slot)) {
                        break;
                    }
                } else {
                    ma_resource_manager_job_queue_cas(&pQueue->tail, tail, ma_resource_manager_job_extract_slot(next));
                }
            }
        }
        ma_resource_manager_job_queue_cas(&pQueue->tail, tail, slot);
    }
#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
    ma_spinlock_unlock(&pQueue->lock);
#endif

    /* Signal the semaphore as the last step if we're using synchronous mode. */
@@ -61886,26 +61895,46 @@ MA_API ma_result ma_resource_manager_job_queue_next(ma_resource_manager_job_queu
#endif
    }

#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
    ma_spinlock_lock(&pQueue->lock);
#endif
    {
        /*
        BUG: In lock-free mode, multiple threads can be in this section of code. The "head" variable in the loop below
        is stored. One thread can fall through to the freeing of this item while another is still using "head" for the
        retrieval of the "next" variable.

        The slot allocator might need to make use of some reference counting to ensure it's only truly freed when
        there are no more references to the item. This must be fixed before removing these locks.
        */

        /* Now we need to remove the root item from the list. */
        for (;;) {
            head = c89atomic_load_64(&pQueue->head);
            tail = c89atomic_load_64(&pQueue->tail);
            next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(head)].next);

            if (ma_resource_manager_job_toc_to_allocation(head) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->head))) {
                if (ma_resource_manager_job_extract_slot(head) == ma_resource_manager_job_extract_slot(tail)) {
                    if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) {
                    #ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
                        ma_spinlock_unlock(&pQueue->lock);
                    #endif
                        return MA_NO_DATA_AVAILABLE;
                    }
                    ma_resource_manager_job_queue_cas(&pQueue->tail, tail, ma_resource_manager_job_extract_slot(next));
                } else {
                    *pJob = pQueue->pJobs[ma_resource_manager_job_extract_slot(next)];
                    if (ma_resource_manager_job_queue_cas(&pQueue->head, head, ma_resource_manager_job_extract_slot(next))) {
                        break;
                    }
                }
            }
        }
    }
#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE
    ma_spinlock_unlock(&pQueue->lock);
#endif

    ma_slot_allocator_free(&pQueue->allocator, head);