8d3ec079127f4d2ba6d21c7d613b6734fc33f2d7
[nit.git] / lib / libevent / libevent.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2013 Jean-Philippe Caissy <jpcaissy@piji.ca>
4 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 # Copyright 2018 Matthieu Le Guellaut <leguellaut.matthieu@gmail.com>
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18
19 # Low-level wrapper around the libevent library to manage events on file descriptors
20 #
21 # For more information, refer to the libevent documentation at
22 # http://monkey.org/~provos/libevent/doxygen-2.0.1/
23 module libevent is pkgconfig("libevent")
24
25 in "C header" `{
26 #include <event2/listener.h>
27 #include <event2/bufferevent.h>
28 #include <event2/buffer.h>
29 `}
30
31 in "C" `{
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <string.h>
37
38 #include <sys/socket.h>
39 #include <arpa/inet.h>
40 #include <netinet/in.h>
41 #include <netinet/ip.h>
42 #include <sys/un.h>
43 #include <unistd.h>
44
45 // Protect callbacks for compatibility with light FFI
46 #ifdef Connection_decr_ref
47 // Callback forwarded to 'Connection.write_callback'
48 static void c_write_cb(struct bufferevent *bev, Connection ctx) {
49 Connection_write_callback(ctx);
50 }
51
52 // Callback forwarded to 'Connection.read_callback_native'
53 static void c_read_cb(struct bufferevent *bev, Connection ctx)
54 {
55 Connection_read_callback_native(ctx, bev);
56 }
57
58 // Callback forwarded to 'Connection.event_callback'
59 static void c_event_cb(struct bufferevent *bev, short events, Connection ctx)
60 {
61 int release = Connection_event_callback(ctx, events);
62 if (release) Connection_decr_ref(ctx);
63 }
64
65 // Callback forwarded to 'ConnectionFactory.accept_connection'
66 static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd,
67 struct sockaddr *addrin, int socklen, ConnectionFactory ctx)
68 {
69 ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen);
70 }
71 #endif
72
73 #ifdef EventCallback_incr_ref
74 // Callback forwarded to 'EventCallback.callback'
75 static void signal_cb(evutil_socket_t fd, short events, void *data)
76 {
77 EventCallback handler = data;
78 EventCallback_callback(handler, events);
79 }
80 #endif
81 `}
82
# Structure holding the information and state of a libevent dispatch loop
#
# The event_base lies at the center of libevent; every application will
# have one. It keeps track of all pending and active events, and
# notifies the application of the active ones.
extern class NativeEventBase `{ struct event_base * `}

	# Create a new event_base to use with the rest of libevent
	#
	# Use `is_valid` to check the result before using it.
	new `{
		return event_base_new();
	`}

	# Has `self` been correctly initialized?
	#
	# This is `false` when the underlying pointer is null.
	fun is_valid: Bool do return not address_is_null

	# Reinitialize the event base after a fork
	#
	# Some event mechanisms do not survive across fork.
	# The event base needs to be reinitialized with the `reinit` method.
	#
	# Returns `true` if some events could not be re-added.
	fun reinit: Bool `{
		// `event_reinit` reports problems with a nonzero status,
		// turn it into a clean 0/1 value for the Nit `Bool`.
		return event_reinit(self) != 0;
	`}

	# Event dispatching loop
	#
	# This loop will run the event base until either there are no more added
	# events, or until something calls `loopexit`.
	fun dispatch `{
		event_base_dispatch(self);
	`}

	# Exit the event loop
	#
	# No delay is given to libevent (the timeout argument is `NULL`).
	#
	# TODO support timer
	fun loopexit `{
		event_base_loopexit(self, NULL);
	`}

	# Release the native event base with `event_base_free`
	redef fun free `{
		event_base_free(self);
	`}
end
117
# Event, libevent's basic unit of operation
extern class NativeEvent `{ struct event * `}

	# Add `self` to the set of pending events
	#
	# No timeout is given to libevent (the timeout argument is `NULL`).
	#
	# TODO support timeout
	fun add `{
		event_add(self, NULL);
	`}

	# Remove `self` from the set of monitored events
	fun del `{
		event_del(self);
	`}

	# Release the native event with `event_free`
	redef fun free `{
		event_free(self);
	`}
