Fix synchronization problem to better track active actors
author BlackMinou <romain.chanoir@viacesi.fr>
Mon, 8 May 2017 02:06:54 +0000 (22:06 -0400)
committer BlackMinou <romain.chanoir@viacesi.fr>
Wed, 31 May 2017 02:39:27 +0000 (22:39 -0400)
Signed-off-by: BlackMinou <romain.chanoir@viacesi.fr>

lib/actors/actors.nit

index 41ea816..d5e350f 100644 (file)
@@ -17,7 +17,7 @@ module actors is
        new_annotation actor
 end
 
-import pthreads::concurrent_collections
+intrude import pthreads::concurrent_collections
 intrude import pthreads
 intrude import pthreads::extra
 
@@ -34,24 +34,20 @@ abstract class Actor
        var instance: V
 
        # Mailbox used to receive and process messages
-       var mailbox = new BlockingQueue[Message].with_actor(self)
+       var mailbox = new Mailbox[Message].with_actor(self)
 
        # Is `self` working ?
        # i.e. does it have messages to process or is it processing one now ?
-       var working = false
+       var working = true
 
        redef fun main do
                loop
                        var m = mailbox.shift
                        if m isa ShutDownMessage then
-                               sys.active_actors.remove(self)
+                               sys.active_actors.decrement
                                return null
                        end
                        m.invoke(instance)
-                       if mailbox.is_empty then
-                               working = false
-                               sys.active_actors.remove(self)
-                       end
                end
        end
 
@@ -63,6 +59,64 @@ abstract class Actor
        end
 end
 
+# A blocking queue used as the mailbox of an actor
+# `shift` blocks while there isn't any element in `self`
+# `push` (when the actor was idle) and `unshift` signal `cond`, on which `shift` waits
+# `push` and `shift` also update `actor.working` and `sys.active_actors`
+class Mailbox[E]
+       super BlockingQueue[E]
+
+       # The associated actor
+       var actor: Actor is noautoinit
+
+       # Init `self` with its associated `actor` and increment `sys.active_actors`
+       init with_actor(actor: Actor) do
+               self.actor = actor
+               sys.active_actors.increment
+       end
+
+       # Push `e`; if the queue was empty and the actor idle, mark it working, increment `sys.active_actors` and signal `cond`
+       redef fun push(e) do
+               mutex.lock
+               if real_collection.is_empty and not actor.working then
+                       actor.working = true
+                       sys.active_actors.increment
+                       real_collection.push(e)
+                       self.cond.signal
+               else
+                       real_collection.push(e)
+               end
+               mutex.unlock
+       end
+
+       redef fun unshift(e) do # NOTE(review): unlike `push`, does not update `actor.working` nor `sys.active_actors` - confirm this is intended
+               mutex.lock
+               real_collection.unshift(e)
+               self.cond.signal
+               mutex.unlock
+       end
+
+       # If empty, mark the actor idle, decrement `sys.active_actors`, then block until an item is inserted
+       redef fun shift do
+               mutex.lock
+               if real_collection.is_empty then
+                       actor.working = false
+                       sys.active_actors.decrement
+                       while real_collection.is_empty do self.cond.wait(mutex)
+               end
+               var r = real_collection.shift
+               mutex.unlock
+               return r
+       end
+
+       redef fun is_empty do # Checks emptiness of `real_collection` while holding `mutex`
+               mutex.lock
+               var r = real_collection.is_empty
+               mutex.unlock
+               return r
+       end
+end
+
 # A Message received by a Mailbox
 # In fact, this is the reification of a call
 # Each Message class represent a call to make on `instance` via `invoke`
@@ -151,91 +205,49 @@ class Future[E]
        end
 end
 
-# A Blocking queue implemented from a `ConcurrentList`
-# `shift` is blocking if there isn't any element in `self`
-# `push` or `unshift` releases every blocking threads
-# Corresponds to the mailbox of an actor
-class BlockingQueue[E]
-       super ConcurrentList[E]
-
-       # The associated actor
-       var actor: Actor is noautoinit
-
-       # Used to block or signal on waiting threads
-       private var cond = new PthreadCond
-
-       # init self with an associated actor
-       init with_actor(actor: Actor) do  self.actor = actor
-
-       # Adding the signal to release eventual waiting thread(s)
-       redef fun push(e) do
-               mutex.lock
-               if real_collection.is_empty and not actor.working then
-                       actor.working = true
-                       sys.active_actors.push(actor)
-               end
-               real_collection.push(e)
-               self.cond.signal
-               mutex.unlock
-       end
-
-       redef fun unshift(e) do
-               mutex.lock
-               real_collection.unshift(e)
-               self.cond.signal
-               mutex.unlock
-       end
+# A counter on which threads can wait until its value is 0
+class SynchronizedCounter
 
-       # If empty, blocks until an item is inserted with `push` or `unshift`
-       redef fun shift do
-               mutex.lock
-               while real_collection.is_empty do self.cond.wait(mutex)
-               var r = real_collection.shift
-               mutex.unlock
-               return r
-       end
-end
+       # The current value of the counter, starts at 0
+       private var c = 0
 
-# A collection which `is_empty` method blocks until it's empty
-class ReverseBlockingQueue[E]
-       super ConcurrentList[E]
-
-       # Used to block or signal on waiting threads
        private var cond = new PthreadCond
+       private var mutex = new Mutex
 
-       # Adding the signal to release eventual waiting thread(s)
-       redef fun push(e) do
+       # Increment the counter atomically
+       fun increment do
                mutex.lock
-               real_collection.push(e)
+               c += 1
                mutex.unlock
        end
 
-       # When the Queue is empty, signal any
-       # possible waiting thread
-       redef fun remove(e) do
+       # Decrement the counter atomically,
+       # signals to waiting thread(s) if `c == 0`
+       fun decrement do
                mutex.lock
-               real_collection.remove(e)
-               if real_collection.is_empty then cond.signal
+               c -= 1
+               if c == 0 then
+                       cond.signal
+               end
                mutex.unlock
        end
 
-       # Wait until the Queue is empty
-       redef fun is_empty do
+       # Block until `c == 0`
+       fun wait do
                mutex.lock
-               while not real_collection.is_empty do self.cond.wait(mutex)
+               while c != 0 do cond.wait(mutex)
                mutex.unlock
-               return true
        end
 end
 
 redef class Sys
 
-       # List of running actors
-       var active_actors = new ReverseBlockingQueue[Actor] is lazy
+       # Number of active actors
+       var active_actors = new SynchronizedCounter
 
        redef fun run do
                super
                # The program won't end until every actor is done
-               active_actors.is_empty
+               active_actors.wait
        end
 end