From: BlackMinou Date: Sat, 28 Nov 2015 20:21:24 +0000 (-0500) Subject: Introduces JoinTask, joinable tasks X-Git-Tag: v0.8~54^2~1 X-Git-Url: http://nitlanguage.org?ds=sidebyside Introduces JoinTask, joinable tasks Signed-off-by: BlackMinou --- diff --git a/lib/pthreads/concurrent_collections.nit b/lib/pthreads/concurrent_collections.nit index 67e6765..d55bad0 100644 --- a/lib/pthreads/concurrent_collections.nit +++ b/lib/pthreads/concurrent_collections.nit @@ -499,4 +499,12 @@ class ConcurrentList[E] real_collection.push(e) mutex.unlock end + + redef fun shift + do + mutex.lock + var value = real_collection.shift + mutex.unlock + return value + end end diff --git a/lib/pthreads/examples/threadpool_example.nit b/lib/pthreads/examples/threadpool_example.nit index f7fafd7..fec1ae2 100644 --- a/lib/pthreads/examples/threadpool_example.nit +++ b/lib/pthreads/examples/threadpool_example.nit @@ -19,7 +19,7 @@ import threadpool # Task printing "hello world" on standard output class HWTask - super Task + super JoinTask # Sleeping time var sec: Int diff --git a/lib/pthreads/threadpool.nit b/lib/pthreads/threadpool.nit index 42931d3..02042ba 100644 --- a/lib/pthreads/threadpool.nit +++ b/lib/pthreads/threadpool.nit @@ -20,7 +20,7 @@ import concurrent_collections # A simple ThreadPool implemented with an array class ThreadPool - private var queue = new ConcurrentList[Task].wrap(new List[Task]) + private var queue = new ConcurrentList[JoinTask].wrap(new List[JoinTask]) private var mutex = new Mutex private var cond = new NativePthreadCond @@ -37,7 +37,7 @@ class ThreadPool private fun set_nb_threads(nb: nullable Int) is autoinit do nb_threads = nb or else 5 # Adds a Task into the queue - fun execute(task: Task) do + fun execute(task: JoinTask) do queue.push(task) cond.signal end @@ -47,20 +47,49 @@ end private class PoolThread super Thread - var queue: ConcurrentList[Task] + var queue: ConcurrentList[JoinTask] var mutex: Mutex var cond : NativePthreadCond redef fun main do 
loop - var t: nullable Task = null + var t: nullable JoinTask = null mutex.lock if queue.is_empty then cond.wait(mutex.native.as(not null)) if not queue.is_empty then - t = queue.pop + t = queue.shift end mutex.unlock - if t != null then t.main + if t != null then + t.main + t.mutex.lock + t.is_done = true + var tcond = t.cond + if tcond != null then tcond.signal + t.mutex.unlock + end + end + end +end + +# A Task which is joinable, meaning it can return a value and if the value is not set yet, it blocks the execution +class JoinTask + super Task + + # Is `self` done ? + var is_done = false + + private var mutex = new Mutex + private var cond: nullable NativePthreadCond = null + + # Return immediately if the task terminated, or block waiting for `self` to terminate + fun join do + mutex.lock + if not is_done then + var cond = new NativePthreadCond + self.cond = cond + cond.wait(mutex.native.as(not null)) end + mutex.unlock end end