3c5ae98823a139ba74799cc26a988054ed20ec3f
[nit.git] / lib / pthreads / concurrent_collections.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # Introduces thread-safe concurrent collections
18 #
19 # This module offers new thread-safe collections. If you want to
20 # render basic collections thread-safe and don't mind the overhead cost,
21 # you can use `threads::redef_collections`.
22 #
23 # Concurrent collections:
24 #
25 # - [x] `ConcurrentArray`
26 # - [x] `ConcurrentList`
27 # - [ ] `ConcurrentHashMap`
28 # - [ ] `ConcurrentHashSet`
29 # - [ ] `ConcurrentRef`
30 # - [ ] `ConcurrentQueue`
31 #
32 # Introduced collections specialize their critical methods according to the
33 # current implementation in the standard library. If additional services
34 # are added to the underlying collections by refinement or evolution, they
35 # might need to be covered in the concurrent version.
36 module concurrent_collections
37
38 import pthreads
39
40 redef class Collection[E]
41 # Type of the concurrent variant of this collection
42 type CONCURRENT: ConcurrentCollection[E]
43
44 # Wraps `self` in a thread-safe collection
45 fun to_concurrent: CONCURRENT is abstract
46 end
47
48 redef class SequenceRead[E]
49 redef type CONCURRENT: ConcurrentSequenceRead[E]
50 end
51
52 redef class Sequence[E]
53 redef type CONCURRENT: ConcurrentSequence[E]
54 end
55
56 redef class Array[E]
57 redef type CONCURRENT: ConcurrentArray[E]
58
59 redef fun to_concurrent do return new ConcurrentArray[E].wrap(self)
60 end
61
62 redef class List[E]
63 redef type CONCURRENT: ConcurrentList[E]
64
65 redef fun to_concurrent do return new ConcurrentList[E].wrap(self)
66 end
67
68 # A concurrent variant to the standard `Collection`
69 abstract class ConcurrentCollection[E]
70 super Collection[E]
71
72 # Type of the equivalent non thread-safe collection
73 type REAL: Collection[E]
74
75 # Collection wrapped by `self`
76 var real_collection: REAL is noinit
77
78 # `Mutex` used to synchronize access to `self`
79 #
80 # It is used by the implementation on each protected methods. It can also
81 # be used externally to ensure that no other `Thread` modify this object.
82 var mutex = new Mutex
83
84 redef fun count(e)
85 do
86 mutex.lock
87 var r = real_collection.count(e)
88 mutex.unlock
89 return r
90 end
91
92 redef fun first
93 do
94 mutex.lock
95 var r = real_collection.first
96 mutex.unlock
97 return r
98 end
99
100 redef fun has(e)
101 do
102 mutex.lock
103 var r = real_collection.has(e)
104 mutex.unlock
105 return r
106 end
107
108 redef fun has_all(e)
109 do
110 mutex.lock
111 var r = real_collection.has_all(e)
112 mutex.unlock
113 return r
114 end
115
116 redef fun has_only(e)
117 do
118 mutex.lock
119 var r = real_collection.has_only(e)
120 mutex.unlock
121 return r
122 end
123
124 redef fun is_empty
125 do
126 mutex.lock
127 var r = real_collection.is_empty
128 mutex.unlock
129 return r
130 end
131
132 redef fun iterator
133 do
134 mutex.lock
135 var r = real_collection.iterator
136 mutex.unlock
137 return r
138 end
139
140 redef fun length
141 do
142 mutex.lock
143 var r = real_collection.length
144 mutex.unlock
145 return r
146 end
147
148 redef fun to_a
149 do
150 mutex.lock
151 var r = real_collection.to_a
152 mutex.unlock
153 return r
154 end
155
156 redef fun rand
157 do
158 mutex.lock
159 var r = real_collection.rand
160 mutex.unlock
161 return r
162 end
163
164 redef fun join(sep, last_sep)
165 do
166 mutex.lock
167 var r = real_collection.join(sep, last_sep)
168 mutex.unlock
169 return r
170 end
171
172 redef fun to_s
173 do
174 mutex.lock
175 var r = real_collection.to_s
176 mutex.unlock
177 return r
178 end
179 end
180
181 # A concurrent variant to the standard `SequenceRead`
182 abstract class ConcurrentSequenceRead[E]
183 super ConcurrentCollection[E]
184 super SequenceRead[E]
185
186 redef type REAL: SequenceRead[E]
187
188 redef fun ==(o)
189 do
190 mutex.lock
191 var r = real_collection == o
192 mutex.unlock
193 return r
194 end
195
196 redef fun [](index)
197 do
198 mutex.lock
199 var r = real_collection[index]
200 mutex.unlock
201 return r
202 end
203
204 redef fun hash
205 do
206 mutex.lock
207 var r = real_collection.hash
208 mutex.unlock
209 return r
210 end
211
212 redef fun index_of(index)
213 do
214 mutex.lock
215 var r = real_collection.index_of(index)
216 mutex.unlock
217 return r
218 end
219
220 redef fun index_of_from(index, from)
221 do
222 mutex.lock
223 var r = real_collection.index_of_from(index, from)
224 mutex.unlock
225 return r
226 end
227
228 redef fun iterator_from(index)
229 do
230 mutex.lock
231 var r = real_collection.iterator_from(index)
232 mutex.unlock
233 return r
234 end
235
236 redef fun last
237 do
238 mutex.lock
239 var r = real_collection.last
240 mutex.unlock
241 return r
242 end
243
244 redef fun last_index_of(e)
245 do
246 mutex.lock
247 var r = real_collection.last_index_of(e)
248 mutex.unlock
249 return r
250 end
251
252 redef fun last_index_of_from(e, from)
253 do
254 mutex.lock
255 var r = real_collection.last_index_of_from(e, from)
256 mutex.unlock
257 return r
258 end
259
260 redef fun reverse_iterator
261 do
262 mutex.lock
263 var r = real_collection.reverse_iterator
264 mutex.unlock
265 return r
266 end
267
268 redef fun reverse_iterator_from(from)
269 do
270 mutex.lock
271 var r = real_collection.reverse_iterator_from(from)
272 mutex.unlock
273 return r
274 end
275 end
276
277 # A concurrent variant to the standard `Sequence`
278 abstract class ConcurrentSequence[E]
279 super ConcurrentSequenceRead[E]
280 super Sequence[E]
281
282 redef type REAL: Sequence[E]
283
284 redef fun []=(index, e)
285 do
286 mutex.lock
287 real_collection[index] = e
288 mutex.unlock
289 end
290
291 redef fun add(e)
292 do
293 mutex.lock
294 real_collection.add e
295 mutex.unlock
296 end
297
298 redef fun append(e)
299 do
300 mutex.lock
301 real_collection.append e
302 mutex.unlock
303 end
304
305 redef fun first=(e)
306 do
307 mutex.lock
308 real_collection.first = e
309 mutex.unlock
310 end
311
312 redef fun insert(e, i)
313 do
314 mutex.lock
315 real_collection.insert(e, i)
316 mutex.unlock
317 end
318
319 redef fun insert_all(from, pos)
320 do
321 mutex.lock
322 real_collection
323 mutex.unlock
324 end
325
326 redef fun last=(e)
327 do
328 mutex.lock
329 real_collection.last = e
330 mutex.unlock
331 end
332
333 redef fun pop
334 do
335 mutex.lock
336 var r = real_collection.pop
337 mutex.unlock
338 return r
339 end
340
341 redef fun prepend(e)
342 do
343 mutex.lock
344 real_collection.prepend e
345 mutex.unlock
346 end
347
348 redef fun push(e)
349 do
350 mutex.lock
351 real_collection.push e
352 mutex.unlock
353 end
354
355 redef fun remove_at(index)
356 do
357 mutex.lock
358 real_collection.remove_at(index)
359 mutex.unlock
360 end
361
362 redef fun shift
363 do
364 mutex.lock
365 var r = real_collection.shift
366 mutex.unlock
367 return r
368 end
369
370 redef fun unshift(e)
371 do
372 mutex.lock
373 real_collection.unshift(e)
374 mutex.unlock
375 end
376
377 redef fun subarray(start, len)
378 do
379 mutex.lock
380 var r = real_collection.subarray(start, len)
381 mutex.unlock
382 return r
383 end
384 end
385
386 # A concurrent variant to the standard `Array`
387 class ConcurrentArray[E]
388 super ConcurrentSequence[E]
389 super Array[E]
390
391 redef type REAL: Array[E]
392
393 init wrap(real_collection: REAL) do self.real_collection = real_collection
394 init do self.real_collection = new Array[E]
395
396 redef fun clear
397 do
398 mutex.lock
399 real_collection.clear
400 mutex.unlock
401 end
402
403 redef fun enlarge(cap)
404 do
405 mutex.lock
406 real_collection.enlarge(cap)
407 mutex.unlock
408 end
409
410 redef fun remove_all(e)
411 do
412 mutex.lock
413 real_collection.remove_all(e)
414 mutex.unlock
415 end
416
417 redef fun swap_at(a, b)
418 do
419 mutex.lock
420 real_collection.swap_at(a, b)
421 mutex.unlock
422 end
423
424 #
425 ## The following method defs are conflict resolutions
426 #
427
428 redef fun add(e)
429 do
430 mutex.lock
431 real_collection.add e
432 mutex.unlock
433 end
434
435 redef fun length
436 do
437 mutex.lock
438 var r = real_collection.length
439 mutex.unlock
440 return r
441 end
442 end
443
444 # A concurrent variant to the standard `List`
445 class ConcurrentList[E]
446 super ConcurrentSequence[E]
447 super List[E]
448
449 redef type REAL: List[E]
450
451 init wrap(real_collection: REAL) do self.real_collection = real_collection
452 init do self.real_collection = new List[E]
453
454 redef fun link(l)
455 do
456 mutex.lock
457 real_collection.link(l)
458 mutex.unlock
459 end
460
461 redef fun slice(from, to)
462 do
463 mutex.lock
464 var r = real_collection.slice(from, to)
465 mutex.unlock
466 return r
467 end
468
469 #
470 ## The following method defs are conflict resolutions
471 #
472
473 redef fun pop
474 do
475 mutex.lock
476 var r = real_collection.pop
477 mutex.unlock
478 return r
479 end
480
481 redef fun is_empty
482 do
483 mutex.lock
484 var r = real_collection.is_empty
485 mutex.unlock
486 return r
487 end
488
489 redef fun unshift(e)
490 do
491 mutex.lock
492 real_collection.unshift(e)
493 mutex.unlock
494 end
495
496 redef fun push(e)
497 do
498 mutex.lock
499 real_collection.push(e)
500 mutex.unlock
501 end
502
503 redef fun shift
504 do
505 mutex.lock
506 var value = real_collection.shift
507 mutex.unlock
508 return value
509 end
510 end
511
512 # A collection which `is_empty` method blocks until it's empty
513 class ReverseBlockingQueue[E]
514 super ConcurrentList[E]
515
516 # Used to block or signal on waiting threads
517 private var cond = new PthreadCond
518
519 # Adding the signal to release eventual waiting thread(s)
520 redef fun push(e) do
521 mutex.lock
522 real_collection.push(e)
523 mutex.unlock
524 end
525
526 # When the Queue is empty, signal any possible waiting thread
527 redef fun remove(e) do
528 mutex.lock
529 real_collection.remove(e)
530 if real_collection.is_empty then cond.signal
531 mutex.unlock
532 end
533
534 # Wait until the Queue is empty
535 redef fun is_empty do
536 mutex.lock
537 while not real_collection.is_empty do self.cond.wait(mutex)
538 mutex.unlock
539 return true
540 end
541 end
542
543 # A Blocking queue implemented from a `ConcurrentList`
544 # `shift` is blocking if there isn't any element in `self`
545 # `push` or `unshift` releases every blocking threads
546 class BlockingQueue[E]
547 super ConcurrentList[E]
548
549 # Used to block or signal on waiting threads
550 private var cond = new PthreadCond
551
552 # Adding the signal to release eventual waiting thread(s)
553 redef fun push(e) do
554 mutex.lock
555 real_collection.push(e)
556 self.cond.signal
557 real_collection.push(e)
558 mutex.unlock
559 end
560
561 redef fun unshift(e) do
562 mutex.lock
563 real_collection.unshift(e)
564 self.cond.signal
565 mutex.unlock
566 end
567
568 # If empty, blocks until an item is inserted with `push` or `unshift`
569 redef fun shift do
570 mutex.lock
571 while real_collection.is_empty do self.cond.wait(mutex)
572 var r = real_collection.shift
573 mutex.unlock
574 return r
575 end
576
577 redef fun is_empty do
578 mutex.lock
579 var r = real_collection.is_empty
580 mutex.unlock
581 return r
582 end
583 end