pthreads :: threadpool $ Task
Task with a `main`
method to be implemented by subclasses
pthreads :: threadpool $ Task
Task with a `main`
method to be implemented by subclasses
core :: union_find
union–find algorithm using an efficient disjoint-set data structure
pthreads :: concurrent_collections
Introduces thread-safe concurrent collections
restful
annotation documented at lib/nitcorn/restful.nit
# Introduces a minimal ThreadPool implementation using Tasks
module threadpool
intrude import pthreads
import concurrent_collections
# A simple ThreadPool implemented with an array
#
# Worker threads are created lazily by `execute` and all pull their work
# from one shared queue of `Task`.
class ThreadPool
	# Tasks waiting to be picked up by a worker
	private var queue = new ConcurrentList[Task]

	# Mutex handed to every worker; workers hold it while inspecting `queue`
	private var mutex = new Mutex

	# Condition signalled each time a task is queued
	private var cond = new NativePthreadCond

	# Workers started so far
	private var threads = new Array[PoolThread]

	# Number of threads used, can only grow after the first call to `execute`
	var nb_threads = 5 is optional, writable

	# Start new workers until `threads` holds `nb_threads` of them
	private fun create_threads do
		while threads.length < nb_threads do
			var t = new PoolThread(queue, mutex, cond)
			t.start
			threads.add t
		end
	end

	# Adds a Task into the queue
	fun execute(task: Task) do
		create_threads
		# Push and signal while holding `mutex`: a worker tests `queue.is_empty`
		# under this same mutex before waiting, so without the lock the signal
		# could fire between its test and its `cond.wait` and be lost, leaving
		# the task queued with every worker asleep.
		mutex.lock
		queue.push(task)
		cond.signal
		mutex.unlock
	end

	# Join all threads, waiting for all tasks to be completed
	#
	# NOTE(review): `PoolThread.main` loops with no visible exit, so these
	# joins presumably never return as the code stands; confirm before
	# relying on this method to terminate.
	fun join_all do
		# Wait
		for t in threads do t.join

		# Reset
		threads.clear
	end
end
# A Thread running in a threadpool
#
# Repeatedly takes the next `Task` out of the shared `queue` and runs it.
private class PoolThread
	super Thread

	# Queue of pending tasks, shared with the pool
	var queue: ConcurrentList[Task]

	# Mutex held while `queue` is inspected
	var mutex: Mutex

	# Condition waited on while `queue` is empty
	var cond: NativePthreadCond

	redef fun main do
		loop
			var task: nullable Task = null

			mutex.lock
			# Sleep until signalled when there is nothing to do
			if queue.is_empty then cond.wait(mutex.native.as(not null))
			# Test again after waking: the queue may still be empty
			if not queue.is_empty then task = queue.shift
			mutex.unlock

			# Nothing was dequeued, go back to waiting
			if task == null then continue

			# Run the task outside of the lock
			task.main
			task.after_main
		end
	end
end
redef class Task
	# Hook invoked by a `ThreadPool` worker right after `main` returns
	#
	# Does nothing by default.
	private fun after_main
	do
	end
end
# A Task which is joinable, meaning it can return a value and if the value is not set yet, it blocks the execution
class JoinTask
	super Task

	# Is `self` done?
	var is_done = false

	# Protects `is_done` and `cond`
	private var mutex = new Mutex

	# Condition the joiners wait on, created by the first blocking `join`
	private var cond: nullable NativePthreadCond = null

	# Return immediately if the task terminated, or block waiting for `self` to terminate
	#
	# May be called from several threads, all are released on termination.
	fun join do
		mutex.lock
		if not is_done then
			# Reuse the condition installed by an earlier joiner, replacing it
			# would leave that joiner waiting on a condition nobody signals.
			var cond = self.cond
			if cond == null then
				cond = new NativePthreadCond
				self.cond = cond
			end

			# Loop on the predicate: a condition wait may wake up spuriously
			while not is_done do cond.wait(mutex.native.as(not null))
		end
		mutex.unlock
	end

	redef fun after_main do
		# TODO move this at the end of main so all `JoinTask` can be joined
		# no matter what calls `main`.
		mutex.lock
		is_done = true
		var tcond = cond
		# Wake every joiner, not only one of them
		if tcond != null then tcond.broadcast
		mutex.unlock
	end
end
lib/pthreads/threadpool.nit:15,1--117,3