1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Copyright 2013 Jean-Philippe Caissy <jpcaissy@piji.ca>
4 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 # Copyright 2018 Matthieu Le Guellaut <leguellaut.matthieu@gmail.com>
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # Low-level wrapper around the libevent library to manage events on file descriptors
21 # For mor information, refer to the libevent documentation at
22 # http://monkey.org/~provos/libevent/doxygen-2.0.1/
23 module libevent
is pkgconfig
("libevent")
26 #include <event2/listener.h>
27 #include <event2/bufferevent.h>
28 #include <event2/buffer.h>
33 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <arpa/inet.h>
40 #include <netinet/in.h>
41 #include <netinet/ip.h>
45 // Protect callbacks for compatibility with light FFI
46 #ifdef Connection_decr_ref
47 // Callback forwarded to 'Connection.write_callback'
48 static void c_write_cb(struct bufferevent *bev, Connection ctx) {
49 Connection_write_callback(ctx);
52 // Callback forwarded to 'Connection.read_callback_native'
53 static void c_read_cb(struct bufferevent *bev, Connection ctx)
55 Connection_read_callback_native(ctx, bev);
58 // Callback forwarded to 'Connection.event_callback'
59 static void c_event_cb(struct bufferevent *bev, short events, Connection ctx)
61 int release = Connection_event_callback(ctx, events);
62 if (release) Connection_decr_ref(ctx);
65 // Callback forwarded to 'ConnectionFactory.accept_connection'
66 static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd,
67 struct sockaddr *addrin, int socklen, ConnectionFactory ctx)
69 ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen);
73 #ifdef EventCallback_incr_ref
74 // Callback forwarded to 'EventCallback.callback'
75 static void signal_cb(evutil_socket_t fd, short events, void *data)
77 EventCallback handler = data;
78 EventCallback_callback(handler, events);
83 # Structure to hold information and state for a Libevent dispatch loop.
85 # The event_base lies at the center of Libevent; every application will
86 # have one. It keeps track of all pending and active events, and
87 # notifies your application of the active ones.
88 extern class NativeEventBase `{ struct event_base * `}
90 # Create a new event_base to use with the rest of Libevent
91 new `{ return event_base_new(); `}
93 # Has `self` been correctly initialized?
94 fun is_valid
: Bool do return not address_is_null
96 # Reinitialize the event base after a fork
98 # Some event mechanisms do not survive across fork.
99 # The event base needs to be reinitialized with the `reinit` method.
101 # Returns `true` if some events could not be re-added.
102 fun reinit
: Bool `{ return event_reinit(self); `}
104 # Event dispatching loop
106 # This loop will run the event base until either there are no more added
107 # events, or until something calls `loopexit
`.
108 fun dispatch `{ event_base_dispatch(self); `}
110 # Exit the event loop
113 fun loopexit
`{ event_base_loopexit(self, NULL); `}
115 redef fun free `{ event_base_free(self); `}
118 # Event, libevent's basic unit of operation
119 extern class NativeEvent `{ struct event * `}
121 # Add to the set of pending events
123 # TODO support timeout
124 fun add `{ event_add(self, NULL); `}
126 # Remove from the set of monitored events
127 fun del
`{ event_del(self); `}
129 redef fun free `{ event_free(self); `}
133 extern class NativeEvSignal
136 new (base
: NativeEventBase, signal
: Int, handler
: EventCallback)
137 import EventCallback.callback
`{
138 EventCallback_incr_ref(handler);
139 return evsignal_new(base, signal, signal_cb, handler);
143 # Receiver of event callbacks
144 interface EventCallback
146 # Callback on an event
147 fun callback
(events
: Int) do end
150 # Spawned to manage a specific connection
156 # Closing this connection has been requested, but may not yet be `closed`
157 var close_requested
= false
159 # This connection is closed
162 # The native libevent linked to `self`
163 var native_buffer_event
: NativeBufferEvent
165 # Close this connection if possible, otherwise mark it to be closed later
168 if closed
then return
170 var i
= native_buffer_event
.input_buffer
171 var o
= native_buffer_event
.output_buffer
172 if i
.length
> 0 or o
.length
> 0 then
173 close_requested
= true
179 # Force closing this connection and freeing `native_buffer_event`
182 if closed
then return
184 native_buffer_event
.free
188 # Callback method on a write event
191 if close_requested
then close
194 private fun read_callback_native
(bev
: NativeBufferEvent)
196 var evbuffer
= bev
.input_buffer
197 var len
= evbuffer
.length
198 var buf
= new CString(len
)
199 evbuffer
.remove
(buf
, len
)
200 var str
= buf
.to_s_with_length
(len
)
204 # Callback method when data is available to read
205 fun read_callback
(content
: String)
207 if close_requested
then close
210 # Callback method on events: EOF, user-defined timeout and unrecoverable errors
212 # Returns `true` if the native handles to `self` can be released.
213 fun event_callback
(events
: Int): Bool
215 if events
& bev_event_error
!= 0 or events
& bev_event_eof
!= 0 then
216 if events
& bev_event_error
!= 0 then
217 var sock_err
= evutil_socket_error
218 # Ignore some normal errors and print the others for debugging
219 if sock_err
== 110 then
220 # Connection timed out (ETIMEDOUT)
221 else if sock_err
== 104 then
222 # Connection reset by peer (ECONNRESET)
224 print_error
"libevent error event: {evutil_socket_error_to_string(sock_err)} ({sock_err})"
234 # Write a string to the connection
237 if close_requested
then return
238 native_buffer_event
.write
(str
.to_cstring
, str
.byte_length
)
241 redef fun write_byte
(byte
)
243 if close_requested
then return
244 native_buffer_event
.write_byte
(byte
)
247 redef fun write_bytes_from_cstring
(ns
, len
)
249 if close_requested
then return
250 native_buffer_event
.write
(ns
, len
)
253 # Write a file to the connection
255 # If `not path.file_exists`, the method returns.
256 fun write_file
(path
: String)
258 if close_requested
then return
260 var file
= new FileReader.open
(path
)
261 if file
.last_error
!= null then
262 var error
= new IOError("Failed to open file at '{path}'")
263 error
.cause
= file
.last_error
264 self.last_error
= error
269 var stat
= file
.file_stat
271 last_error
= new IOError("Failed to stat file at '{path}'")
276 var err
= native_buffer_event
.output_buffer
.add_file
(file
.fd
, 0, stat
.size
)
278 last_error
= new IOError("Failed to add file at '{path}'")
285 # Error code for event callbacks
287 # error encountered while reading
288 fun bev_event_reading
: Int `{ return BEV_EVENT_READING; `}
290 # error encountered while writing
291 fun bev_event_writing: Int `{ return BEV_EVENT_WRITING; `}
294 fun bev_event_eof
: Int `{ return BEV_EVENT_EOF; `}
296 # unrecoverable error encountered
297 fun bev_event_error: Int `{ return BEV_EVENT_ERROR; `}
299 # user-specified timeout reached
300 fun bev_event_timeout
: Int `{ return BEV_EVENT_TIMEOUT; `}
302 # connect operation finished.
303 fun bev_event_connected: Int `{ return BEV_EVENT_CONNECTED; `}
305 # Global error code for the last socket operation on the calling thread
307 # Not idempotent on all platforms.
308 fun evutil_socket_error
: Int `{
309 return EVUTIL_SOCKET_ERROR();
312 # Convert an error code from `evutil_socket_error` to a string
313 fun evutil_socket_error_to_string
(error_code
: Int): CString `{
314 return evutil_socket_error_to_string(error_code);
318 # Options that can be specified when creating a `NativeBufferEvent`
320 # Close the underlying file descriptor/bufferevent/whatever when this bufferevent is freed.
321 fun bev_opt_close_on_free
: Int `{ return BEV_OPT_CLOSE_ON_FREE; `}
323 # If threading is enabled, protect the operations on this bufferevent with a lock.
324 fun bev_opt_threadsafe: Int `{ return BEV_OPT_THREADSAFE; `}
326 # Run callbacks deferred in the event loop.
327 fun bev_opt_defer_callbacks
: Int `{ return BEV_OPT_DEFER_CALLBACKS; `}
329 # If set, callbacks are executed without locks being held on the bufferevent.
330 fun bev_opt_unlock_callbacks: Int `{ return BEV_OPT_UNLOCK_CALLBACKS; `}
333 # Options for `NativeBufferEvent::enable`
336 fun ev_read
: Int `{ return EV_READ; `}
339 fun ev_write: Int `{ return EV_WRITE; `}
343 # A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer
344 extern class NativeBufferEvent `{ struct bufferevent * `}
346 # Socket-based `NativeBufferEvent` that reads and writes data onto a network
347 new socket(base: NativeEventBase, fd, options: Int) `{
348 return bufferevent_socket_new
(base
, fd
, options
);
351 # Enable a bufferevent.
352 fun enable(operation: Int) `{
353 bufferevent_enable
(self, operation
);
356 # Set callbacks to `read_callback_native
`, `write_callback
` and `event_callback
` of `conn
`
357 fun setcb(conn: Connection) import Connection.read_callback_native,
358 Connection.write_callback, Connection.event_callback, CString `{
359 Connection_incr_ref(conn
);
360 bufferevent_setcb
(self,
361 (bufferevent_data_cb
)c_read_cb
,
362 (bufferevent_data_cb
)c_write_cb
,
363 (bufferevent_event_cb
)c_event_cb
, conn
);
366 # Write `length
` bytes of `line
`
367 fun write(line: CString, length: Int): Int `{
368 return bufferevent_write
(self, line
, length
);
371 # Write the byte `value
`
372 fun write_byte(value: Int): Int `{
373 unsigned char byt
= (unsigned char
)value
;
374 return bufferevent_write
(self, &byt
, 1);
377 redef fun free `{ bufferevent_free(self); `}
379 # The output buffer associated to `self`
380 fun output_buffer
: OutputNativeEvBuffer `{ return bufferevent_get_output(self); `}
382 # The input buffer associated to `self`
383 fun input_buffer: InputNativeEvBuffer `{ return bufferevent_get_input(self); `}
385 # Read data from this buffer
386 fun read_buffer
(buf
: NativeEvBuffer): Int `{ return bufferevent_read_buffer(self, buf); `}
388 # Write data to this buffer
389 fun write_buffer(buf: NativeEvBuffer): Int `{ return bufferevent_write_buffer(self, buf); `}
393 extern class NativeEvBuffer `{ struct evbuffer * `}
394 # Length of data in this buffer
395 fun length: Int `{ return evbuffer_get_length(self); `}
397 # Read data from an evbuffer and drain the bytes read
398 fun remove
(buffer
: CString, len
: Int) `{
399 evbuffer_remove(self, buffer, len);
404 extern class InputNativeEvBuffer
407 # Empty/clear `length` data from buffer
408 fun drain
(length
: Int) `{ evbuffer_drain(self, length); `}
412 extern class OutputNativeEvBuffer
416 fun add_file(fd, offset, length: Int): Bool `{
417 return evbuffer_add_file
(self, fd
, offset
, length
);
421 # A listener acting on an interface and port, spawns `Connection` on new connections
422 extern class ConnectionListener `{ struct evconnlistener * `}
424 private new bind_to
(base
: NativeEventBase, address
: CString, port
: Int, factory
: ConnectionFactory)
425 import ConnectionFactory.accept_connection
, error_callback
`{
427 struct sockaddr_in sin = {0};
428 struct evconnlistener *listener;
429 ConnectionFactory_incr_ref(factory);
431 struct hostent *hostent = gethostbyname(address);
437 sin.sin_family = hostent->h_addrtype;
438 sin.sin_port = htons(port);
439 memcpy( &(sin.sin_addr.s_addr), (const void*)hostent->h_addr, hostent->h_length );
441 listener = evconnlistener_new_bind(base,
442 (evconnlistener_cb)accept_connection_cb, factory,
443 LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
444 (struct sockaddr*)&sin, sizeof(sin));
446 if (listener != NULL) {
447 evconnlistener_set_error_cb(listener, (evconnlistener_errorcb)ConnectionListener_error_callback);
453 private new bind_unix
(base
: NativeEventBase, file
: CString, factory
: ConnectionFactory)
454 import ConnectionFactory.accept_connection
, error_callback
`{
456 ConnectionFactory_incr_ref(factory);
458 struct sockaddr_un sun = {0};
459 sun.sun_family = AF_UNIX;
460 strncpy(sun.sun_path, file, sizeof(sun.sun_path) - 1);
462 struct evconnlistener *listener = evconnlistener_new_bind(base,
463 (evconnlistener_cb)accept_connection_cb, factory,
464 LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
465 (struct sockaddr*)&sun, sizeof(sun));
466 if (listener != NULL) {
467 evconnlistener_set_error_cb(listener,
468 (evconnlistener_errorcb)ConnectionListener_error_callback);
474 # Get the `NativeEventBase` associated to `self`
475 fun base
: NativeEventBase `{ return evconnlistener_get_base(self); `}
477 # Callback method on listening error
478 fun error_callback do
479 var cstr = evutil_socket_error_to_string(evutil_socket_error)
480 print_error "libevent error: '{cstr}'"
484 # Factory to listen on sockets and create new `Connection`
485 class ConnectionFactory
486 # The `NativeEventBase` for the dispatch loop of this factory
487 var event_base: NativeEventBase
489 # Accept a connection on `listener
`
491 # By default, it creates a new NativeBufferEvent and calls `spawn_connection
`.
492 fun accept_connection(listener: ConnectionListener, fd: Int, addrin: Pointer, socklen: Int)
494 var base = listener.base
495 var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free)
497 # Human representation of remote client address
498 var addr_len = 46 # Longest possible IPv6 address + null byte
499 var addr_buf = new CString(addr_len)
500 addr_buf = addrin_to_address(addrin, addr_buf, addr_len)
501 var addr = if addr_buf.address_is_null then
505 var conn = spawn_connection(bev, addr)
506 bev.enable ev_read|ev_write
510 # Create a new `Connection` object for `buffer_event
`
511 fun spawn_connection(buffer_event: NativeBufferEvent, address: String): Connection
513 return new Connection(buffer_event)
516 # Listen on `address
`:`port
` for new connection, which will callback `spawn_connection
`
517 fun bind_to(address: String, port: Int): nullable ConnectionListener
519 var listener = new ConnectionListener.bind_to(event_base, address.to_cstring, port, self)
520 if listener.address_is_null then
521 sys.stderr.write "libevent warning: Opening {address}:{port} failed\n"
527 # Listen on a UNIX domain socket for new connections
529 # On new connections, libevent callbacks `spawn_connection
`.
530 fun bind_unix(path: String): nullable ConnectionListener
532 # Delete the socket if it already exists
533 var stat = path.file_stat
534 if stat != null and stat.is_sock then path.file_delete
536 var listener = new ConnectionListener.bind_unix(
537 event_base, path.to_cstring, self)
539 if listener.address_is_null then
540 print_error "libevent warning: Opening UNIX domain socket {path} failed, " +
541 evutil_socket_error_to_string(evutil_socket_error).to_s
548 # Put string representation of source `address
` into `buf
`
549 private fun addrin_to_address(address: Pointer, buf: CString, buf_len: Int): CString `{
550 struct sockaddr
*addrin
= (struct sockaddr
*)address
;
552 if (addrin-
>sa_family
== AF_INET) {
553 struct in_addr
*src
= &((struct sockaddr_in
*)addrin
)->sin_addr
;
554 return (char
*)inet_ntop
(addrin-
>sa_family
, src
, buf
, buf_len
);
556 else if (addrin-
>sa_family
== AF_INET6) {
557 struct in6_addr
*src
= &((struct sockaddr_in6
*)addrin
)->sin6_addr
;
558 return (char
*)inet_ntop
(addrin-
>sa_family
, src
, buf
, buf_len
);
564 # Enable some relatively expensive debugging checks that would normally be turned off
565 fun enable_debug_mode `{ event_enable_debug_mode(); `}
567 # Use Windows builtin locking and thread ID functions
568 fun use_windows_threads
: Bool `{
569 #ifdef EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
570 return evthread_use_windows_threads();
576 # Use Pthreads locking and thread ID functions
577 fun use_pthreads
: Bool `{
578 #ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED
579 return evthread_use_pthreads();