concurrent_collections: Adding implementation of has method
[nit.git] / lib / pthreads / concurrent_collections.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # Introduces thread-safe concurrent collections
18 #
19 # This module offers new thread-safe collections. If you want to
20 # render basic collections thread-safe and don't mind the overhead cost,
21 # you can use `threads::redef_collections`.
22 #
23 # Concurrent collections:
24 #
25 # - [x] `ConcurrentArray`
26 # - [x] `ConcurrentList`
27 # - [ ] `ConcurrentHashMap`
28 # - [ ] `ConcurrentHashSet`
29 # - [ ] `ConcurrentRef`
30 # - [ ] `ConcurrentQueue`
31 #
32 # Introduced collections specialize their critical methods according to the
33 # current implementation in the standard library. If additional services
34 # are added to the underlying collections by refinement or evolution, they
35 # might need to be covered in the concurrent version.
36 module concurrent_collections
37
38 import pthreads
39
redef class Collection[E]
	# Thread-safe counterpart type of this collection
	type CONCURRENT: ConcurrentCollection[E]

	# Returns a thread-safe collection wrapping `self`
	#
	# Abstract at this level; `Array` and `List` refine it in this module.
	fun to_concurrent: CONCURRENT is abstract
end
47
redef class SequenceRead[E]
	# The concurrent variant of a `SequenceRead` is a `ConcurrentSequenceRead`
	redef type CONCURRENT: ConcurrentSequenceRead[E]
end
51
redef class Sequence[E]
	# The concurrent variant of a `Sequence` is a `ConcurrentSequence`
	redef type CONCURRENT: ConcurrentSequence[E]
end
55
redef class Array[E]
	# The concurrent variant of an `Array` is a `ConcurrentArray`
	redef type CONCURRENT: ConcurrentArray[E]

	# Wraps `self` (no copy is made here) in a new `ConcurrentArray`
	redef fun to_concurrent
	do
		return new ConcurrentArray[E].wrap(self)
	end
end
61
redef class List[E]
	# The concurrent variant of a `List` is a `ConcurrentList`
	redef type CONCURRENT: ConcurrentList[E]

	# Wraps `self` (no copy is made here) in a new `ConcurrentList`
	redef fun to_concurrent
	do
		return new ConcurrentList[E].wrap(self)
	end
end
67
# Thread-safe counterpart of the standard `Collection`
#
# Every service redefined here delegates to `real_collection` while
# holding `mutex`, and releases `mutex` before returning.
abstract class ConcurrentCollection[E]
	super Collection[E]

	# Type of the wrapped, non thread-safe collection
	type REAL: Collection[E]

	# The collection wrapped by `self`
	var real_collection: REAL is noinit

	# `Mutex` guarding the accesses to `real_collection`
	#
	# Each protected method locks it around the delegated call. It can also
	# be used externally to ensure that no other `Thread` modify this object.
	var mutex = new Mutex

	# Delegates `count` to `real_collection` under `mutex`
	redef fun count(e) do
		mutex.lock
		var occurrences = real_collection.count(e)
		mutex.unlock
		return occurrences
	end

	# Delegates `first` to `real_collection` under `mutex`
	redef fun first do
		mutex.lock
		var head = real_collection.first
		mutex.unlock
		return head
	end

	# Delegates `has` to `real_collection` under `mutex`
	redef fun has(e) do
		mutex.lock
		var found = real_collection.has(e)
		mutex.unlock
		return found
	end

	# Delegates `has_all` to `real_collection` under `mutex`
	redef fun has_all(e) do
		mutex.lock
		var found = real_collection.has_all(e)
		mutex.unlock
		return found
	end

	# Delegates `has_only` to `real_collection` under `mutex`
	redef fun has_only(e) do
		mutex.lock
		var exclusive = real_collection.has_only(e)
		mutex.unlock
		return exclusive
	end

	# Delegates `is_empty` to `real_collection` under `mutex`
	redef fun is_empty do
		mutex.lock
		var empty = real_collection.is_empty
		mutex.unlock
		return empty
	end

	# Delegates `iterator` to `real_collection` under `mutex`
	#
	# Only the creation of the iterator happens while `mutex` is held;
	# `mutex` is released before the iterator is returned to the caller.
	redef fun iterator do
		mutex.lock
		var iter = real_collection.iterator
		mutex.unlock
		return iter
	end

	# Delegates `length` to `real_collection` under `mutex`
	redef fun length do
		mutex.lock
		var size = real_collection.length
		mutex.unlock
		return size
	end

	# Delegates `to_a` to `real_collection` under `mutex`
	redef fun to_a do
		mutex.lock
		var items = real_collection.to_a
		mutex.unlock
		return items
	end

	# Delegates `rand` to `real_collection` under `mutex`
	redef fun rand do
		mutex.lock
		var picked = real_collection.rand
		mutex.unlock
		return picked
	end

	# Delegates `join` to `real_collection` under `mutex`
	redef fun join(sep, last_sep) do
		mutex.lock
		var joined = real_collection.join(sep, last_sep)
		mutex.unlock
		return joined
	end

	# Delegates `to_s` to `real_collection` under `mutex`
	redef fun to_s do
		mutex.lock
		var text = real_collection.to_s
		mutex.unlock
		return text
	end
