41ea816d337cba8736bbb94eb8fcc530e2c179db
1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # Abstraction of the actors concepts
20 import pthreads
::concurrent_collections
21 intrude import pthreads
22 intrude import pthreads
::extra
24 # Abstraction of an actor
25 # It has a mailbox, can receive and process messages asynchronously
29 # Type of the proxied class (or working class)
32 # The instance used to do the real work
33 # i.e. the real working object
36 # Mailbox used to receive and process messages
# Initialized with a `BlockingQueue[Message]` built through `with_actor(self)`,
# so the queue holds a reference to the actor that owns it.
37 var mailbox
= new BlockingQueue[Message].with_actor
(self)
40 # i.e. does it have messages to process, or is it processing one now?
46 if m
isa ShutDownMessage then
47 sys
.active_actors
.remove
(self)
51 if mailbox
.is_empty
then
53 sys
.active_actors
.remove
(self)
58 # Ends `self`, cancelling ongoing work abruptly
59 # Pretty dangerous to use
62 if n
!= null then n
.cancel
66 # A Message received by a Mailbox
67 # In fact, this is the reification of a call
68 # Each Message class represents a call to make on `instance` via `invoke`
69 abstract class Message
71 # Type of the class on which `self` makes the call
74 # Redefine this method so it calls the right one on `instance` (double dispatch)
# Declared abstract: it has no body here, so each concrete message class
# supplies the actual call.
# `instance` is the object of type `E` on which the reified call is made.
75 fun invoke
(instance
: E
) is abstract
78 # Abstraction of proxies for threaded actors
81 # Type of the actor `self` is proxying
85 var actor
: E
is noinit
87 # Kill `actor` without mercy
# Only forwards to `actor.kill`; nothing is queued in the mailbox here.
88 fun kill
do actor
.kill
90 # Tell `actor` to terminate properly
91 # Queueing a ShutDownMessage to the end of its mailbox
93 var msg
= new ShutDownMessage
94 actor
.mailbox
.push
(msg
)
97 # Tell `actor` to terminate now
98 # Queueing a ShutDownMessage before all the other messages
100 var msg
= new ShutDownMessage
101 actor
.mailbox
.unshift
(msg
)
104 # Wait for `actor` to terminate
# Only forwards to `actor.join`.
# NOTE(review): presumably blocks the caller until the actor's thread has
# finished; confirm against the definition of `join`.
105 fun wait_termination
do actor
.join
108 # A Message to Rule them all... properly shutdown an Actor
109 # Its behaviour is implemented in the actor itself
110 class ShutDownMessage
114 # The promise of a value which will be set asynchronously
116 # Value promised by `self`
117 var value
: nullable E
= null
119 # Mutex for synchronisation
120 protected var mutex
= new Mutex
122 # Condition variable for synchronisation
123 protected var cond
: nullable PthreadCond = null
125 # Can be used to check if the value is available without waiting
126 protected var is_done
= false
128 # Set the value and signal, so that someone waiting for `value` can retrieve it
129 fun set_value
(value
: E
) do
134 if cond
!= null then cond
.signal
138 # Return immediately if `value` is set, or block waiting for `value` to be set
144 cond
= new PthreadCond
154 # A Blocking queue implemented from a `ConcurrentList`
155 # `shift` is blocking if there isn't any element in `self`
156 # `push` or `unshift` releases every blocked thread
157 # Corresponds to the mailbox of an actor
158 class BlockingQueue[E
]
159 super ConcurrentList[E
]
161 # The associated actor
162 var actor
: Actor is noautoinit
164 # Used to block or signal on waiting threads
165 private var cond
= new PthreadCond
167 # Named constructor: initialize `self` with its associated `actor`
# The `actor` attribute is declared `noautoinit`, so it is assigned
# explicitly here.
168 init with_actor
(actor
: Actor) do self.actor
= actor
170 # Adds the signal to release any waiting thread(s)
173 if real_collection
.is_empty
and not actor
.working
then
175 sys
.active_actors
.push
(actor
)
177 real_collection
.push
(e
)
182 redef fun unshift
(e
) do
184 real_collection
.unshift
(e
)
189 # If empty, blocks until an item is inserted with `push` or `unshift`
192 while real_collection
.is_empty
do self.cond
.wait
(mutex
)
193 var r
= real_collection
.shift
199 # A collection whose `is_empty` method blocks until it is empty
200 class ReverseBlockingQueue[E
]
201 super ConcurrentList[E
]
203 # Used to block or signal on waiting threads
204 private var cond
= new PthreadCond
206 # Adds the signal to release any waiting thread(s)
209 real_collection
.push
(e
)
213 # When the Queue is empty, signal any
214 # possible waiting thread
215 redef fun remove
(e
) do
217 real_collection
.remove
(e
)
218 if real_collection
.is_empty
then cond
.signal
222 # Wait until the Queue is empty
223 redef fun is_empty
do
225 while not real_collection
.is_empty
do self.cond
.wait
(mutex
)
233 # List of running actors
234 var active_actors
= new ReverseBlockingQueue[Actor] is lazy
238 # The program won't end until every actor is done: `is_empty` waits until `active_actors` is empty
239 active_actors
.is_empty