end
131
# Signal event
extern class NativeEvSignal
	super NativeEvent

	# Create an event on `base` calling `handler.callback` when `signal` is received
	#
	# A reference to `handler` is taken with `EventCallback_incr_ref`
	# so it can be handed to libevent as the callback argument.
	new (base: NativeEventBase, signal: Int, handler: EventCallback)
	import EventCallback.callback `{
		EventCallback_incr_ref(handler);

		struct event *signal_event = evsignal_new(base, signal, signal_cb, handler);
		return signal_event;
	`}
end
142
# Receiver of event callbacks
interface EventCallback

	# Callback on an event, `events` is the value given by libevent
	#
	# Does nothing by default, to be redefined by subclasses.
	fun callback(events: Int)
	do
		# Nothing to do by default
	end
end
149
# Spawned to manage a specific connection
#
# Wraps a `NativeBufferEvent` as a `Writer`. Once `closed` is set, the
# native buffer event has been freed and all write services are ignored.
class Connection
	super Writer

	# Closing this connection has been requested, but may not yet be `closed`
	var close_requested = false

	# This connection is closed
	var closed = false

	# The native libevent linked to `self`
	var native_buffer_event: NativeBufferEvent

	# Close this connection if possible, otherwise mark it to be closed later
	#
	# The connection is closed right away only when both the input and the
	# output buffers are empty. Otherwise `close_requested` is set and the
	# closing is attempted again from `write_callback` and `read_callback`.
	redef fun close
	do
		if closed then return

		var pending_input = native_buffer_event.input_buffer.length
		var pending_output = native_buffer_event.output_buffer.length
		if pending_input > 0 or pending_output > 0 then
			close_requested = true
		else
			force_close
		end
	end

	# Force closing this connection and freeing `native_buffer_event`
	#
	# Does nothing if `self` is already `closed`.
	fun force_close
	do
		if closed then return

		native_buffer_event.free
		closed = true
	end

	# Callback method on a write event
	fun write_callback
	do
		if close_requested then close
	end

	# Extract all the data available in the input buffer of `bev`
	# and pass it to `read_callback` as a `String`
	private fun read_callback_native(bev: NativeBufferEvent)
	do
		var evbuffer = bev.input_buffer
		var len = evbuffer.length
		var buf = new CString(len)
		evbuffer.remove(buf, len)
		var str = buf.to_s_with_length(len)
		read_callback str
	end

	# Callback method when data is available to read
	fun read_callback(content: String)
	do
		if close_requested then close
	end

	# Callback method on events: EOF, user-defined timeout and unrecoverable errors
	#
	# On an error or EOF event, the connection is closed with `force_close`.
	#
	# Returns `true` if the native handles to `self` can be released.
	fun event_callback(events: Int): Bool
	do
		var is_error = events & bev_event_error != 0
		var is_eof = events & bev_event_eof != 0
		if not is_error and not is_eof then return false

		if is_error then
			var sock_err = evutil_socket_error
			# Ignore some normal errors and print the others for debugging.
			# The codes are read from the C headers as their numeric
			# values differ between platforms.
			if sock_err != errno_timed_out and sock_err != errno_connection_reset then
				print_error "libevent error event: {evutil_socket_error_to_string(sock_err)} ({sock_err})"
			end
		end

		force_close
		return true
	end

	# Value of `ETIMEDOUT` (connection timed out) on this platform
	private fun errno_timed_out: Int `{ return ETIMEDOUT; `}

	# Value of `ECONNRESET` (connection reset by peer) on this platform
	private fun errno_connection_reset: Int `{ return ECONNRESET; `}

	# Write a string to the connection
	#
	# Ignored if closing was requested or if `self` is `closed`.
	redef fun write(str)
	do
		# `native_buffer_event` is freed once `closed`, do not touch it
		if close_requested or closed then return
		native_buffer_event.write(str.to_cstring, str.byte_length)
	end

	# Write a single byte to the connection
	#
	# Ignored if closing was requested or if `self` is `closed`.
	redef fun write_byte(byte)
	do
		if close_requested or closed then return
		native_buffer_event.write_byte(byte)
	end

	# Write `len` bytes from `ns` to the connection
	#
	# Ignored if closing was requested or if `self` is `closed`.
	redef fun write_bytes_from_cstring(ns, len)
	do
		if close_requested or closed then return
		native_buffer_event.write(ns, len)
	end

	# Write a file to the connection
	#
	# If the file at `path` cannot be opened, inspected or added to the
	# output buffer, `last_error` is set and nothing is written.
	#
	# Ignored if closing was requested or if `self` is `closed`.
	fun write_file(path: String)
	do
		if close_requested or closed then return

		var file = new FileReader.open(path)
		var open_error = file.last_error
		if open_error != null then
			var error = new IOError("Failed to open file at '{path}'")
			error.cause = open_error
			last_error = error
			file.close
			return
		end

		var stat = file.file_stat
		if stat == null then
			last_error = new IOError("Failed to stat file at '{path}'")
			file.close
			return
		end

		# On success the file is left open, only failures close it here.
		# NOTE(review): libevent is expected to take over the descriptor
		# after `evbuffer_add_file` succeeds, confirm against its documentation.
		var failed = native_buffer_event.output_buffer.add_file(file.fd, 0, stat.size)
		if failed then
			last_error = new IOError("Failed to add file at '{path}'")
			file.close
		end
	end