end
180
# Thread-safe counterpart of the standard `SequenceRead`
#
# The read services redefined here delegate to `real_collection`
# while holding `mutex`.
abstract class ConcurrentSequenceRead[E]
	super ConcurrentCollection[E]
	super SequenceRead[E]

	redef type REAL: SequenceRead[E]

	# Compares `real_collection` (not `self`) with `o` under `mutex`
	redef fun ==(o) do
		mutex.lock
		var same = real_collection == o
		mutex.unlock
		return same
	end

	# Delegates `[]` to `real_collection` under `mutex`
	redef fun [](index) do
		mutex.lock
		var item = real_collection[index]
		mutex.unlock
		return item
	end

	# Delegates `hash` to `real_collection` under `mutex`
	redef fun hash do
		mutex.lock
		var code = real_collection.hash
		mutex.unlock
		return code
	end

	# Delegates `index_of` to `real_collection` under `mutex`
	redef fun index_of(index) do
		mutex.lock
		var position = real_collection.index_of(index)
		mutex.unlock
		return position
	end

	# Delegates `index_of_from` to `real_collection` under `mutex`
	redef fun index_of_from(index, from) do
		mutex.lock
		var position = real_collection.index_of_from(index, from)
		mutex.unlock
		return position
	end

	# Delegates `iterator_from` to `real_collection` under `mutex`
	#
	# `mutex` is released before the iterator is returned to the caller.
	redef fun iterator_from(index) do
		mutex.lock
		var iter = real_collection.iterator_from(index)
		mutex.unlock
		return iter
	end

	# Delegates `last` to `real_collection` under `mutex`
	redef fun last do
		mutex.lock
		var tail = real_collection.last
		mutex.unlock
		return tail
	end

	# Delegates `last_index_of` to `real_collection` under `mutex`
	redef fun last_index_of(e) do
		mutex.lock
		var position = real_collection.last_index_of(e)
		mutex.unlock
		return position
	end

	# Delegates `last_index_of_from` to `real_collection` under `mutex`
	redef fun last_index_of_from(e, from) do
		mutex.lock
		var position = real_collection.last_index_of_from(e, from)
		mutex.unlock
		return position
	end

	# Delegates `reverse_iterator` to `real_collection` under `mutex`
	#
	# `mutex` is released before the iterator is returned to the caller.
	redef fun reverse_iterator do
		mutex.lock
		var iter = real_collection.reverse_iterator
		mutex.unlock
		return iter
	end

	# Delegates `reverse_iterator_from` to `real_collection` under `mutex`
	#
	# `mutex` is released before the iterator is returned to the caller.
	redef fun reverse_iterator_from(from) do
		mutex.lock
		var iter = real_collection.reverse_iterator_from(from)
		mutex.unlock
		return iter
	end
