# A simple ThreadPool implemented with an array
class ThreadPool
- private var queue = new ConcurrentList[JoinTask].wrap(new List[JoinTask])
+ private var queue = new ConcurrentList[Task]
private var mutex = new Mutex
private var cond = new NativePthreadCond
+ private var threads = new Array[PoolThread]
- # Number of threads used
- var nb_threads: Int is noinit
+ # Number of threads used, can only grow after the first call to `execute`
+ var nb_threads = 5 is optional, writable
- init do
- for i in [0..nb_threads[ do
+ private fun create_threads do
+ while threads.length < nb_threads do
var t = new PoolThread(queue, mutex, cond)
t.start
+ threads.add t
end
end
- private fun set_nb_threads(nb: nullable Int) is autoinit do nb_threads = nb or else 5
-
# Adds a Task into the queue
- fun execute(task: JoinTask) do
+ fun execute(task: Task) do
+ create_threads
queue.push(task)
cond.signal
end
+
+ # Join all threads, waiting for all tasks to be completed
+ fun join_all do
+ # Wait for every thread of the pool to terminate
+ for t in threads do t.join
+
+ # Forget the joined threads so the next call to `execute` creates new ones
+ threads.clear
+ end
end
# A Thread running in a threadpool
private class PoolThread
super Thread
- var queue: ConcurrentList[JoinTask]
+ var queue: ConcurrentList[Task]
var mutex: Mutex
var cond : NativePthreadCond
redef fun main do
loop
- var t: nullable JoinTask = null
+ var t = null
mutex.lock
if queue.is_empty then cond.wait(mutex.native.as(not null))
if not queue.is_empty then
mutex.unlock
if t != null then
t.main
- t.mutex.lock
- t.is_done = true
- var tcond = t.cond
- if tcond != null then tcond.signal
- t.mutex.unlock
+ t.after_main
end
end
end
end
+redef class Task
+ # Additional work executed after `main` from a `ThreadPool`
+ private fun after_main do end
+end
+
# A Task which is joinable, meaning it can return a value and if the value is not set yet, it blocks the execution
class JoinTask
super Task
- # Is `self` done ?
+ # Is `self` done?
var is_done = false
private var mutex = new Mutex
end
mutex.unlock
end
+
+ redef fun after_main do
+ # TODO: move this to the end of `main` so that every `JoinTask` can be joined,
+ # no matter what calls `main`.
+
+ mutex.lock
+ is_done = true
+ var tcond = cond
+ if tcond != null then tcond.signal
+ mutex.unlock
+ end
end