* @copyright Copyright (c) 2024 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.1016/j.softx.2024.101687, SoftwareX 26 (2024) 101687, arXiv:2105.00613
*
* @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the main thread pool class and some additional classes and definitions. No other files are needed in order to use the thread pool itself.
* @brief A namespace used to obtain information about the current thread.
*/
namespacethis_thread{
/**
* @brief A type returned by `BS::this_thread::get_index()` which can optionally contain the index of a thread, if that thread belongs to a `BS::thread_pool`. Otherwise, it will contain no value.
*/
usingoptional_index=std::optional<size_t>;
/**
* @brief A type returned by `BS::this_thread::get_pool()` which can optionally contain the pointer to the pool that owns a thread, if that thread belongs to a `BS::thread_pool`. Otherwise, it will contain no value.
*/
usingoptional_pool=std::optional<thread_pool*>;
/**
* @brief A helper class to store information about the index of the current thread.
*/
class[[nodiscard]]thread_info_index
{
friendclassBS::thread_pool;
public:
/**
* @brief Get the index of the current thread. If this thread belongs to a `BS::thread_pool` object, it will have an index from 0 to `BS::thread_pool::get_thread_count() - 1`. Otherwise, for example if this thread is the main thread or an independent `std::thread`, `std::nullopt` will be returned.
*
* @return An `std::optional` object, optionally containing a thread index. Unless you are 100% sure this thread is in a pool, first use `std::optional::has_value()` to check if it contains a value, and if so, use `std::optional::value()` to obtain that value.
*/
[[nodiscard]]optional_indexoperator()()const
{
returnindex;
}
private:
/**
* @brief The index of the current thread.
*/
optional_indexindex=std::nullopt;
};// class thread_info_index
/**
* @brief A helper class to store information about the thread pool that owns the current thread.
*/
class[[nodiscard]]thread_info_pool
{
friendclassBS::thread_pool;
public:
/**
* @brief Get the pointer to the thread pool that owns the current thread. If this thread belongs to a `BS::thread_pool` object, a pointer to that object will be returned. Otherwise, for example if this thread is the main thread or an independent `std::thread`, `std::nullopt` will be returned.
*
* @return An `std::optional` object, optionally containing a pointer to a thread pool. Unless you are 100% sure this thread is in a pool, first use `std::optional::has_value()` to check if it contains a value, and if so, use `std::optional::value()` to obtain that value.
*/
[[nodiscard]]optional_pooloperator()()const
{
returnpool;
}
private:
/**
* @brief A pointer to the thread pool that owns the current thread.
*/
optional_poolpool=std::nullopt;
};// class thread_info_pool
/**
* @brief A `thread_local` object used to obtain information about the index of the current thread.
*/
inlinethread_localthread_info_indexget_index;
/**
* @brief A `thread_local` object used to obtain information about the thread pool that owns the current thread.
*/
inlinethread_localthread_info_poolget_pool;
}// namespace this_thread
/**
* @brief A helper class to facilitate waiting for and/or getting the results of multiple futures at once.
* @brief Check if all the futures stored in this `multi_future` are valid.
*
* @return `true` if all futures are valid, `false` if at least one of the futures is not valid.
*/
[[nodiscard]]boolvalid()const
{
boolis_valid=true;
for(conststd::future<T>&future:*this)
is_valid=is_valid&&future.valid();
returnis_valid;
}
/**
* @brief Wait for all the futures stored in this `multi_future`.
*/
voidwait()const
{
for(conststd::future<T>&future:*this)
future.wait();
}
/**
* @brief Wait for all the futures stored in this `multi_future`, but stop waiting after the specified duration has passed. This function first waits for the first future for the desired duration. If that future is ready before the duration expires, this function waits for the second future for whatever remains of the duration. It continues similarly until the duration expires.
*
* @tparam R An arithmetic type representing the number of ticks to wait.
* @tparam P An `std::ratio` representing the length of each tick in seconds.
* @param duration The amount of time to wait.
* @return `true` if all futures have been waited for before the duration expired, `false` otherwise.
* @brief Wait for all the futures stored in this `multi_future`, but stop waiting after the specified time point has been reached. This function first waits for the first future until the desired time point. If that future is ready before the time point is reached, this function waits for the second future until the desired time point. It continues similarly until the time point is reached.
*
* @tparam C The type of the clock used to measure time.
* @tparam D An `std::chrono::duration` type used to indicate the time point.
* @param timeout_time The time point at which to stop waiting.
* @return `true` if all futures have been waited for before the time point was reached, `false` otherwise.
* @brief A fast, lightweight, and easy-to-use C++17 thread pool class.
*/
class[[nodiscard]]thread_pool
{
public:
// ============================
// Constructors and destructors
// ============================
/**
* @brief Construct a new thread pool. The number of threads will be the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
*/
thread_pool():thread_pool(0,[]{}){}
/**
* @brief Construct a new thread pool with the specified number of threads.
*
* @param num_threads The number of threads to use.
* @brief Construct a new thread pool with the specified initialization function.
*
* @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
* @brief Construct a new thread pool with the specified number of threads and initialization function.
*
* @param num_threads The number of threads to use.
* @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
// The copy and move constructors and assignment operators are deleted. The thread pool uses a mutex, which cannot be copied or moved.
thread_pool(constthread_pool&)=delete;
thread_pool(thread_pool&&)=delete;
thread_pool&operator=(constthread_pool&)=delete;
thread_pool&operator=(thread_pool&&)=delete;
/**
* @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.
*/
~thread_pool()
{
wait();
destroy_threads();
}
// =======================
// Public member functions
// =======================
#ifdef BS_THREAD_POOL_ENABLE_NATIVE_HANDLES
/**
* @brief Get a vector containing the underlying implementation-defined thread handles for each of the pool's threads, as obtained by `std::thread::native_handle()`. Only enabled if `BS_THREAD_POOL_ENABLE_NATIVE_HANDLES` is defined.
* @brief Get the number of tasks currently waiting in the queue to be executed by the threads.
*
* @return The number of queued tasks.
*/
[[nodiscard]]size_tget_tasks_queued()const
{
conststd::scoped_locktasks_lock(tasks_mutex);
returntasks.size();
}
/**
* @brief Get the number of tasks currently being executed by the threads.
*
* @return The number of running tasks.
*/
[[nodiscard]]size_tget_tasks_running()const
{
conststd::scoped_locktasks_lock(tasks_mutex);
returntasks_running;
}
/**
* @brief Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that `get_tasks_total() == get_tasks_queued() + get_tasks_running()`.
*
* @return The total number of tasks.
*/
[[nodiscard]]size_tget_tasks_total()const
{
conststd::scoped_locktasks_lock(tasks_mutex);
returntasks_running+tasks.size();
}
/**
* @brief Get the number of threads in the pool.
*
* @return The number of threads.
*/
[[nodiscard]]concurrency_tget_thread_count()const
{
returnthread_count;
}
/**
* @brief Get a vector containing the unique identifiers for each of the pool's threads, as obtained by `std::thread::get_id()`.
* @brief Check whether the pool is currently paused. Only enabled if `BS_THREAD_POOL_ENABLE_PAUSE` is defined.
*
* @return `true` if the pool is paused, `false` if it is not paused.
*/
[[nodiscard]]boolis_paused()const
{
conststd::scoped_locktasks_lock(tasks_mutex);
returnpaused;
}
/**
* @brief Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. Only enabled if `BS_THREAD_POOL_ENABLE_PAUSE` is defined.
*/
voidpause()
{
conststd::scoped_locktasks_lock(tasks_mutex);
paused=true;
}
#endif
/**
* @brief Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks.
*/
voidpurge()
{
conststd::scoped_locktasks_lock(tasks_mutex);
while(!tasks.empty())
tasks.pop();
}
/**
* @brief Submit a function with no arguments and no return value into the task queue, with the specified priority. To push a function with arguments, enclose it in a lambda expression. Does not return a future, so the user must use `wait()` or some other method to ensure that the task finishes executing, otherwise bad things will happen.
*
* @tparam F The type of the function.
* @param task The function to push.
* @param priority The priority of the task. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Does not return a `multi_future`, so the user must use `wait()` or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
*
* @tparam T The type of the indices. Should be a signed or unsigned integer.
* @tparam F The type of the function to loop through.
* @param first_index The first index in the loop.
* @param index_after_last The index after the last index in the loop. The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no blocks will be submitted.
* @param block A function that will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. `block(start, end)` should typically involve a loop of the form `for (T i = start; i < end; ++i)`.
* @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
* @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. Does not return a `multi_future`, so the user must use `wait()` or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
*
* @tparam T The type of the indices. Should be a signed or unsigned integer.
* @tparam F The type of the function to loop through.
* @param first_index The first index in the loop.
* @param index_after_last The index after the last index in the loop. The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no blocks will be submitted.
* @param loop The function to loop through. Will be called once per index, many times per block. Should take exactly one argument: the loop index.
* @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
* @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @brief Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Does not return a `multi_future`, so the user must use `wait()` or some other method to ensure that the sequence finishes executing, otherwise bad things will happen.
*
* @tparam T The type of the indices. Should be a signed or unsigned integer.
* @tparam F The type of the function used to define the sequence.
* @param first_index The first index in the sequence.
* @param index_after_last The index after the last index in the sequence. The sequence will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no tasks will be submitted.
* @param sequence The function used to define the sequence. Will be called once per index. Should take exactly one argument, the index.
* @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @brief Reset the pool with the total number of hardware threads available, as reported by the implementation. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
*/
voidreset()
{
reset(0,[]{});
}
/**
* @brief Reset the pool with a new number of threads. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
*
* @param num_threads The number of threads to use.
*/
voidreset(constconcurrency_tnum_threads)
{
reset(num_threads,[]{});
}
/**
* @brief Reset the pool with the total number of hardware threads available, as reported by the implementation, and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
*
* @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
*/
voidreset(conststd::function<void()>&init_task)
{
reset(0,init_task);
}
/**
* @brief Reset the pool with a new number of threads and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
*
* @param num_threads The number of threads to use.
* @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
* @brief Submit a function with no arguments into the task queue, with the specified priority. To submit a function with arguments, enclose it in a lambda expression. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an `std::future<void>` which can be used to wait until the task finishes.
*
* @tparam F The type of the function.
* @tparam R The return type of the function (can be `void`).
* @param task The function to submit.
* @param priority The priority of the task. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one.
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Returns a `multi_future` that contains the futures for all of the blocks.
*
* @tparam T The type of the indices. Should be a signed or unsigned integer.
* @tparam F The type of the function to loop through.
* @tparam R The return type of the function to loop through (can be `void`).
* @param first_index The first index in the loop.
* @param index_after_last The index after the last index in the loop. The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no blocks will be submitted, and an empty `multi_future` will be returned.
* @param block A function that will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. `block(start, end)` should typically involve a loop of the form `for (T i = start; i < end; ++i)`.
* @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
* @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @return A `multi_future` that can be used to wait for all the blocks to finish. If the block function returns a value, the `multi_future` can also be used to obtain the values returned by each block.
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. It must have no return value. Returns a `multi_future` that contains the futures for all of the blocks.
*
* @tparam T The type of the indices. Should be a signed or unsigned integer.
* @tparam F The type of the function to loop through.
* @param first_index The first index in the loop.
* @param index_after_last The index after the last index in the loop. The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no tasks will be submitted, and an empty `multi_future` will be returned.
* @param loop The function to loop through. Will be called once per index, many times per block. Should take exactly one argument: the loop index. It cannot have a return value.
* @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
* @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @return A `multi_future` that can be used to wait for all the blocks to finish.
* @brief Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Returns a `multi_future` that contains the futures for all of the tasks.
*
* @tparam T The type of the indices. Should be a signed or unsigned integer.
* @tparam F The type of the function used to define the sequence.
* @tparam R The return type of the function used to define the sequence (can be `void`).
* @param first_index The first index in the sequence.
* @param index_after_last The index after the last index in the sequence. The sequence will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no tasks will be submitted, and an empty `multi_future` will be returned.
* @param sequence The function used to define the sequence. Will be called once per index. Should take exactly one argument, the index.
* @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined.
* @return A `multi_future` that can be used to wait for all the tasks to finish. If the sequence function returns a value, the `multi_future` can also be used to obtain the values returned by each task.
* @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use `submit_task()` instead, and call the `wait()` member function of the generated future.
*
* @throws `wait_deadlock` if called from within a thread of the same pool, which would result in a deadlock. Only enabled if `BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK` is defined.
* @brief Wait for tasks to be completed, but stop waiting after the specified duration has passed.
*
* @tparam R An arithmetic type representing the number of ticks to wait.
* @tparam P An `std::ratio` representing the length of each tick in seconds.
* @param duration The amount of time to wait.
* @return `true` if all tasks finished running, `false` if the duration expired but some tasks are still running.
*
* @throws `wait_deadlock` if called from within a thread of the same pool, which would result in a deadlock. Only enabled if `BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK` is defined.
* @brief Wait for tasks to be completed, but stop waiting after the specified time point has been reached.
*
* @tparam C The type of the clock used to measure time.
* @tparam D An `std::chrono::duration` type used to indicate the time point.
* @param timeout_time The time point at which to stop waiting.
* @return `true` if all tasks finished running, `false` if the time point was reached but some tasks are still running.
*
* @throws `wait_deadlock` if called from within a thread of the same pool, which would result in a deadlock. Only enabled if `BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK` is defined.
* @brief An exception that will be thrown by `wait()`, `wait_for()`, and `wait_until()` if the user tries to call them from within a thread of the same pool, which would result in a deadlock.
* @brief Determine how many threads the pool should have, based on the parameter passed to the constructor or reset().
*
* @param num_threads The parameter passed to the constructor or `reset()`. If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from `std::thread::hardware_concurrency()`. If the latter returns zero for some reason, then the pool will be created with just one thread.
* @return The number of threads to use for constructing the pool.
* @brief A worker function to be assigned to each thread in the pool. Waits until it is notified by `detach_task()` that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies `wait()` in case it is waiting.
*
* @param idx The index of this thread.
* @param init_task An initialization function to run in this thread before it starts to execute any submitted tasks.
* @brief A flag indicating whether the workers should pause. When set to `true`, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. When set to `false` again, the workers resume retrieving tasks.
*/
boolpaused=false;
#endif
/**
* @brief A condition variable to notify `worker()` that a new task has become available.
*/
std::condition_variabletask_available_cv={};
/**
* @brief A condition variable to notify `wait()` that the tasks are done.
*/
std::condition_variabletasks_done_cv={};
/**
* @brief A queue of tasks to be executed by the threads.
*/
#ifdef BS_THREAD_POOL_ENABLE_PRIORITY
std::priority_queue<pr_task>tasks={};
#else
std::queue<std::function<void()>>tasks={};
#endif
/**
* @brief A counter for the total number of currently running tasks.
*/
size_ttasks_running=0;
/**
* @brief A mutex to synchronize access to the task queue by different threads.
*/
mutablestd::mutextasks_mutex={};
/**
* @brief The number of threads in the pool.
*/
concurrency_tthread_count=0;
/**
* @brief A smart pointer to manage the memory allocated for the threads.
*/
std::unique_ptr<std::thread[]>threads=nullptr;
/**
* @brief A flag indicating that `wait()` is active and expects to be notified whenever a task is done.
*/
boolwaiting=false;
/**
* @brief A flag indicating to the workers to keep running. When set to `false`, the workers terminate permanently.