Merge: Introduces JoinTask, joinabale tasks
authorJean Privat <jean@pryen.org>
Fri, 4 Dec 2015 20:40:14 +0000 (15:40 -0500)
committerJean Privat <jean@pryen.org>
Fri, 4 Dec 2015 20:40:14 +0000 (15:40 -0500)
JoinTask prototype

Pull-Request: #1853
Reviewed-by: Jean Privat <jean@pryen.org>
Reviewed-by: Alexis Laferrière <alexis.laf@xymus.net>

lib/pthreads/concurrent_collections.nit
lib/pthreads/examples/jointask_example.nit [new file with mode: 0644]
lib/pthreads/examples/threadpool_example.nit
lib/pthreads/threadpool.nit
tests/sav/jointask_example.res [new file with mode: 0644]

index 67e6765..d55bad0 100644 (file)
@@ -499,4 +499,12 @@ class ConcurrentList[E]
                real_collection.push(e)
                mutex.unlock
        end
+
+       redef fun shift
+       do
+               mutex.lock
+               var value = real_collection.shift
+               mutex.unlock
+               return value
+       end
 end
diff --git a/lib/pthreads/examples/jointask_example.nit b/lib/pthreads/examples/jointask_example.nit
new file mode 100644 (file)
index 0000000..3d12f6f
--- /dev/null
@@ -0,0 +1,52 @@
+# This file is part of NIT (http://www.nitlanguage.org).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Simple example of joinable task using threadpool
+module jointask_example
+
+import threadpool
+
+# Task computing a string
+class StringTask
+       super JoinTask
+
+       # Sleeping time
+       var sec: Int
+
+       # result of `self` execution
+       var value: String
+
+       # ID for printing
+       var id: Int
+
+       redef fun main do
+               nanosleep(sec, 0)
+               value += " id: {id}"
+       end
+end
+
+var tp = new ThreadPool
+var t0 = new StringTask(10, "First, long task", 0)
+var tasks = new Array[StringTask]
+for i in 5.times do
+       tasks.add(new StringTask(1, "Small task", i + 1))
+end
+tp.execute(t0)
+for t in tasks do tp.execute(t)
+for t in tasks do
+       t.join
+       print t.value
+end
+t0.join
+print t0.value
index f7fafd7..fec1ae2 100644 (file)
@@ -19,7 +19,7 @@ import threadpool
 
 # Task printing "hello world" on standard output
 class HWTask
-       super Task
+       super JoinTask
 
        # Sleeping time
        var sec: Int
index 42931d3..02042ba 100644 (file)
@@ -20,7 +20,7 @@ import concurrent_collections
 
 # A simple ThreadPool implemented with an array
 class ThreadPool
-       private var queue = new ConcurrentList[Task].wrap(new List[Task])
+       private var queue = new ConcurrentList[JoinTask].wrap(new List[JoinTask])
        private var mutex = new Mutex
        private var cond = new NativePthreadCond
 
@@ -37,7 +37,7 @@ class ThreadPool
        private fun set_nb_threads(nb: nullable Int) is autoinit do nb_threads = nb or else 5
 
        # Adds a Task into the queue
-       fun execute(task: Task) do
+       fun execute(task: JoinTask) do
                queue.push(task)
                cond.signal
        end
@@ -47,20 +47,49 @@ end
 private class PoolThread
        super Thread
 
-       var queue: ConcurrentList[Task]
+       var queue: ConcurrentList[JoinTask]
        var mutex: Mutex
        var cond : NativePthreadCond
 
        redef fun main do
                loop
-                       var t: nullable Task = null
+                       var t: nullable JoinTask = null
                        mutex.lock
                        if queue.is_empty then cond.wait(mutex.native.as(not null))
                        if not queue.is_empty then
-                               t = queue.pop
+                               t = queue.shift
                        end
                        mutex.unlock
-                       if t != null then t.main
+                       if t != null then
+                               t.main
+                               t.mutex.lock
+                               t.is_done = true
+                               var tcond = t.cond
+                               if tcond != null then tcond.signal
+                               t.mutex.unlock
+                       end
+               end
+       end
+end
+
+# A Task which is joinable, meaning it can return a value and if the value is not set yet, it blocks the execution
+class JoinTask
+       super Task
+
+       # Is `self` done ?
+       var is_done = false
+
+       private var mutex = new Mutex
+       private var cond: nullable NativePthreadCond = null
+
+       # Return immediatly if the task terminated, or block waiting for `self` to terminate
+       fun join do
+               mutex.lock
+               if not is_done then
+                       var cond = new NativePthreadCond
+                       self.cond = cond
+                       cond.wait(mutex.native.as(not null))
                end
+               mutex.unlock
        end
 end
diff --git a/tests/sav/jointask_example.res b/tests/sav/jointask_example.res
new file mode 100644 (file)
index 0000000..a0f7bca
--- /dev/null
@@ -0,0 +1,6 @@
+Small task id: 1
+Small task id: 2
+Small task id: 3
+Small task id: 4
+Small task id: 5
+First, long task id: 0