simple threadpool implementation
authorBlackMinou <romain.chanoir@viacesi.fr>
Tue, 24 Nov 2015 16:01:15 +0000 (11:01 -0500)
committerBlackMinou <romain.chanoir@viacesi.fr>
Sat, 28 Nov 2015 16:16:58 +0000 (11:16 -0500)
Signed-off-by: BlackMinou <romain.chanoir@viacesi.fr>

lib/pthreads/concurrent_collections.nit
lib/pthreads/examples/threadpool_example.nit [new file with mode: 0644]
lib/pthreads/threadpool.nit [new file with mode: 0644]

index 6df0ea5..67e6765 100644 (file)
@@ -492,4 +492,11 @@ class ConcurrentList[E]
                real_collection.unshift(e)
                mutex.unlock
        end
+
+       redef fun push(e)
+       do
+               mutex.lock
+               real_collection.push(e)
+               mutex.unlock
+       end
 end
diff --git a/lib/pthreads/examples/threadpool_example.nit b/lib/pthreads/examples/threadpool_example.nit
new file mode 100644 (file)
index 0000000..f7fafd7
--- /dev/null
@@ -0,0 +1,42 @@
+# This file is part of NIT (http://www.nitlanguage.org).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Simple example using threadpool
+module threadpool_example
+
+import threadpool
+
+# Task printing "hello world" on standard output
+class HWTask
+       super Task
+
+       # Sleeping time
+       var sec: Int
+
+       # id
+       var id: Int
+       redef fun main do
+               print "Hello from {id}"
+               nanosleep(sec, 0)
+               print "World from {id}"
+       end
+end
+
+var tp = new ThreadPool
+for i in 100.times do
+       var t = new HWTask(2.rand, i)
+       tp.execute(t)
+end
+
+nanosleep(20,10)
diff --git a/lib/pthreads/threadpool.nit b/lib/pthreads/threadpool.nit
new file mode 100644 (file)
index 0000000..42931d3
--- /dev/null
@@ -0,0 +1,66 @@
+# This file is part of NIT (http://www.nitlanguage.org).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Introduces a minimal ThreadPool implementation using Tasks
+module threadpool
+
+intrude import pthreads
+import concurrent_collections
+
+# A simple ThreadPool implemented with an array
+class ThreadPool
+       private var queue = new ConcurrentList[Task].wrap(new List[Task])
+       private var mutex = new Mutex
+       private var cond = new NativePthreadCond
+
+       # Number of threads used
+       var nb_threads: Int is noinit
+
+       init do
+               for i in [0..nb_threads[ do
+                       var t = new PoolThread(queue, mutex, cond)
+                       t.start
+               end
+       end
+
+       private fun set_nb_threads(nb: nullable Int) is autoinit do nb_threads = nb or else 5
+
+       # Adds a Task into the queue
+       fun execute(task: Task) do
+               queue.push(task)
+               cond.signal
+       end
+end
+
+# A Thread running in a threadpool
+private class PoolThread
+       super Thread
+
+       var queue: ConcurrentList[Task]
+       var mutex: Mutex
+       var cond : NativePthreadCond
+
+       redef fun main do
+               loop
+                       var t: nullable Task = null
+                       mutex.lock
+                       if queue.is_empty then cond.wait(mutex.native.as(not null))
+                       if not queue.is_empty then
+                               t = queue.pop
+                       end
+                       mutex.unlock
+                       if t != null then t.main
+               end
+       end
+end