end
281
282 # ---
283 # Error code for event callbacks
284
# Flag of event callbacks: an error was encountered while reading
fun bev_event_reading: Int `{
	return BEV_EVENT_READING;
`}

# Flag of event callbacks: an error was encountered while writing
fun bev_event_writing: Int `{
	return BEV_EVENT_WRITING;
`}

# Flag of event callbacks: end of file reached
fun bev_event_eof: Int `{
	return BEV_EVENT_EOF;
`}

# Flag of event callbacks: unrecoverable error encountered
fun bev_event_error: Int `{
	return BEV_EVENT_ERROR;
`}

# Flag of event callbacks: user-specified timeout reached
fun bev_event_timeout: Int `{
	return BEV_EVENT_TIMEOUT;
`}

# Flag of event callbacks: connect operation finished
fun bev_event_connected: Int `{
	return BEV_EVENT_CONNECTED;
`}
302
# Global error code for the last socket operation on the calling thread
#
# Not idempotent on all platforms.
fun evutil_socket_error: Int `{
	int code = EVUTIL_SOCKET_ERROR();
	return code;
`}

# Convert an error code from `evutil_socket_error` to a string
fun evutil_socket_error_to_string(error_code: Int): CString `{
	return evutil_socket_error_to_string(error_code);
`}
314
315 # ---
316 # Options that can be specified when creating a `NativeBufferEvent`
317
# Option: close the underlying file descriptor/bufferevent/whatever when the bufferevent is freed
fun bev_opt_close_on_free: Int `{
	return BEV_OPT_CLOSE_ON_FREE;
`}

# Option: if threading is enabled, protect the operations on the bufferevent with a lock
fun bev_opt_threadsafe: Int `{
	return BEV_OPT_THREADSAFE;
`}

# Option: run callbacks deferred in the event loop
fun bev_opt_defer_callbacks: Int `{
	return BEV_OPT_DEFER_CALLBACKS;
`}

# Option: execute callbacks without locks being held on the bufferevent
fun bev_opt_unlock_callbacks: Int `{
	return BEV_OPT_UNLOCK_CALLBACKS;
`}
329
330 # ---
331 # Options for `NativeBufferEvent::enable`
332
# Flag for the read operation
fun ev_read: Int `{
	return EV_READ;
`}

