Move `BlockingQueue` and `ReverseBlockingQueue` to `concurrent_collections`
authorBlackMinou <romain.chanoir@viacesi.fr>
Mon, 8 May 2017 02:03:39 +0000 (22:03 -0400)
committerBlackMinou <romain.chanoir@viacesi.fr>
Wed, 31 May 2017 02:39:27 +0000 (22:39 -0400)
Signed-off-by: BlackMinou <romain.chanoir@viacesi.fr>

lib/pthreads/concurrent_collections.nit

index e0c53af..3c5ae98 100644 (file)
@@ -508,3 +508,76 @@ class ConcurrentList[E]
                return value
        end
 end
+
+# A collection whose `is_empty` method blocks until the collection is empty
+class ReverseBlockingQueue[E]
+       super ConcurrentList[E]
+
+       # Condition variable on which `is_empty` waits and which `remove` signals
+       private var cond = new PthreadCond
+
+       # Add `e` while holding `mutex`; no signal is sent from here
+       redef fun push(e) do
+               mutex.lock
+               real_collection.push(e)
+               mutex.unlock
+       end
+
+       # Remove `e`; if the queue is empty afterwards, signal a possible waiting thread
+       redef fun remove(e) do
+               mutex.lock
+               real_collection.remove(e)
+               if real_collection.is_empty then cond.signal
+               mutex.unlock
+       end
+
+       # Block until the queue is empty, then return `true`
+       redef fun is_empty do
+               mutex.lock
+               while not real_collection.is_empty do self.cond.wait(mutex)
+               mutex.unlock
+               return true
+       end
+end
+
+# A blocking queue implemented from a `ConcurrentList`
+# `shift` blocks while there isn't any element in `self`
+# `push` and `unshift` signal the condition on which `shift` waits
+class BlockingQueue[E]
+       super ConcurrentList[E]
+
+       # Condition variable on which `shift` waits and which `push`/`unshift` signal
+       private var cond = new PthreadCond
+
+       # Append `e` exactly once, then signal a possible thread waiting in `shift`
+       redef fun push(e) do
+               mutex.lock
+               real_collection.push(e)
+               self.cond.signal
+               mutex.unlock
+       end
+
+       # Prepend `e`, then signal a possible thread waiting in `shift`
+       redef fun unshift(e) do
+               mutex.lock
+               real_collection.unshift(e)
+               self.cond.signal
+               mutex.unlock
+       end
+
+       # If empty, blocks until an item is inserted with `push` or `unshift`
+       redef fun shift do
+               mutex.lock
+               while real_collection.is_empty do self.cond.wait(mutex)
+               var r = real_collection.shift
+               mutex.unlock
+               return r
+       end
+
+       # Non-blocking check: report whether the queue is empty, read under `mutex`
+       redef fun is_empty do
+               mutex.lock
+               var r = real_collection.is_empty
+               mutex.unlock
+               return r
+       end
+end