41ea816d337cba8736bbb94eb8fcc530e2c179db
[nit.git] / lib / actors / actors.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # Abstraction of the actors concepts
16 module actors is
17 new_annotation actor
18 end
19
20 import pthreads::concurrent_collections
21 intrude import pthreads
22 intrude import pthreads::extra
23
24 # Abstraction of an actor
25 # It has a mailbox, can receive and process messages asynchronously
26 abstract class Actor
27 super Thread
28
29 # Type of the proxied class (or working class)
30 type V: Object
31
32 # The instance used to do the real work
33 # i.e. the real working object
34 var instance: V
35
36 # Mailbox used to receive and process messages
37 var mailbox = new BlockingQueue[Message].with_actor(self)
38
39 # Is `self` working ?
40 # i.e. does it have messages to process or is it processing one now ?
41 var working = false
42
43 redef fun main do
44 loop
45 var m = mailbox.shift
46 if m isa ShutDownMessage then
47 sys.active_actors.remove(self)
48 return null
49 end
50 m.invoke(instance)
51 if mailbox.is_empty then
52 working = false
53 sys.active_actors.remove(self)
54 end
55 end
56 end
57
58 # Ends `self`, cancel ongoing work abrutly
59 # Pretty dangerous to use
60 fun kill do
61 var n = self.native
62 if n != null then n.cancel
63 end
64 end
65
66 # A Message received by a Mailbox
67 # In fact, this is the reification of a call
68 # Each Message class represent a call to make on `instance` via `invoke`
69 abstract class Message
70
71 # Type of the class on which `self` make the call
72 type E: Object
73
74 # Redef this method so it calls the right one on `instance` (double dispatch)
75 fun invoke(instance: E) is abstract
76 end
77
78 # Abstraction of proxies for threaded actors
79 class Proxy
80
81 # Type of the actor `self` is proxiing
82 type E: Actor
83
84 # The proxied actor
85 var actor: E is noinit
86
87 # Kill `actor` without mercy
88 fun kill do actor.kill
89
90 # Tell `actor` to terminate properly
91 # Queueing a ShutDownMessage to the end of its mailbox
92 fun terminate do
93 var msg = new ShutDownMessage
94 actor.mailbox.push(msg)
95 end
96
97 # Tell `actor` to terminate now
98 # Queueing a ShutDownMessage before every other ones
99 fun terminate_now do
100 var msg = new ShutDownMessage
101 actor.mailbox.unshift(msg)
102 end
103
104 # Wait for `actor` to terminate
105 fun wait_termination do actor.join
106 end
107
108 # A Message to Rule them all... properly shutdown an Actor
109 # It's behaviour is implemented in the actor itself
110 class ShutDownMessage
111 super Message
112 end
113
114 # The promise of a value which will be set asynchronously
115 class Future[E]
116 # Value promised by `self`
117 var value: nullable E = null
118
119 # Mutex for synchronisation
120 protected var mutex = new Mutex
121
122 # Condition variable for synchronisation
123 protected var cond: nullable PthreadCond = null
124
125 # Can be used to check if the value is available without waiting
126 protected var is_done = false
127
128 # Set the value and signal so that, someone waiting for `value` can retrieve it
129 fun set_value(value: E) do
130 mutex.lock
131 is_done = true
132 self.value = value
133 var cond = self.cond
134 if cond != null then cond.signal
135 mutex.unlock
136 end
137
138 # Return immediatly if `value` is set, or block waiting for `value` to be set
139 fun join: E do
140 mutex.lock
141 if not is_done then
142 var cond = self.cond
143 if cond == null then
144 cond = new PthreadCond
145 self.cond = cond
146 end
147 cond.wait(mutex)
148 end
149 mutex.unlock
150 return value
151 end
152 end
153
154 # A Blocking queue implemented from a `ConcurrentList`
155 # `shift` is blocking if there isn't any element in `self`
156 # `push` or `unshift` releases every blocking threads
157 # Corresponds to the mailbox of an actor
158 class BlockingQueue[E]
159 super ConcurrentList[E]
160
161 # The associated actor
162 var actor: Actor is noautoinit
163
164 # Used to block or signal on waiting threads
165 private var cond = new PthreadCond
166
167 # init self with an associated actor
168 init with_actor(actor: Actor) do self.actor = actor
169
170 # Adding the signal to release eventual waiting thread(s)
171 redef fun push(e) do
172 mutex.lock
173 if real_collection.is_empty and not actor.working then
174 actor.working = true
175 sys.active_actors.push(actor)
176 end
177 real_collection.push(e)
178 self.cond.signal
179 mutex.unlock
180 end
181
182 redef fun unshift(e) do
183 mutex.lock
184 real_collection.unshift(e)
185 self.cond.signal
186 mutex.unlock
187 end
188
189 # If empty, blocks until an item is inserted with `push` or `unshift`
190 redef fun shift do
191 mutex.lock
192 while real_collection.is_empty do self.cond.wait(mutex)
193 var r = real_collection.shift
194 mutex.unlock
195 return r
196 end
197 end
198
199 # A collection which `is_empty` method blocks until it's empty
200 class ReverseBlockingQueue[E]
201 super ConcurrentList[E]
202
203 # Used to block or signal on waiting threads
204 private var cond = new PthreadCond
205
206 # Adding the signal to release eventual waiting thread(s)
207 redef fun push(e) do
208 mutex.lock
209 real_collection.push(e)
210 mutex.unlock
211 end
212
213 # When the Queue is empty, signal any
214 # possible waiting thread
215 redef fun remove(e) do
216 mutex.lock
217 real_collection.remove(e)
218 if real_collection.is_empty then cond.signal
219 mutex.unlock
220 end
221
222 # Wait until the Queue is empty
223 redef fun is_empty do
224 mutex.lock
225 while not real_collection.is_empty do self.cond.wait(mutex)
226 mutex.unlock
227 return true
228 end
229 end
230
231 redef class Sys
232
233 # List of running actors
234 var active_actors = new ReverseBlockingQueue[Actor] is lazy
235
236 redef fun run do
237 super
238 # The program won't end until every actor is done
239 active_actors.is_empty
240 end
241 end