# Flag for the write operation
fun ev_write: Int `{
	return EV_WRITE;
`}
338
339 # ---
340
# A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer
extern class NativeBufferEvent `{ struct bufferevent * `}

	# Socket-based `NativeBufferEvent` that reads and writes data onto a network
	#
	# `options` is a combination of the `bev_opt_*` flags.
	new socket(base: NativeEventBase, fd, options: Int) `{
		struct bufferevent *bev = bufferevent_socket_new(base, fd, options);
		return bev;
	`}

	# Enable `operation` on this bufferevent, a combination of `ev_read` and `ev_write`
	fun enable(operation: Int) `{
		bufferevent_enable(self, operation);
	`}

	# Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn`
	#
	# A reference to `conn` is taken with `Connection_incr_ref`
	# as it is given to libevent as the argument of the callbacks.
	fun setcb(conn: Connection) import Connection.read_callback_native,
		Connection.write_callback, Connection.event_callback, CString `{
		Connection_incr_ref(conn);

		bufferevent_data_cb on_read = (bufferevent_data_cb)c_read_cb;
		bufferevent_data_cb on_write = (bufferevent_data_cb)c_write_cb;
		bufferevent_event_cb on_event = (bufferevent_event_cb)c_event_cb;
		bufferevent_setcb(self, on_read, on_write, on_event, conn);
	`}

	# Write `length` bytes of `line`
	#
	# Returns the status given by `bufferevent_write`.
	fun write(line: CString, length: Int): Int `{
		return bufferevent_write(self, line, length);
	`}

	# Write the byte `value`
	#
	# Returns the status given by `bufferevent_write`.
	fun write_byte(value: Int): Int `{
		unsigned char single = (unsigned char)value;
		return bufferevent_write(self, &single, 1);
	`}

	# Release the native bufferevent with `bufferevent_free`
	redef fun free `{
		bufferevent_free(self);
	`}

	# The output buffer associated to `self`
	fun output_buffer: OutputNativeEvBuffer `{
		return bufferevent_get_output(self);
	`}

	# The input buffer associated to `self`
	fun input_buffer: InputNativeEvBuffer `{
		return bufferevent_get_input(self);
	`}

	# Read data from this buffer event into `buf`
	#
	# Returns the status given by `bufferevent_read_buffer`.
	fun read_buffer(buf: NativeEvBuffer): Int `{
		return bufferevent_read_buffer(self, buf);
	`}

	# Write the data of `buf` to this buffer event
	#
	# Returns the status given by `bufferevent_write_buffer`.
	fun write_buffer(buf: NativeEvBuffer): Int `{
		return bufferevent_write_buffer(self, buf);
	`}
end
389
# A single buffer
extern class NativeEvBuffer `{ struct evbuffer * `}

	# Length of the data in this buffer, in bytes
	fun length: Int `{
		return evbuffer_get_length(self);
	`}

	# Read up to `len` bytes from this evbuffer into `buffer` and drain the bytes read
	#
	# The status of `evbuffer_remove` is not reported to the caller.
	fun remove(buffer: CString, len: Int) `{
		evbuffer_remove(self, buffer, len);
	`}
end
400
# An input buffer
extern class InputNativeEvBuffer
	super NativeEvBuffer

	# Discard `length` bytes of data from this buffer
	fun drain(length: Int) `{
		evbuffer_drain(self, length);
	`}
end
408
# An output buffer
extern class OutputNativeEvBuffer
	super NativeEvBuffer

	# Add `length` bytes of the file `fd`, starting at `offset`, to this buffer
	#
	# Returns `true` on failure, that is when `evbuffer_add_file`
	# answers with a nonzero status.
	fun add_file(fd, offset, length: Int): Bool `{
		// Turn the 0/-1 status into a clean 0/1 value for the Nit `Bool`
		return evbuffer_add_file(self, fd, offset, length) != 0;
	`}
