new_annotation actor
end
-import pthreads::concurrent_collections
+intrude import pthreads::concurrent_collections
intrude import pthreads
intrude import pthreads::extra
var instance: V
# Mailbox used to receive and process messages
- var mailbox = new BlockingQueue[Message].with_actor(self)
+ var mailbox = new Mailbox[Message].with_actor(self)
# Is `self` working ?
# i.e. does it have messages to process or is it processing one now ?
- var working = false
+ var working = true
redef fun main do
loop
var m = mailbox.shift
if m isa ShutDownMessage then
- sys.active_actors.remove(self)
+ sys.active_actors.decrement
return null
end
m.invoke(instance)
- if mailbox.is_empty then
- working = false
- sys.active_actors.remove(self)
- end
end
end
end
end
+# The mailbox of an actor: a `BlockingQueue` that also tracks actor activity
+#
+# `shift` blocks while there is no element in `self`; just before blocking it
+# marks the actor as not working and decrements `sys.active_actors`.
+# `push` and `unshift` do the reverse when they feed an idle actor: they mark
+# it as working, increment `sys.active_actors` and signal the waiting thread.
+class Mailbox[E]
+	super BlockingQueue[E]
+
+	# The associated actor
+	var actor: Actor is noautoinit
+
+	# Init `self` with an associated actor
+	#
+	# The actor is counted in `sys.active_actors` right away.
+	init with_actor(actor: Actor) do
+		self.actor = actor
+		sys.active_actors.increment
+	end
+
+	# Add `e` at the end of the mailbox
+	#
+	# If the actor was idle, it is marked as working, counted as active again
+	# and its waiting thread is signaled.
+	redef fun push(e) do
+		mutex.lock
+		if real_collection.is_empty and not actor.working then
+			actor.working = true
+			sys.active_actors.increment
+			real_collection.push(e)
+			self.cond.signal
+		else
+			# The actor is not blocked in `shift`: no signal is needed
+			real_collection.push(e)
+		end
+		mutex.unlock
+	end
+
+	# Add `e` at the beginning of the mailbox
+	#
+	# Performs the same activity accounting as `push`: without it, an idle
+	# actor woken by `unshift` would decrement `sys.active_actors` a second
+	# time on its next empty `shift`.
+	redef fun unshift(e) do
+		mutex.lock
+		if real_collection.is_empty and not actor.working then
+			actor.working = true
+			sys.active_actors.increment
+		end
+		real_collection.unshift(e)
+		self.cond.signal
+		mutex.unlock
+	end
+
+	# Remove and return the first element
+	#
+	# If empty, the actor is marked as not working, `sys.active_actors` is
+	# decremented, and the call blocks until an item is inserted with `push`
+	# or `unshift`.
+	redef fun shift do
+		mutex.lock
+		if real_collection.is_empty then
+			actor.working = false
+			sys.active_actors.decrement
+			# Loop to guard against wake-ups that find the mailbox still empty
+			while real_collection.is_empty do self.cond.wait(mutex)
+		end
+		var r = real_collection.shift
+		mutex.unlock
+		return r
+	end
+
+	# Is the mailbox empty? Checked while holding `mutex`; does not block on `cond`
+	redef fun is_empty do
+		mutex.lock
+		var r = real_collection.is_empty
+		mutex.unlock
+		return r
+	end
+end
+
+
# A Message received by a Mailbox
# In fact, this is the reification of a call
# Each Message class represent a call to make on `instance` via `invoke`
end
end
-# A Blocking queue implemented from a `ConcurrentList`
-# `shift` is blocking if there isn't any element in `self`
-# `push` or `unshift` releases every blocking threads
-# Corresponds to the mailbox of an actor
-class BlockingQueue[E]
- super ConcurrentList[E]
-
- # The associated actor
- var actor: Actor is noautoinit
-
- # Used to block or signal on waiting threads
- private var cond = new PthreadCond
-
- # init self with an associated actor
- init with_actor(actor: Actor) do self.actor = actor
-
- # Adding the signal to release eventual waiting thread(s)
- redef fun push(e) do
- mutex.lock
- if real_collection.is_empty and not actor.working then
- actor.working = true
- sys.active_actors.push(actor)
- end
- real_collection.push(e)
- self.cond.signal
- mutex.unlock
- end
-
- redef fun unshift(e) do
- mutex.lock
- real_collection.unshift(e)
- self.cond.signal
- mutex.unlock
- end
+# A counter on which threads can wait until its value is 0
+class SynchronizedCounter
- # If empty, blocks until an item is inserted with `push` or `unshift`
- redef fun shift do
- mutex.lock
- while real_collection.is_empty do self.cond.wait(mutex)
- var r = real_collection.shift
- mutex.unlock
- return r
- end
-end
+ # The current value of the counter, initially 0
+ private var c = 0
-# A collection which `is_empty` method blocks until it's empty
-class ReverseBlockingQueue[E]
- super ConcurrentList[E]
-
- # Used to block or signal on waiting threads
private var cond = new PthreadCond
+ private var mutex = new Mutex
- # Adding the signal to release eventual waiting thread(s)
- redef fun push(e) do
+ # Increment the counter atomically
+ fun increment do
mutex.lock
- real_collection.push(e)
+ c += 1
mutex.unlock
end
- # When the Queue is empty, signal any
- # possible waiting thread
- redef fun remove(e) do
+ # Decrement the counter atomically,
+ # signals to waiting thread(s) if `c == 0`
+ fun decrement do
mutex.lock
- real_collection.remove(e)
- if real_collection.is_empty then cond.signal
+ c -= 1
+ if c == 0 then
+ cond.signal
+ end
mutex.unlock
end
- # Wait until the Queue is empty
- redef fun is_empty do
+ # Block until `c == 0`
+ fun wait do
mutex.lock
- while not real_collection.is_empty do self.cond.wait(mutex)
+ while c != 0 do cond.wait(mutex)
mutex.unlock
- return true
end
end
redef class Sys
- # List of running actors
- var active_actors = new ReverseBlockingQueue[Actor] is lazy
+ # Number of active actors
+ var active_actors = new SynchronizedCounter
redef fun run do
super
# The program won't end until every actor is done
- active_actors.is_empty
+ active_actors.wait
end
end