From: BlackMinou
Date: Mon, 8 May 2017 02:03:39 +0000 (-0400)
Subject: Move `BlockingQueue` and `ReverseBlockingQueue` to `concurrent_collection`
X-Git-Url: http://nitlanguage.org

Move `BlockingQueue` and `ReverseBlockingQueue` to `concurrent_collection`

Signed-off-by: BlackMinou
---

diff --git a/lib/pthreads/concurrent_collections.nit b/lib/pthreads/concurrent_collections.nit
index e0c53af..3c5ae98 100644
--- a/lib/pthreads/concurrent_collections.nit
+++ b/lib/pthreads/concurrent_collections.nit
@@ -508,3 +508,84 @@ class ConcurrentList[E]
 		return value
 	end
 end
+
+# A collection whose `is_empty` method blocks until the collection is empty
+class ReverseBlockingQueue[E]
+	super ConcurrentList[E]
+
+	# Condition on which `is_empty` waits; signaled by `remove`
+	private var cond = new PthreadCond
+
+	# Add `e` while holding `mutex`
+	#
+	# No signal is sent here: callers of `is_empty` wait for the queue to
+	# become empty, so an insertion can never release them.
+	redef fun push(e) do
+		mutex.lock
+		real_collection.push(e)
+		mutex.unlock
+	end
+
+	# Remove `e` and, when the queue becomes empty, signal `cond`
+	redef fun remove(e) do
+		mutex.lock
+		real_collection.remove(e)
+		if real_collection.is_empty then cond.signal
+		mutex.unlock
+	end
+
+	# Block until the queue is empty, then return `true`
+	#
+	# The emptiness test is looped, so a wake-up that finds the queue
+	# non-empty goes back to waiting.
+	redef fun is_empty do
+		mutex.lock
+		while not real_collection.is_empty do self.cond.wait(mutex)
+		mutex.unlock
+		return true
+	end
+end
+
+# A blocking queue implemented from a `ConcurrentList`
+#
+# `shift` blocks while there is no element in `self`.
+# `push` and `unshift` signal the condition on which `shift` waits.
+class BlockingQueue[E]
+	super ConcurrentList[E]
+
+	# Condition on which `shift` waits; signaled by `push` and `unshift`
+	private var cond = new PthreadCond
+
+	# Append `e` exactly once, then signal `cond` for a blocked `shift`
+	redef fun push(e) do
+		mutex.lock
+		real_collection.push(e)
+		self.cond.signal
+		mutex.unlock
+	end
+
+	# Prepend `e`, then signal `cond` for a blocked `shift`
+	redef fun unshift(e) do
+		mutex.lock
+		real_collection.unshift(e)
+		self.cond.signal
+		mutex.unlock
+	end
+
+	# If empty, block until an item is inserted with `push` or `unshift`
+	redef fun shift do
+		mutex.lock
+		while real_collection.is_empty do self.cond.wait(mutex)
+		var r = real_collection.shift
+		mutex.unlock
+		return r
+	end
+
+	# Non-blocking emptiness test, read while holding `mutex`
+	redef fun is_empty do
+		mutex.lock
+		var r = real_collection.is_empty
+		mutex.unlock
+		return r
+	end
+end