end
276
# A concurrent variant to the standard `Sequence`
#
# Each service redefined here delegates to `real_collection` while
# holding `mutex`.
abstract class ConcurrentSequence[E]
	super ConcurrentSequenceRead[E]
	super Sequence[E]

	redef type REAL: Sequence[E]

	# Delegates `[]=` to `real_collection` under `mutex`
	redef fun []=(index, e)
	do
		mutex.lock
		real_collection[index] = e
		mutex.unlock
	end

	# Delegates `add` to `real_collection` under `mutex`
	redef fun add(e)
	do
		mutex.lock
		real_collection.add e
		mutex.unlock
	end

	# Delegates `append` to `real_collection` under `mutex`
	redef fun append(e)
	do
		mutex.lock
		real_collection.append e
		mutex.unlock
	end

	# Delegates `first=` to `real_collection` under `mutex`
	redef fun first=(e)
	do
		mutex.lock
		real_collection.first = e
		mutex.unlock
	end

	# Delegates `insert` to `real_collection` under `mutex`
	redef fun insert(e, i)
	do
		mutex.lock
		real_collection.insert(e, i)
		mutex.unlock
	end

	# Delegates `insert_all` to `real_collection` under `mutex`
	redef fun insert_all(from, pos)
	do
		mutex.lock
		# Forward the call: a bare `real_collection` expression here would
		# lock and unlock without inserting anything.
		real_collection.insert_all(from, pos)
		mutex.unlock
	end

	# Delegates `last=` to `real_collection` under `mutex`
	redef fun last=(e)
	do
		mutex.lock
		real_collection.last = e
		mutex.unlock
	end

	# Delegates `pop` to `real_collection` under `mutex`
	redef fun pop
	do
		mutex.lock
		var r = real_collection.pop
		mutex.unlock
		return r
	end

	# Delegates `prepend` to `real_collection` under `mutex`
	redef fun prepend(e)
	do
		mutex.lock
		real_collection.prepend e
		mutex.unlock
	end

	# Delegates `push` to `real_collection` under `mutex`
	redef fun push(e)
	do
		mutex.lock
		real_collection.push e
		mutex.unlock
	end

	# Delegates `remove_at` to `real_collection` under `mutex`
	redef fun remove_at(index)
	do
		mutex.lock
		real_collection.remove_at(index)
		mutex.unlock
	end

	# Delegates `shift` to `real_collection` under `mutex`
	redef fun shift
	do
		mutex.lock
		var r = real_collection.shift
		mutex.unlock
		return r
	end

	# Delegates `unshift` to `real_collection` under `mutex`
	redef fun unshift(e)
	do
		mutex.lock
		real_collection.unshift(e)
		mutex.unlock
	end

	# Delegates `subarray` to `real_collection` under `mutex`
	redef fun subarray(start, len)
	do
		mutex.lock
		var r = real_collection.subarray(start, len)
		mutex.unlock
		return r
	end
end
385
# Thread-safe counterpart of the standard `Array`
#
# Operations redefined here are forwarded to the wrapped `real_collection`
# while `mutex` is held.
class ConcurrentArray[E]
	super ConcurrentSequence[E]
	super Array[E]

	redef type REAL: Array[E]

	# Builds a concurrent array around the existing `real_collection`
	init wrap(real_collection: REAL)
	do
		self.real_collection = real_collection
	end

	# Builds a concurrent array around a new, empty `Array`
	init
	do
		self.real_collection = new Array[E]
	end

	# Delegates `clear` to `real_collection` under `mutex`
	redef fun clear do
		mutex.lock
		real_collection.clear
		mutex.unlock
	end

	# Delegates `enlarge` to `real_collection` under `mutex`
	redef fun enlarge(cap) do
		mutex.lock
		real_collection.enlarge(cap)
		mutex.unlock
	end

	# Delegates `remove_all` to `real_collection` under `mutex`
	redef fun remove_all(e) do
		mutex.lock
		real_collection.remove_all(e)
		mutex.unlock
	end

	# Delegates `swap_at` to `real_collection` under `mutex`
	redef fun swap_at(a, b) do
		mutex.lock
		real_collection.swap_at(a, b)
		mutex.unlock
	end

	# Delegates `has` to `real_collection` under `mutex`
	redef fun has(e) do
		mutex.lock
		var found = real_collection.has(e)
		mutex.unlock
		return found
	end

	#
	## The following method defs are conflict resolutions
	#

	# Delegates `add` to `real_collection` under `mutex`
	redef fun add(e) do
		mutex.lock
		real_collection.add e
		mutex.unlock
	end

	# Delegates `length` to `real_collection` under `mutex`
	redef fun length do
		mutex.lock
		var size = real_collection.length
		mutex.unlock
		return size
	end
