Merge: Nitsmell : Adding new code smells and print console updated
[nit.git] / lib / pthreads / threadpool.nit
index 42931d3..927cdbf 100644 (file)
@@ -20,27 +20,37 @@ import concurrent_collections
 
 # A simple ThreadPool implemented with an array
 class ThreadPool
-       private var queue = new ConcurrentList[Task].wrap(new List[Task])
+       private var queue = new ConcurrentList[Task]
        private var mutex = new Mutex
        private var cond = new NativePthreadCond
+       private var threads = new Array[PoolThread]
 
-       # Number of threads used
-       var nb_threads: Int is noinit
+       # Number of threads used, can only grow after the first call to `execute`
+       var nb_threads = 5 is optional, writable
 
-       init do
-               for i in [0..nb_threads[ do
+       private fun create_threads do
+               while threads.length < nb_threads do
                        var t = new PoolThread(queue, mutex, cond)
                        t.start
+                       threads.add t
                end
        end
 
-       private fun set_nb_threads(nb: nullable Int) is autoinit do nb_threads = nb or else 5
-
        # Adds a Task into the queue
        fun execute(task: Task) do
+               create_threads
                queue.push(task)
                cond.signal
        end
+
+       # Join all threads, waiting for all tasks to be completed
+       fun join_all do
+               # Wait
+               for t in threads do t.join
+
+               # Reset
+               threads.clear
+       end
 end
 
 # A Thread running in a threadpool
@@ -53,14 +63,55 @@ private class PoolThread
 
        redef fun main do
                loop
-                       var t: nullable Task = null
+                       var t = null
                        mutex.lock
                        if queue.is_empty then cond.wait(mutex.native.as(not null))
                        if not queue.is_empty then
-                               t = queue.pop
+                               t = queue.shift
                        end
                        mutex.unlock
-                       if t != null then t.main
+                       if t != null then
+                               t.main
+                               t.after_main
+                       end
+               end
+       end
+end
+
+redef class Task
+       # Additional work executed after `main` from a `ThreadPool`
+       private fun after_main do end
+end
+
+# A Task which is joinable, meaning it can return a value and if the value is not set yet, it blocks the execution
+class JoinTask
+       super Task
+
+       # Is `self` done?
+       var is_done = false
+
+       private var mutex = new Mutex
+       private var cond: nullable NativePthreadCond = null
+
+       # Return immediatly if the task terminated, or block waiting for `self` to terminate
+       fun join do
+               mutex.lock
+               if not is_done then
+                       var cond = new NativePthreadCond
+                       self.cond = cond
+                       cond.wait(mutex.native.as(not null))
                end
+               mutex.unlock
+       end
+
+       redef fun after_main do
+               # TODO move this at the end of main so all `JoinTask` can be joined
+               # no matter what calls `main`.
+
+               mutex.lock
+               is_done = true
+               var tcond = cond
+               if tcond != null then tcond.signal
+               mutex.unlock
        end
 end