1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Copyright 2013 Jean-Philippe Caissy <jpcaissy@piji.ca>
4 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # Low-level wrapper around the libevent library to manage events on file descriptors
20 # For mor information, refer to the libevent documentation at
21 # http://monkey.org/~provos/libevent/doxygen-2.0.1/
22 module libevent
is pkgconfig
("libevent")
25 #include <event2/listener.h>
26 #include <event2/bufferevent.h>
27 #include <event2/buffer.h>
32 #include <sys/types.h>
37 #include <sys/socket.h>
38 #include <arpa/inet.h>
39 #include <netinet/in.h>
40 #include <netinet/ip.h>
42 // Protect callbacks for compatibility with light FFI
43 #ifdef Connection_decr_ref
44 // Callback forwarded to 'Connection.write_callback'
45 static void c_write_cb(struct bufferevent *bev, Connection ctx) {
46 Connection_write_callback(ctx);
49 // Callback forwarded to 'Connection.read_callback_native'
50 static void c_read_cb(struct bufferevent *bev, Connection ctx)
52 Connection_read_callback_native(ctx, bev);
55 // Callback forwarded to 'Connection.event_callback'
56 static void c_event_cb(struct bufferevent *bev, short events, Connection ctx)
58 int release = Connection_event_callback(ctx, events);
59 if (release) Connection_decr_ref(ctx);
62 // Callback forwarded to 'ConnectionFactory.accept_connection'
63 static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd,
64 struct sockaddr *addrin, int socklen, ConnectionFactory ctx)
66 ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen);
70 #ifdef EventCallback_incr_ref
71 // Callback forwarded to 'EventCallback.callback'
72 static void signal_cb(evutil_socket_t fd, short events, void *data)
74 EventCallback handler = data;
75 EventCallback_callback(handler, events);
80 # Structure to hold information and state for a Libevent dispatch loop.
82 # The event_base lies at the center of Libevent; every application will
83 # have one. It keeps track of all pending and active events, and
84 # notifies your application of the active ones.
85 extern class NativeEventBase `{ struct event_base * `}
87 # Create a new event_base to use with the rest of Libevent
88 new `{ return event_base_new(); `}
90 # Has `self` been correctly initialized?
91 fun is_valid
: Bool do return not address_is_null
93 # Event dispatching loop
95 # This loop will run the event base until either there are no more added
96 # events, or until something calls `loopexit`.
97 fun dispatch
`{ event_base_dispatch(self); `}
102 fun loopexit `{ event_base_loopexit(self, NULL); `}
104 redef fun free
`{ event_base_free(self); `}
107 # Event, libevent's basic unit of operation
108 extern class NativeEvent `{ struct event * `}
110 # Add to the set of pending events
112 # TODO support timeout
113 fun add
`{ event_add(self, NULL); `}
115 # Remove from the set of monitored events
116 fun del `{ event_del(self); `}
118 redef fun free
`{ event_free(self); `}
122 extern class NativeEvSignal
125 new (base: NativeEventBase, signal: Int, handler: EventCallback)
126 import EventCallback.callback `{
127 EventCallback_incr_ref(handler
);
128 return evsignal_new
(base
, signal
, signal_cb
, handler
);
132 # Receiver of event callbacks
133 interface EventCallback
135 # Callback on an event
136 fun callback(events: Int) do end
139 # Spawned to manage a specific connection
145 # Closing this connection has been requested, but may not yet be `closed
`
146 var close_requested = false
148 # This connection is closed
151 # The native libevent linked to `self`
152 var native_buffer_event: NativeBufferEvent
154 # Close this connection if possible, otherwise mark it to be closed later
157 if closed then return
159 var i = native_buffer_event.input_buffer
160 var o = native_buffer_event.output_buffer
161 if i.length > 0 or o.length > 0 then
162 close_requested = true
168 # Force closing this connection and freeing `native_buffer_event
`
171 if closed then return
173 native_buffer_event.free
177 # Callback method on a write event
180 if close_requested then close
183 private fun read_callback_native(bev: NativeBufferEvent)
185 var evbuffer = bev.input_buffer
186 var len = evbuffer.length
187 var buf = new NativeString(len)
188 evbuffer.remove(buf, len)
189 var str = buf.to_s_with_length(len)
193 # Callback method when data is available to read
194 fun read_callback(content: String)
196 if close_requested then close
199 # Callback method on events: EOF, user-defined timeout and unrecoverable errors
201 # Returns `true` if the native handles to `self` can be released.
202 fun event_callback(events: Int): Bool
204 if events & bev_event_error != 0 or events & bev_event_eof != 0 then
205 if events & bev_event_error != 0 then
206 var sock_err = evutil_socket_error
207 # Ignore some normal errors and print the others for debugging
208 if sock_err == 110 then
209 # Connection timed out (ETIMEDOUT)
210 else if sock_err == 104 then
211 # Connection reset by peer (ECONNRESET)
213 print_error "libevent error event: {evutil_socket_error_to_string(sock_err)} ({sock_err})"
223 # Write a string to the connection
226 if close_requested then return
227 native_buffer_event.write(str.to_cstring, str.byte_length)
230 redef fun write_byte(byte)
232 if close_requested then return
233 native_buffer_event.write_byte(byte)
236 redef fun write_bytes(bytes)
238 if close_requested then return
239 native_buffer_event.write(bytes.items, bytes.length)
242 # Write a file to the connection
244 # If `not path
.file_exists
`, the method returns.
245 fun write_file(path: String)
247 if close_requested then return
249 var file = new FileReader.open(path)
250 if file.last_error != null then
251 var error = new IOError("Failed to open file at '{path}'")
252 error.cause = file.last_error
253 self.last_error = error
258 var stat = file.file_stat
260 last_error = new IOError("Failed to stat file at '{path}'")
265 var err = native_buffer_event.output_buffer.add_file(file.fd, 0, stat.size)
267 last_error = new IOError("Failed to add file at '{path}'")
274 # Error code for event callbacks
276 # error encountered while reading
277 fun bev_event_reading: Int `{ return BEV_EVENT_READING; `}
279 # error encountered while writing
280 fun bev_event_writing
: Int `{ return BEV_EVENT_WRITING; `}
283 fun bev_event_eof: Int `{ return BEV_EVENT_EOF; `}
285 # unrecoverable error encountered
286 fun bev_event_error
: Int `{ return BEV_EVENT_ERROR; `}
288 # user-specified timeout reached
289 fun bev_event_timeout: Int `{ return BEV_EVENT_TIMEOUT; `}
291 # connect operation finished.
292 fun bev_event_connected
: Int `{ return BEV_EVENT_CONNECTED; `}
294 # Global error code for the last socket operation on the calling thread
296 # Not idempotent on all platforms.
297 fun evutil_socket_error: Int `{
298 return EVUTIL_SOCKET_ERROR();
301 # Convert an error code from `evutil_socket_error
` to a string
302 fun evutil_socket_error_to_string(error_code: Int): NativeString `{
303 return evutil_socket_error_to_string
(error_code
);
307 # Options that can be specified when creating a `NativeBufferEvent`
309 # Close the underlying file descriptor/bufferevent/whatever when this bufferevent is freed.
310 fun bev_opt_close_on_free: Int `{ return BEV_OPT_CLOSE_ON_FREE; `}
312 # If threading is enabled, protect the operations on this bufferevent with a lock.
313 fun bev_opt_threadsafe
: Int `{ return BEV_OPT_THREADSAFE; `}
315 # Run callbacks deferred in the event loop.
316 fun bev_opt_defer_callbacks: Int `{ return BEV_OPT_DEFER_CALLBACKS; `}
318 # If set, callbacks are executed without locks being held on the bufferevent.
319 fun bev_opt_unlock_callbacks
: Int `{ return BEV_OPT_UNLOCK_CALLBACKS; `}
322 # Options for `NativeBufferEvent::enable
`
325 fun ev_read: Int `{ return EV_READ; `}
328 fun ev_write
: Int `{ return EV_WRITE; `}
332 # A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer
333 extern class NativeBufferEvent `{ struct bufferevent * `}
335 # Socket-based `NativeBufferEvent` that reads and writes data onto a network
336 new socket
(base
: NativeEventBase, fd
, options
: Int) `{
337 return bufferevent_socket_new(base, fd, options);
340 # Enable a bufferevent.
341 fun enable
(operation
: Int) `{
342 bufferevent_enable(self, operation);
345 # Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn`
346 fun setcb
(conn
: Connection) import Connection.read_callback_native
,
347 Connection.write_callback
, Connection.event_callback
, NativeString `{
348 Connection_incr_ref(conn);
349 bufferevent_setcb(self,
350 (bufferevent_data_cb)c_read_cb,
351 (bufferevent_data_cb)c_write_cb,
352 (bufferevent_event_cb)c_event_cb, conn);
355 # Write `length` bytes of `line`
356 fun write
(line
: NativeString, length
: Int): Int `{
357 return bufferevent_write(self, line, length);
360 # Write the byte `value`
361 fun write_byte
(value
: Byte): Int `{
362 unsigned char byt = (unsigned char)value;
363 return bufferevent_write(self, &byt, 1);
366 redef fun free
`{ bufferevent_free(self); `}
368 # The output buffer associated to `self`
369 fun output_buffer: OutputNativeEvBuffer `{ return bufferevent_get_output(self); `}
371 # The input buffer associated to `self`
372 fun input_buffer
: InputNativeEvBuffer `{ return bufferevent_get_input(self); `}
374 # Read data from this buffer
375 fun read_buffer(buf: NativeEvBuffer): Int `{ return bufferevent_read_buffer(self, buf); `}
377 # Write data to this buffer
378 fun write_buffer
(buf
: NativeEvBuffer): Int `{ return bufferevent_write_buffer(self, buf); `}
382 extern class NativeEvBuffer `{ struct evbuffer * `}
383 # Length of data in this buffer
384 fun length
: Int `{ return evbuffer_get_length(self); `}
386 # Read data from an evbuffer and drain the bytes read
387 fun remove(buffer: NativeString, len: Int) `{
388 evbuffer_remove
(self, buffer
, len
);
393 extern class InputNativeEvBuffer
396 # Empty/clear `length
` data from buffer
397 fun drain(length: Int) `{ evbuffer_drain(self, length); `}
401 extern class OutputNativeEvBuffer
405 fun add_file
(fd
, offset
, length
: Int): Bool `{
406 return evbuffer_add_file(self, fd, offset, length);
410 # A listener acting on an interface and port, spawns `Connection` on new connections
411 extern class ConnectionListener `{ struct evconnlistener * `}
413 private new bind_to(base: NativeEventBase, address: NativeString, port: Int, factory: ConnectionFactory)
414 import ConnectionFactory.accept_connection, error_callback `{
416 struct sockaddr_in sin
;
417 struct evconnlistener
*listener
;
418 ConnectionFactory_incr_ref(factory
);
420 struct hostent
*hostent
= gethostbyname
(address
);
426 memset
(&sin
, 0, sizeof
(sin
));
427 sin
.sin_family
= hostent-
>h_addrtype
;
428 sin
.sin_port
= htons
(port
);
429 memcpy
( &(sin
.sin_addr
.s_addr
), (const void
*)hostent-
>h_addr
, hostent-
>h_length
);
431 listener
= evconnlistener_new_bind
(base
,
432 (evconnlistener_cb
)accept_connection_cb
, factory
,
433 LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
434 (struct sockaddr
*)&sin
, sizeof
(sin
));
436 if (listener
!= NULL) {
437 evconnlistener_set_error_cb
(listener
, (evconnlistener_errorcb
)ConnectionListener_error_callback);
443 # Get the `NativeEventBase` associated to `self`
444 fun base: NativeEventBase `{ return evconnlistener_get_base(self); `}
446 # Callback method on listening error
447 fun error_callback
do
448 var cstr
= evutil_socket_error_to_string
(evutil_socket_error
)
449 print_error
"libevent error: '{cstr}'"
453 # Factory to listen on sockets and create new `Connection`
454 class ConnectionFactory
455 # The `NativeEventBase` for the dispatch loop of this factory
456 var event_base
: NativeEventBase
458 # Accept a connection on `listener`
460 # By default, it creates a new NativeBufferEvent and calls `spawn_connection`.
461 fun accept_connection
(listener
: ConnectionListener, fd
: Int, addrin
: Pointer, socklen
: Int)
463 var base
= listener
.base
464 var bev
= new NativeBufferEvent.socket
(base
, fd
, bev_opt_close_on_free
)
466 # Human representation of remote client address
467 var addr_len
= 46 # Longest possible IPv6 address + null byte
468 var addr_buf
= new NativeString(addr_len
)
469 addr_buf
= addrin_to_address
(addrin
, addr_buf
, addr_len
)
470 var addr
= if addr_buf
.address_is_null
then
474 var conn
= spawn_connection
(bev
, addr
)
475 bev
.enable ev_read
|ev_write
479 # Create a new `Connection` object for `buffer_event`
480 fun spawn_connection
(buffer_event
: NativeBufferEvent, address
: String): Connection
482 return new Connection(buffer_event
)
485 # Listen on `address`:`port` for new connection, which will callback `spawn_connection`
486 fun bind_to
(address
: String, port
: Int): nullable ConnectionListener
488 var listener
= new ConnectionListener.bind_to
(event_base
, address
.to_cstring
, port
, self)
489 if listener
.address_is_null
then
490 sys
.stderr
.write
"libevent warning: Opening {address}:{port} failed\n"
495 # Put string representation of source `address` into `buf`
496 private fun addrin_to_address
(address
: Pointer, buf
: NativeString, buf_len
: Int): NativeString `{
497 struct sockaddr *addrin = (struct sockaddr*)address;
499 if (addrin->sa_family == AF_INET) {
500 struct in_addr *src = &((struct sockaddr_in*)addrin)->sin_addr;
501 return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
503 else if (addrin->sa_family == AF_INET6) {
504 struct in6_addr *src = &((struct sockaddr_in6*)addrin)->sin6_addr;
505 return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
511 # Enable some relatively expensive debugging checks that would normally be turned off
512 fun enable_debug_mode
`{ event_enable_debug_mode(); `}