1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # Abstraction of the actors concepts
20 intrude import pthreads
::concurrent_collections
21 intrude import pthreads
22 intrude import pthreads
::extra
24 # Abstraction of an actor
25 # It has a mailbox, can receive and process messages asynchronously
29 # Type of the proxied class (or working class)
32 # The instance used to do the real work
33 # i.e. the real working object
36 # Mailbox used to receive and process messages
37 var mailbox
= new Mailbox[Message].with_actor
(self)
40 # i.e. does it have messages to process, or is it processing one now?
46 if m
isa ShutDownMessage then
47 sys
.active_actors
.decrement
54 # Ends `self`, cancelling ongoing work abruptly
55 # Pretty dangerous to use
58 if n
!= null then n
.cancel
62 # A Blocking queue implemented from a `ConcurrentList`
63 # `shift` is blocking if there isn't any element in `self`
64 # `push` or `unshift` releases every blocked thread
65 # Corresponds to the mailbox of an actor
67 super BlockingQueue[E
]
69 # The associated actor
70 var actor
: Actor is noautoinit
72 # init self with an associated actor
73 init with_actor
(actor
: Actor) do
75 sys
.active_actors
.increment
78 # Adding the signal to release any waiting thread(s)
81 if real_collection
.is_empty
and not actor
.working
then
83 sys
.active_actors
.increment
84 real_collection
.push
(e
)
87 real_collection
.push
(e
)
92 redef fun unshift
(e
) do
94 real_collection
.unshift
(e
)
99 # If empty, blocks until an item is inserted with `push` or `unshift`
102 if real_collection
.is_empty
then
103 actor
.working
= false
104 sys
.active_actors
.decrement
105 while real_collection
.is_empty
do self.cond
.wait
(mutex
)
107 var r
= real_collection
.shift
112 redef fun is_empty
do
114 var r
= real_collection
.is_empty
120 # A Message received by a Mailbox
121 # In fact, this is the reification of a call
122 # Each Message class represents a call to make on `instance` via `invoke`
123 abstract class Message
125 # Type of the class on which `self` makes the call
128 # Redef this method so it calls the right one on `instance` (double dispatch)
# `instance` is the object the reified call is made on; it is typed by `E`.
# Declared `is abstract`: there is no body here, so each concrete message
# class has to supply its own implementation.
129 fun invoke
(instance
: E
) is abstract
132 # Abstraction of proxies for threaded actors
135 # Type of the actor `self` is proxying
139 var actor
: E
is noinit
141 # Kill `actor` without mercy
# Forwards directly to `actor.kill`; no message is queued in the mailbox.
# NOTE(review): `actor.kill` presumably cancels the actor's thread
# immediately, so pending messages would be lost — confirm against `Actor::kill`.
142 fun kill
do actor
.kill
144 # Tell `actor` to terminate properly
145 # Queueing a ShutDownMessage to the end of its mailbox
147 var msg
= new ShutDownMessage
148 actor
.mailbox
.push
(msg
)
151 # Tell `actor` to terminate now
152 # Queueing a ShutDownMessage before every other ones
154 var msg
= new ShutDownMessage
155 actor
.mailbox
.unshift
(msg
)
158 # Wait for `actor` to terminate
# Forwards directly to `actor.join`.
# NOTE(review): presumably blocks the caller until the actor's thread has
# finished — confirm against `join` in the `pthreads` module.
159 fun wait_termination
do actor
.join
162 # A Message to Rule them all... properly shutdown an Actor
163 # Its behaviour is implemented in the actor itself
164 class ShutDownMessage
168 # The promise of a value which will be set asynchronously
170 # Value promised by `self`
171 var value
: nullable E
= null
173 # Mutex for synchronisation
174 protected var mutex
= new Mutex
176 # Condition variable for synchronisation
177 protected var cond
: nullable PthreadCond = null
179 # Can be used to check if the value is available without waiting
180 protected var is_done
= false
182 # Set the value and signal, so that someone waiting for `value` can retrieve it
183 fun set_value
(value
: E
) do
188 if cond
!= null then cond
.signal
192 # Return immediately if `value` is set, or block waiting for `value` to be set
198 cond
= new PthreadCond
208 # A counter on which threads can wait until its value is 0
209 class SynchronizedCounter
211 # The starting value, always starts with 0
214 private var cond
= new PthreadCond
215 private var mutex
= new Mutex
217 # Increment the counter atomically
224 # Decrement the counter atomically,
225 # signals to waiting thread(s) if `c == 0`
235 # Block until `c == 0`
238 while c
!= 0 do cond
.wait
(mutex
)
245 # Number of active actors
246 var active_actors
= new SynchronizedCounter
250 # The program won't end until every actor is done