end
418
# A listener acting on an interface and port, spawns `Connection` on new connections
extern class ConnectionListener `{ struct evconnlistener * `}

	# Listen on the TCP socket at `address`:`port`, forwarding new connections to `factory`
	#
	# Only IPv4 addresses are supported as the socket address is
	# built in a `struct sockaddr_in`.
	#
	# The result is a null pointer on failure. A reference to `factory`
	# is taken only when the listener is successfully created.
	private new bind_to(base: NativeEventBase, address: CString, port: Int, factory: ConnectionFactory)
	import ConnectionFactory.accept_connection, error_callback `{

		struct hostent *hostent = gethostbyname(address);
		if (!hostent) {
			return NULL;
		}

		struct sockaddr_in sin = {0};

		// `sin` can only hold an IPv4 address, refuse anything else
		// rather than letting `memcpy` write past `sin.sin_addr`.
		if (hostent->h_addrtype != AF_INET ||
				hostent->h_length != (int)sizeof(sin.sin_addr)) {
			errno = EAFNOSUPPORT;
			return NULL;
		}

		sin.sin_family = AF_INET;
		sin.sin_port = htons(port);
		memcpy(&(sin.sin_addr.s_addr), (const void*)hostent->h_addr, hostent->h_length);

		struct evconnlistener *listener = evconnlistener_new_bind(base,
			(evconnlistener_cb)accept_connection_cb, factory,
			LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
			(struct sockaddr*)&sin, sizeof(sin));
		if (listener == NULL) {
			return NULL;
		}

		// Keep `factory` alive only once libevent really holds a pointer
		// to it, so the failure paths above do not leak a reference.
		ConnectionFactory_incr_ref(factory);
		evconnlistener_set_error_cb(listener,
			(evconnlistener_errorcb)ConnectionListener_error_callback);

		return listener;
	`}

	# Listen on the UNIX domain socket at `file`, forwarding new connections to `factory`
	#
	# The result is a null pointer on failure, including when `file` does
	# not fit in `sun_path`. A reference to `factory` is taken only when
	# the listener is successfully created.
	private new bind_unix(base: NativeEventBase, file: CString, factory: ConnectionFactory)
	import ConnectionFactory.accept_connection, error_callback `{

		// Not named `sun`, which is a predefined macro on some platforms
		struct sockaddr_un addr = {0};
		addr.sun_family = AF_UNIX;

		// Refuse paths that do not fit instead of binding a truncated one
		if (strlen(file) >= sizeof(addr.sun_path)) {
			errno = ENAMETOOLONG;
			return NULL;
		}
		strncpy(addr.sun_path, file, sizeof(addr.sun_path) - 1);

		struct evconnlistener *listener = evconnlistener_new_bind(base,
			(evconnlistener_cb)accept_connection_cb, factory,
			LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
			(struct sockaddr*)&addr, sizeof(addr));
		if (listener == NULL) {
			return NULL;
		}

		// Keep `factory` alive only once libevent really holds a pointer
		// to it, so the failure paths above do not leak a reference.
		ConnectionFactory_incr_ref(factory);
		evconnlistener_set_error_cb(listener,
			(evconnlistener_errorcb)ConnectionListener_error_callback);

		return listener;
	`}

	# Get the `NativeEventBase` associated to `self`
	fun base: NativeEventBase `{ return evconnlistener_get_base(self); `}

	# Callback on listening error
	#
	# By default, print the description of the last socket error.
	fun error_callback
	do
		var cstr = evutil_socket_error_to_string(evutil_socket_error)
		print_error "libevent error: {cstr}"
	end
