examples: annotate examples
[nit.git] / lib / actors / actors.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # Abstraction of the actors concepts
16 module actors is
17 new_annotation actor
18 end
19
20 intrude import pthreads::concurrent_collections
21 intrude import pthreads
22 intrude import pthreads::extra
23
# An actor: a thread owning a mailbox whose messages it handles one by one
# Messages are received asynchronously and invoked, in order, on `instance`
abstract class Actor
	super Thread

	# Type of the object doing the real work (the proxied class)
	type V: Object

	# The working object on which every received message is invoked
	var instance: V

	# Queue of the messages waiting to be processed by `self`
	var mailbox = new Mailbox[Message].with_actor(self)

	# Is `self` busy ?
	# i.e. is a message waiting in `mailbox` or being processed right now ?
	var working = true

	# Process the messages of `mailbox` until a `ShutDownMessage` is received
	redef fun main do
		loop
			var msg = mailbox.shift
			if not msg isa ShutDownMessage then
				msg.invoke(instance)
				continue
			end
			# Shutdown requested: `self` does not count as an active actor anymore
			sys.active_actors.decrement
			return null
		end
	end

	# End `self` by cancelling its native thread, if there is one
	# Ongoing work is cancelled abruptly, so this is pretty dangerous to use
	fun kill do
		var handle = native
		if handle == null then return
		handle.cancel
	end
end
61
# A blocking queue used as the mailbox of an actor
# `shift` blocks while there isn't any element in `self`
# `push` and `unshift` signal the condition variable to release a waiting thread
# `self` also maintains `actor.working` and `sys.active_actors`
class Mailbox[E]
	super BlockingQueue[E]

	# The associated actor
	var actor: Actor is noautoinit

	# Init `self` with its associated `actor` and count one more active actor
	init with_actor(actor: Actor) do
		self.actor = actor
		sys.active_actors.increment
	end

	# Add `e` at the end of `self`
	# If `actor` was idle, flag it as working, count it as active again
	# and signal the thread waiting in `shift`
	redef fun push(e) do
		mutex.lock
		if real_collection.is_empty and not actor.working then
			actor.working = true
			sys.active_actors.increment
			real_collection.push(e)
			self.cond.signal
		else
			real_collection.push(e)
		end
		mutex.unlock
	end

	# Add `e` at the front of `self` and signal the thread waiting in `shift`
	redef fun unshift(e) do
		mutex.lock
		# Same bookkeeping as `push`: `shift` flagged the idle actor as not
		# working and decremented `sys.active_actors`, so it has to be undone
		# here, otherwise the counter is decremented twice for one idle period
		if real_collection.is_empty and not actor.working then
			actor.working = true
			sys.active_actors.increment
		end
		real_collection.unshift(e)
		self.cond.signal
		mutex.unlock
	end

	# Remove and return the first item
	# If empty, flags `actor` as idle, decrements `sys.active_actors`
	# and blocks until an item is inserted with `push` or `unshift`
	redef fun shift do
		mutex.lock
		if real_collection.is_empty then
			actor.working = false
			sys.active_actors.decrement
			# Loop so the emptiness is checked again after each wake-up
			while real_collection.is_empty do self.cond.wait(mutex)
		end
		var r = real_collection.shift
		mutex.unlock
		return r
	end

	# Is `self` empty ? (checked with `mutex` locked)
	redef fun is_empty do
		mutex.lock
		var r = real_collection.is_empty
		mutex.unlock
		return r
	end
end
119
# A message that can be received by a `Mailbox`
# It is the reification of a call: each concrete subclass stands for
# one call to perform on the receiving `instance`, through `invoke`
abstract class Message

	# Type of the object on which `self` makes its call
	type E: Object

	# Perform the call on `instance`
	# Redefine it in subclasses to call the right method (double dispatch)
	fun invoke(instance: E) is abstract
end
131
# Base class of the proxies of threaded actors
class Proxy

	# Type of the actor that `self` is proxying
	type E: Actor

	# The actor behind `self`
	var actor: E is noinit

	# Kill `actor` without mercy
	fun kill do
		actor.kill
	end

	# Ask `actor` to terminate properly
	# A `ShutDownMessage` is queued at the end of its mailbox,
	# i.e. after the messages already waiting
	fun terminate do actor.mailbox.push(new ShutDownMessage)

	# Ask `actor` to terminate now
	# A `ShutDownMessage` is queued in front of every other waiting message
	fun terminate_now do actor.mailbox.unshift(new ShutDownMessage)

	# Wait for `actor` to terminate
	fun wait_termination do
		actor.join
	end
end
161
# The message asking an actor to shut down properly
# Its behaviour is implemented in the actor itself (see `Actor::main`)
class ShutDownMessage
	super Message
end
167
# The promise of a value which will be set asynchronously
class Future[E]
	# Value promised by `self`, `null` until `set_value` is called
	var value: nullable E = null

	# Mutex for synchronisation
	protected var mutex = new Mutex

	# Condition variable for synchronisation
	# Created lazily by `join`, so it stays `null` while nobody has waited
	protected var cond: nullable PthreadCond = null

	# Can be used to check if the value is available without waiting
	protected var is_done = false

	# Set the value and signal, so that someone waiting for `value` can retrieve it
	# NOTE(review): `signal` is used, so presumably a single waiter is expected
	# per future; confirm if several threads can `join` the same one
	fun set_value(value: E) do
		mutex.lock
		is_done = true
		self.value = value
		var cond = self.cond
		# No condition variable means that nobody is waiting in `join`
		if cond != null then cond.signal
		mutex.unlock
	end

	# Return immediately if `value` is set, or block waiting for `value` to be set
	fun join: E do
		mutex.lock
		if not is_done then
			var cond = self.cond
			if cond == null then
				cond = new PthreadCond
				self.cond = cond
			end
			# Check `is_done` again after each wake-up: a condition variable
			# may wake its waiter spuriously, before `set_value` was called
			while not is_done do cond.wait(mutex)
		end
		mutex.unlock
		return value
	end
end
207
# A thread-safe counter on which threads can block until it is back to 0
class SynchronizedCounter

	# Current value of the counter, always starts with 0
	private var c = 0

	# Condition signaled by `decrement` when `c` reaches 0
	private var cond = new PthreadCond

	# Lock protecting `c`
	private var mutex = new Mutex

	# Add 1 to the counter atomically
	fun increment do
		mutex.lock
		c = c + 1
		mutex.unlock
	end

	# Subtract 1 from the counter atomically,
	# and signal the waiting thread(s) if `c == 0`
	fun decrement do
		mutex.lock
		c = c - 1
		if c == 0 then cond.signal
		mutex.unlock
	end

	# Block the calling thread until `c == 0`
	fun wait do
		mutex.lock
		loop
			if c == 0 then break
			cond.wait(mutex)
		end
		mutex.unlock
	end
end
242
redef class Sys

	# Counter of the actors that are currently active
	var active_actors = new SynchronizedCounter

	# Run the program, then block until every actor is done
	redef fun run do
		super
		# The program won't end while `active_actors` is not back to 0
		self.active_actors.wait
	end
end