X-Git-Url: http://nitlanguage.org
diff --git a/lib/pthreads/concurrent_collections.nit b/lib/pthreads/concurrent_collections.nit
index 67e6765..3c5ae98 100644
--- a/lib/pthreads/concurrent_collections.nit
+++ b/lib/pthreads/concurrent_collections.nit
@@ -161,10 +161,10 @@ abstract class ConcurrentCollection[E]
 		return r
 	end
 
-	redef fun join(sep)
+	redef fun join(sep, last_sep)
 	do
 		mutex.lock
-		var r = real_collection.join(sep)
+		var r = real_collection.join(sep, last_sep)
 		mutex.unlock
 		return r
 	end
@@ -499,4 +499,95 @@ class ConcurrentList[E]
 		real_collection.push(e)
 		mutex.unlock
 	end
+
+	# Remove and return the first element, holding `mutex` during the operation
+	redef fun shift
+	do
+		mutex.lock
+		var value = real_collection.shift
+		mutex.unlock
+		return value
+	end
+end
+
+# A collection whose `is_empty` method blocks until the collection is empty
+#
+# NOTE(review): only `remove` signals `cond`; emptying the collection through
+# another method (such as the inherited `shift`) does not wake waiting threads.
+class ReverseBlockingQueue[E]
+	super ConcurrentList[E]
+
+	# Used to block or signal on waiting threads
+	private var cond = new PthreadCond
+
+	# Add `e` at the end of the queue
+	#
+	# No signal is sent: waiting threads are only released when the queue becomes empty.
+	redef fun push(e) do
+		mutex.lock
+		real_collection.push(e)
+		mutex.unlock
+	end
+
+	# Remove `e`, then signal a possible waiting thread if the queue is now empty
+	redef fun remove(e) do
+		mutex.lock
+		real_collection.remove(e)
+		if real_collection.is_empty then cond.signal
+		mutex.unlock
+	end
+
+	# Block until the queue is empty, then return `true`
+	redef fun is_empty do
+		mutex.lock
+		# Loop on the predicate: re-check emptiness each time `wait` returns
+		while not real_collection.is_empty do self.cond.wait(mutex)
+		mutex.unlock
+		return true
+	end
+end
+
+# A blocking queue implemented from a `ConcurrentList`
+#
+# `shift` blocks while there is no element in `self`.
+# `push` and `unshift` signal `cond` so that a thread blocked in `shift` can resume.
+class BlockingQueue[E]
+	super ConcurrentList[E]
+
+	# Used to block or signal on waiting threads
+	private var cond = new PthreadCond
+
+	# Add `e` at the end of the queue (exactly once), then signal a possible waiting thread
+	redef fun push(e) do
+		mutex.lock
+		real_collection.push(e)
+		self.cond.signal
+		mutex.unlock
+	end
+
+	# Add `e` at the beginning of the queue, then signal a possible waiting thread
+	redef fun unshift(e) do
+		mutex.lock
+		real_collection.unshift(e)
+		self.cond.signal
+		mutex.unlock
+	end
+
+	# If empty, blocks until an item is inserted with `push` or `unshift`
+	redef fun shift do
+		mutex.lock
+		# Loop on the predicate: re-check emptiness each time `wait` returns
+		while real_collection.is_empty do self.cond.wait(mutex)
+		var r = real_collection.shift
+		mutex.unlock
+		return r
+	end
+
+	# Non-blocking emptiness test, performed while holding `mutex`
+	redef fun is_empty do
+		mutex.lock
+		var r = real_collection.is_empty
+		mutex.unlock
+		return r
+	end
 end