# A simple ThreadPool implemented with an array
class ThreadPool
- private var queue = new ConcurrentList[Task].wrap(new List[Task])
+ private var queue = new ConcurrentList[Task]
private var mutex = new Mutex
private var cond = new NativePthreadCond
+ private var threads = new Array[PoolThread]
- # Number of threads used
- var nb_threads: Int is noinit
+ # Number of threads used, can only grow after the first call to `execute`
+ var nb_threads = 5 is optional, writable
- init do
- for i in [0..nb_threads[ do
+ private fun create_threads do
+ while threads.length < nb_threads do
var t = new PoolThread(queue, mutex, cond)
t.start
+ threads.add t
end
end
- private fun set_nb_threads(nb: nullable Int) is autoinit do nb_threads = nb or else 5
-
# Adds a Task into the queue
fun execute(task: Task) do
+ create_threads
queue.push(task)
cond.signal
end
+
+ # Join all threads, waiting for all tasks to be completed
+ fun join_all do
+ # Wait
+ for t in threads do t.join
+
+ # Reset
+ threads.clear
+ end
end
# A Thread running in a threadpool
redef fun main do
loop
- var t: nullable Task = null
+ var t = null
mutex.lock
if queue.is_empty then cond.wait(mutex.native.as(not null))
if not queue.is_empty then
- t = queue.pop
+ t = queue.shift
end
mutex.unlock
- if t != null then t.main
+ if t != null then
+ t.main
+ t.after_main
+ end
+ end
+ end
+end
+
+redef class Task
+ # Additional work executed after `main` from a `ThreadPool`
+ private fun after_main do end
+end
+
+# A Task which is joinable, meaning it can return a value and if the value is not set yet, it blocks the execution
+class JoinTask
+ super Task
+
+ # Is `self` done?
+ var is_done = false
+
+ private var mutex = new Mutex
+ private var cond: nullable NativePthreadCond = null
+
+ # Return immediatly if the task terminated, or block waiting for `self` to terminate
+ fun join do
+ mutex.lock
+ if not is_done then
+ var cond = new NativePthreadCond
+ self.cond = cond
+ cond.wait(mutex.native.as(not null))
end
+ mutex.unlock
+ end
+
+ redef fun after_main do
+ # TODO move this at the end of main so all `JoinTask` can be joined
+ # no matter what calls `main`.
+
+ mutex.lock
+ is_done = true
+ var tcond = cond
+ if tcond != null then tcond.signal
+ mutex.unlock
end
end