end
480
# Factory to listen on sockets and create new `Connection`
class ConnectionFactory

	# The `NativeEventBase` for the dispatch loop of this factory
	var event_base: NativeEventBase

	# Accept a connection on `listener`
	#
	# By default, it creates a new NativeBufferEvent and calls `spawn_connection`.
	#
	# If the buffer event cannot be created, an error is printed,
	# `fd` is closed and no connection is spawned.
	fun accept_connection(listener: ConnectionListener, fd: Int, addrin: Pointer, socklen: Int)
	do
		var base = listener.base
		var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free)
		if bev.address_is_null then
			# Nothing owns `fd` without a buffer event, release it here
			print_error "libevent warning: Failed to create a buffer event for a new connection"
			close_socket(fd)
			return
		end

		# Human representation of remote client address
		var addr_len = 46 # Longest possible IPv6 address + null byte
		var addr_buf = new CString(addr_len)
		addr_buf = addrin_to_address(addrin, addr_buf, addr_len)
		var addr = if addr_buf.address_is_null then
				"Unknown address"
			else addr_buf.to_s

		var conn = spawn_connection(bev, addr)
		bev.enable ev_read|ev_write
		bev.setcb conn
	end

	# Create a new `Connection` object for `buffer_event`
	#
	# `address` is a human readable representation of the remote address.
	# It is not used by the default implementation.
	fun spawn_connection(buffer_event: NativeBufferEvent, address: String): Connection
	do
		return new Connection(buffer_event)
	end

	# Listen on the TCP socket at `address`:`port` for new connections
	#
	# On new connections, libevent calls back `accept_connection`,
	# which calls `spawn_connection` by default.
	#
	# Returns `null` and prints an error if the socket cannot be opened.
	fun bind_to(address: String, port: Int): nullable ConnectionListener
	do
		var listener = new ConnectionListener.bind_to(
			event_base, address.to_cstring, port, self)

		if listener.address_is_null then
			print_error "libevent warning: Opening {address}:{port} failed, " +
				evutil_socket_error_to_string(evutil_socket_error).to_s
			return null
		end

		return listener
	end

	# Listen on a UNIX domain socket for new connections
	#
	# On new connections, libevent calls back `accept_connection`,
	# which calls `spawn_connection` by default.
	#
	# An existing socket file at `path` is deleted first.
	# Returns `null` and prints an error if the socket cannot be opened.
	fun bind_unix(path: String): nullable ConnectionListener
	do
		# Delete the socket if it already exists
		var stat = path.file_stat
		if stat != null and stat.is_sock then path.file_delete

		var listener = new ConnectionListener.bind_unix(
			event_base, path.to_cstring, self)

		if listener.address_is_null then
			print_error "libevent warning: Opening UNIX domain socket {path} failed, " +
				evutil_socket_error_to_string(evutil_socket_error).to_s
			return null
		end

		return listener
	end

	# Close the socket `fd` with `evutil_closesocket`
	private fun close_socket(fd: Int) `{
		evutil_closesocket(fd);
	`}

	# Put a human readable string representation of `address` into `buf`
	#
	# The result is `buf` for IPv4 and IPv6 addresses, the socket path or
	# a constant description for UNIX domain sockets, and a null pointer
	# for other address families or on conversion failure.
	private fun addrin_to_address(address: Pointer, buf: CString, buf_len: Int): CString `{
		struct sockaddr *addrin = (struct sockaddr*)address;

		if (addrin->sa_family == AF_INET) {
			struct in_addr *src = &((struct sockaddr_in*)addrin)->sin_addr;
			return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
		}
		else if (addrin->sa_family == AF_INET6) {
			struct in6_addr *src = &((struct sockaddr_in6*)addrin)->sin6_addr;
			return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
		}
		else if (addrin->sa_family == AF_UNIX) {
			// `sun_path` is an array member, it cannot be a null pointer
			char *path = ((struct sockaddr_un*)addrin)->sun_path;
			if (path[0] == '\0') return "Abstract UNIX domain socket";
			return path;
		}

		return NULL;
	`}
end
575
# Turn on libevent's debug mode
#
# It enables some relatively expensive debugging checks
# that would normally be turned off.
fun enable_debug_mode `{
	event_enable_debug_mode();
`}
578
# Use Windows builtin locking and thread ID functions
#
# Returns `true` on failure, including when the service is not
# available in this build of libevent.
#
# NOTE(review): `EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED` is presumably
# defined by `event2/thread.h`, which is not among the headers visibly
# included by this module. Confirm that the first branch is ever compiled.
fun use_windows_threads: Bool `{
#ifdef EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
	// The 0/-1 status is turned into a clean 0/1 value for the Nit `Bool`
	return evthread_use_windows_threads() != 0;
#else
	return 1;
#endif
`}
587
# Use Pthreads locking and thread ID functions
#
# Returns `true` on failure, including when the service is not
# available in this build of libevent.
#
# NOTE(review): `EVTHREAD_USE_PTHREADS_IMPLEMENTED` is presumably
# defined by `event2/thread.h`, which is not among the headers visibly
# included by this module. Confirm that the first branch is ever compiled.
fun use_pthreads: Bool `{
#ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED
	// The 0/-1 status is turned into a clean 0/1 value for the Nit `Bool`
	return evthread_use_pthreads() != 0;
#else
	return 1;
#endif
`}