Commit cf14d077 authored by David Reid

Experiment with some mitigation for the ABA problem.

parent 1a967551
@@ -140,30 +140,49 @@ typedef struct ma_resource_manager_data_source ma_resource_manager_data_source;
/*
The idea of the slot allocator is for it to be used in conjunction with a fixed sized buffer. You use the slot allocator to allocate an index that can be used
as the insertion point for an object.

Slots are reference counted to help mitigate the ABA problem in the lock-free queue we use for tracking jobs.

The slot index is stored in the low 32 bits. The reference counter is stored in the high 32 bits:

    +-----------------+-----------------+
    | 32 Bits         | 32 Bits         |
    +-----------------+-----------------+
    | Reference Count | Slot Index      |
    +-----------------+-----------------+
*/
typedef struct
{
volatile struct
{
ma_uint32 bitfield;
} groups[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY/32];
ma_uint32 slots[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY]; /* 1 bit to indicate if the slot is allocated, 31 bits for reference counting. */
ma_uint32 count; /* Allocation count. */
} ma_slot_allocator;
MA_API ma_result ma_slot_allocator_init(ma_slot_allocator* pAllocator);
MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint64* pSlot);
MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint64 slot);
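/*
Editorial sketch (not part of this commit): how a caller might use and decode the 64-bit value
produced by ma_slot_allocator_alloc(). Per the layout described above, the low 32 bits are the
slot index into the fixed sized buffer and the high 32 bits are the reference count used for
ABA mitigation. The function below is hypothetical and exists only to illustrate the encoding.
*/
static void example_slot_allocator_usage(ma_slot_allocator* pAllocator)
{
    ma_uint64 slot;
    if (ma_slot_allocator_alloc(pAllocator, &slot) == MA_SUCCESS) {
        ma_uint32 slotIndex = (ma_uint32)(slot & 0xFFFFFFFF); /* Low 32 bits: index into the fixed sized buffer. */
        ma_uint32 refcount  = (ma_uint32)(slot >> 32);        /* High 32 bits: reference count for this slot. */
        (void)slotIndex;
        (void)refcount;
        ma_slot_allocator_free(pAllocator, slot);             /* The full 64-bit value is handed back when freeing. */
    }
}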
typedef struct
{
union
{
struct
{
ma_uint16 code;
ma_uint16 slot;
ma_uint32 refcount;
};
ma_uint64 allocation;
} toc; /* 8 bytes. We encode the job code into the slot allocation data to save space. */
ma_uint64 next; /* refcount + slot for the next item. Does not include the job code. */
union
{
/* Resource Manager Jobs */
@@ -184,8 +203,8 @@ MA_API ma_job ma_job_init(ma_uint16 code);
typedef struct
{
ma_uint32 flags; /* Flags passed in at initialization time. */
ma_uint64 head; /* The first item in the list. Required for removing from the top of the list. */
ma_uint64 tail; /* The last item in the list. Required for appending to the end of the list. */
ma_semaphore sem; /* Only used when MA_JOB_QUEUE_ASYNC is unset. */
ma_slot_allocator allocator;
ma_job jobs[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY];
@@ -695,7 +714,7 @@ MA_API ma_result ma_slot_allocator_init(ma_slot_allocator* pAllocator)
return MA_SUCCESS;
}
MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint64* pSlot)
{
ma_uint32 capacity;
@@ -709,8 +728,7 @@ MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint3
/* We need to acquire a suitable bitfield first. This is a bitfield that's got an available slot within it. */
ma_uint32 iGroup;
for (iGroup = 0; iGroup < ma_countof(pAllocator->groups); iGroup += 1) {
/* CAS */
for (;;) {
ma_uint32 newBitfield;
ma_uint32 oldBitfield;
@@ -718,56 +736,38 @@ MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint3
oldBitfield = pAllocator->groups[iGroup].bitfield;
/* Fast check to see if anything is available. */
if (oldBitfield == 0xFFFFFFFF) {
break; /* No available bits in this bitfield. */
}

bitOffset = ma_ffs_32(~oldBitfield);
MA_ASSERT(bitOffset < 32);

newBitfield = oldBitfield | (1 << bitOffset);

if (c89atomic_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, oldBitfield, newBitfield) == oldBitfield) {
ma_uint32 slotIndex;

/* Increment the counter as soon as possible to have other threads report out-of-memory sooner than later. */
c89atomic_fetch_add_32(&pAllocator->count, 1);

/* The slot index is required for constructing the output value. */
slotIndex = (iGroup << 5) + bitOffset; /* iGroup << 5 = iGroup * 32 */

/* Increment the reference count before constructing the output value. */
pAllocator->slots[slotIndex] += 1;

/* Construct the output value. */
*pSlot = ((ma_uint64)pAllocator->slots[slotIndex] << 32 | slotIndex);

return MA_SUCCESS;
}
}
}
/* We weren't able to find a slot. If it's because we've reached our capacity we need to return MA_OUT_OF_MEMORY. Otherwise we need to do another iteration and try again. */
if (pAllocator->count < capacity) {
ma_yield();
} else {
return MA_OUT_OF_MEMORY;
@@ -777,26 +777,7 @@ MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint3
/* Unreachable. */
}
MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint64 slot)
{
ma_uint32 iGroup;
ma_uint32 iBit;
@@ -805,8 +786,8 @@ MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32
return MA_INVALID_ARGS;
}
iGroup = (slot & 0xFFFFFFFF) >> 5; /* slot / 32 */
iBit = (slot & 0xFFFFFFFF) & 31; /* slot % 32 */
if (iGroup >= ma_countof(pAllocator->groups)) {
return MA_INVALID_ARGS;
@@ -814,46 +795,45 @@ MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32
MA_ASSERT(iBit < 32); /* This must be true due to the logic we used to actually calculate it. */
while (pAllocator->count > 0) {
/* CAS */
ma_uint32 newBitfield;
ma_uint32 oldBitfield;

oldBitfield = pAllocator->groups[iGroup].bitfield;
newBitfield = oldBitfield & ~(1 << iBit);

if (c89atomic_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, oldBitfield, newBitfield) == oldBitfield) {
c89atomic_fetch_sub_32(&pAllocator->count, 1);
return MA_SUCCESS;
}
}

/* Getting here means there are no allocations available for freeing. */
return MA_INVALID_OPERATION;
}


#define MA_JOB_ID_NONE ~((ma_uint64)0)

static MA_INLINE ma_uint32 ma_job_extract_refcount(ma_uint64 toc)
{
return (ma_uint32)(toc >> 32);
}
static MA_INLINE ma_uint16 ma_job_extract_slot(ma_uint64 toc)
{
return (ma_uint16)(toc & 0x0000FFFF);
}
static MA_INLINE ma_uint16 ma_job_extract_code(ma_uint64 toc)
{
return (ma_uint16)((toc & 0xFFFF0000) >> 16);
}
static MA_INLINE ma_uint64 ma_job_toc_to_allocation(ma_uint64 toc)
{
return ((ma_uint64)ma_job_extract_refcount(toc) << 32) | (ma_uint64)ma_job_extract_slot(toc);
}
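/*
Editorial sketch (not part of this commit): with the helpers above, a 64-bit toc packs the
reference count into bits 32-63, the job code into bits 16-31 and the slot index into bits
0-15. ma_job_toc_to_allocation() masks the code out, leaving the refcount + slot value that
ma_slot_allocator_free() expects. The values below are made up purely for illustration.
*/
static void example_toc_breakdown(void)
{
    ma_uint64 toc = ((ma_uint64)7 << 32) | ((ma_uint64)2 << 16) | 5; /* refcount = 7, code = 2, slot = 5. */
    MA_ASSERT(ma_job_extract_refcount(toc) == 7);
    MA_ASSERT(ma_job_extract_code(toc)     == 2);
    MA_ASSERT(ma_job_extract_slot(toc)     == 5);
    MA_ASSERT(ma_job_toc_to_allocation(toc) == (((ma_uint64)7 << 32) | 5)); /* Code bits removed. */
}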
@@ -862,9 +842,9 @@ MA_API ma_job ma_job_init(ma_uint16 code)
ma_job job;
MA_ZERO_OBJECT(&job);
job.toc.code = code;
job.toc.slot = MA_JOB_ID_NONE; /* Temp value. Will be allocated when posted to a queue. */
job.next = MA_JOB_ID_NONE;
return job;
}
@@ -872,9 +852,6 @@ MA_API ma_job ma_job_init(ma_uint16 code)
/*
Lock free queue implementation based on the paper by Michael and Scott: Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors
*/
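/*
Editorial note (not part of this commit): head, tail and next store the full 64-bit slot
allocation (reference count in the high 32 bits) rather than a bare index. If a slot is freed
and re-allocated between one thread reading a link and attempting its compare-and-swap, the
re-allocation bumps the reference count, so the stale 64-bit value no longer compares equal and
the CAS fails harmlessly. A minimal sketch of the idea, with made-up values:
*/
static void example_aba_tag(void)
{
    ma_uint64 observed = ((ma_uint64)1 << 32) | 42; /* A thread reads slot 42 while its refcount is 1. */
    ma_uint64 current  = ((ma_uint64)2 << 32) | 42; /* Slot 42 is freed and re-allocated; refcount becomes 2. */
    MA_ASSERT(observed != current);                 /* A bare 16-bit index would have compared equal here. */
}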
MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue)
{
@@ -896,10 +873,10 @@ MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue)
Our queue needs to be initialized with a free standing node. This should always be slot 0. Required for the lock free algorithm. The first job in the queue is
just a dummy item for giving us the first item in the list which is stored in the "next" member.
*/
ma_slot_allocator_alloc(&pQueue->allocator, &pQueue->head); /* Will never fail. */
pQueue->tail = pQueue->head;
pQueue->jobs[ma_job_extract_slot(pQueue->head)].next = MA_JOB_ID_NONE;
return MA_SUCCESS;
}
@@ -921,44 +898,45 @@ MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue)
MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob)
{
ma_result result;
ma_uint64 slot;
ma_uint64 tail;
ma_uint64 next;
if (pQueue == NULL || pJob == NULL) {
return MA_INVALID_ARGS;
}
/* We need a new slot. */
result = ma_slot_allocator_alloc(&pQueue->allocator, &slot);
if (result != MA_SUCCESS) {
return result; /* Probably ran out of slots. If so, MA_OUT_OF_MEMORY will be returned. */
}
/* At this point we should have a slot to place the job. */
MA_ASSERT(ma_job_extract_slot(slot) < MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY);
/* We need to put the job into memory before we do anything. */
pQueue->jobs[ma_job_extract_slot(slot)] = *pJob;
pQueue->jobs[ma_job_extract_slot(slot)].toc.allocation = slot; /* This will overwrite the job code. */
pQueue->jobs[ma_job_extract_slot(slot)].toc.code = pJob->toc.code; /* The job code needs to be applied again because the line above overwrote it. */
pQueue->jobs[ma_job_extract_slot(slot)].next = MA_JOB_ID_NONE; /* Reset for safety. */
/* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */
for (;;) {
tail = pQueue->tail;
next = pQueue->jobs[ma_job_extract_slot(tail)].next;
if (tail == pQueue->tail) {
if (next == MA_JOB_ID_NONE) {
if (c89atomic_compare_and_swap_64(&pQueue->jobs[ma_job_extract_slot(tail)].next, next, slot) == next) {
break;
}
} else {
c89atomic_compare_and_swap_64(&pQueue->tail, tail, next);
}
}
}
c89atomic_compare_and_swap_64(&pQueue->tail, tail, slot);
/* Signal the semaphore as the last step if we're using synchronous mode. */
@@ -971,9 +949,9 @@ MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob)
MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob)
{
ma_uint64 head;
ma_uint64 tail;
ma_uint64 next;
if (pQueue == NULL || pJob == NULL) {
return MA_INVALID_ARGS;
@@ -988,17 +966,17 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob)
for (;;) {
head = pQueue->head;
tail = pQueue->tail;
next = pQueue->jobs[ma_job_extract_slot(head)].next;
if (head == pQueue->head) {
if (head == tail) {
if (next == MA_JOB_ID_NONE) {
return MA_NO_DATA_AVAILABLE;
}
c89atomic_compare_and_swap_64(&pQueue->tail, tail, next);
} else {
*pJob = pQueue->jobs[ma_job_extract_slot(next)];
if (c89atomic_compare_and_swap_64(&pQueue->head, head, next) == head) {
break;
}
}
@@ -1016,7 +994,7 @@ MA_API ma_result ma_job_queue_free(ma_job_queue* pQueue, ma_job* pJob)
return MA_INVALID_ARGS;
}
return ma_slot_allocator_free(&pQueue->allocator, ma_job_toc_to_allocation(pJob->toc.allocation));
}
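/*
Editorial usage sketch (not part of this commit): the intended lifecycle of a job. The job code
value 1 is a placeholder; real callers would use one of the resource manager's job codes. With
no flags, the queue uses its internal semaphore (see the sem member), so ma_job_queue_next() can
wait for work to arrive.
*/
static void example_job_queue_usage(ma_job_queue* pQueue)
{
    ma_job job;
    ma_job nextJob;

    job = ma_job_init(1);                    /* 1 = placeholder job code. */
    ma_job_queue_post(pQueue, &job);         /* Allocates a slot and links the job at the tail. */

    if (ma_job_queue_next(pQueue, &nextJob) == MA_SUCCESS) {
        /* ... handle the job based on its code ... */
        ma_job_queue_free(pQueue, &nextJob); /* Returns the slot (refcount + index) to the allocator. */
    }
}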