end
451
# Thread-safe counterpart of the standard `List`
#
# Operations redefined here are forwarded to the wrapped `real_collection`
# while `mutex` is held.
class ConcurrentList[E]
	super ConcurrentSequence[E]
	super List[E]

	redef type REAL: List[E]

	# Builds a concurrent list around the existing `real_collection`
	init wrap(real_collection: REAL)
	do
		self.real_collection = real_collection
	end

	# Builds a concurrent list around a new, empty `List`
	init
	do
		self.real_collection = new List[E]
	end

	# Delegates `link` to `real_collection` under `mutex`
	redef fun link(l) do
		mutex.lock
		real_collection.link(l)
		mutex.unlock
	end

	# Delegates `slice` to `real_collection` under `mutex`
	redef fun slice(from, to) do
		mutex.lock
		var part = real_collection.slice(from, to)
		mutex.unlock
		return part
	end

	#
	## The following method defs are conflict resolutions
	#

	# Delegates `pop` to `real_collection` under `mutex`
	redef fun pop do
		mutex.lock
		var tail = real_collection.pop
		mutex.unlock
		return tail
	end

	# Delegates `is_empty` to `real_collection` under `mutex`
	redef fun is_empty do
		mutex.lock
		var empty = real_collection.is_empty
		mutex.unlock
		return empty
	end

	# Delegates `unshift` to `real_collection` under `mutex`
	redef fun unshift(e) do
		mutex.lock
		real_collection.unshift(e)
		mutex.unlock
	end

	# Delegates `push` to `real_collection` under `mutex`
	redef fun push(e) do
		mutex.lock
		real_collection.push(e)
		mutex.unlock
	end

	# Delegates `shift` to `real_collection` under `mutex`
	redef fun shift do
		mutex.lock
		var head = real_collection.shift
		mutex.unlock
		return head
	end
end
519
# A collection whose `is_empty` method blocks until it is empty
class ReverseBlockingQueue[E]
	super ConcurrentList[E]

	# Condition that `is_empty` waits on and that `remove` signals
	private var cond = new PthreadCond

	# Appends `e` to `real_collection` under `mutex`
	#
	# No signal is sent on `cond` by this method.
	redef fun push(e)
	do
		mutex.lock
		real_collection.push(e)
		mutex.unlock
	end

	# Removes `e` and, if the queue is empty afterwards, signals `cond`
	redef fun remove(e)
	do
		mutex.lock
		real_collection.remove(e)
		if real_collection.is_empty then
			cond.signal
		end
		mutex.unlock
	end

	# Waits until the queue is empty, then returns `true`
	#
	# While `real_collection` still holds elements, waits on `cond`
	# with `mutex`.
	redef fun is_empty
	do
		mutex.lock
		while not real_collection.is_empty do
			cond.wait(mutex)
		end
		mutex.unlock
		return true
	end
end
550
# A Blocking queue implemented from a `ConcurrentList`
#
# `shift` is blocking if there isn't any element in `self`.
# `push` and `unshift` signal `cond` after inserting, to release a
# thread waiting in `shift`.
class BlockingQueue[E]
	super ConcurrentList[E]

	# Used to block or signal on waiting threads
	private var cond = new PthreadCond

	# Appends `e` exactly once, then signals `cond` to release a waiting thread
	redef fun push(e) do
		mutex.lock
		# A single insertion: pushing a second time would enqueue `e` twice.
		real_collection.push(e)
		self.cond.signal
		mutex.unlock
	end

	# Inserts `e` at the front, then signals `cond` to release a waiting thread
	redef fun unshift(e) do
		mutex.lock
		real_collection.unshift(e)
		self.cond.signal
		mutex.unlock
	end

	# If empty, blocks until an item is inserted with `push` or `unshift`
	redef fun shift do
		mutex.lock
		# Re-check emptiness after each wake-up before taking an element.
		while real_collection.is_empty do self.cond.wait(mutex)
		var r = real_collection.shift
		mutex.unlock
		return r
	end

	# Non-blocking emptiness test, delegated to `real_collection` under `mutex`
	redef fun is_empty do
		mutex.lock
		var r = real_collection.is_empty
		mutex.unlock
		return r
	end
end