ff10b0578cf76f73f72e267c2d80d3639029bdec
[nit.git] / lib / libevent / libevent.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2013 Jean-Philippe Caissy <jpcaissy@piji.ca>
4 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 # Low-level wrapper around the libevent library to manage events on file descriptors
19 #
# For more information, refer to the libevent documentation at
21 # http://monkey.org/~provos/libevent/doxygen-2.0.1/
22 module libevent is pkgconfig("libevent")
23
24 in "C header" `{
25 #include <event2/listener.h>
26 #include <event2/bufferevent.h>
27 #include <event2/buffer.h>
28 `}
29
30 in "C" `{
31 #include <sys/stat.h>
32 #include <sys/types.h>
33 #include <fcntl.h>
34 #include <errno.h>
35 #include <string.h>
36
37 #include <sys/socket.h>
38 #include <arpa/inet.h>
39 #include <netinet/in.h>
40 #include <netinet/ip.h>
41
42 // Protect callbacks for compatibility with light FFI
43 #ifdef Connection_decr_ref
44 // Callback forwarded to 'Connection.write_callback'
45 static void c_write_cb(struct bufferevent *bev, Connection ctx) {
46 Connection_write_callback(ctx);
47 }
48
49 // Callback forwarded to 'Connection.read_callback_native'
50 static void c_read_cb(struct bufferevent *bev, Connection ctx)
51 {
52 Connection_read_callback_native(ctx, bev);
53 }
54
55 // Callback forwarded to 'Connection.event_callback'
56 static void c_event_cb(struct bufferevent *bev, short events, Connection ctx)
57 {
58 int release = Connection_event_callback(ctx, events);
59 if (release) Connection_decr_ref(ctx);
60 }
61
62 // Callback forwarded to 'ConnectionFactory.accept_connection'
63 static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd,
64 struct sockaddr *addrin, int socklen, ConnectionFactory ctx)
65 {
66 ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen);
67 }
68 #endif
69
70 #ifdef EventCallback_incr_ref
71 // Callback forwarded to 'EventCallback.callback'
72 static void signal_cb(evutil_socket_t fd, short events, void *data)
73 {
74 EventCallback handler = data;
75 EventCallback_callback(handler, events);
76 }
77 #endif
78 `}
79
# Information and state of a Libevent dispatch loop
#
# The event_base lies at the center of Libevent; every application will
# have one. It keeps track of all pending and active events, and
# notifies your application of the active ones.
extern class NativeEventBase `{ struct event_base * `}

	# Allocate a new event_base to use with the rest of Libevent
	new `{
		return event_base_new();
	`}

	# Has `self` been correctly initialized? `false` when the native pointer is null
	fun is_valid: Bool
	do
		return not address_is_null
	end

	# Reinitialize the event base after a fork
	#
	# Some event mechanisms do not survive across fork.
	# The event base needs to be reinitialized with the `reinit` method.
	#
	# Returns `true` if some events could not be re-added.
	fun reinit: Bool `{
		return event_reinit(self);
	`}

	# Run the event dispatching loop
	#
	# This loop will run the event base until either there are no more added
	# events, or until something calls `loopexit`.
	fun dispatch `{
		event_base_dispatch(self);
	`}

	# Exit the event loop
	#
	# No delay is passed to libevent.
	#
	# TODO support timer
	fun loopexit `{
		event_base_loopexit(self, NULL);
	`}

	# Release the native event base
	redef fun free `{
		event_base_free(self);
	`}
end
114
# Event, libevent's basic unit of operation
extern class NativeEvent `{ struct event * `}

	# Add `self` to the set of pending events
	#
	# No timeout is passed to libevent.
	#
	# TODO support timeout
	fun add `{
		event_add(self, NULL);
	`}

	# Remove `self` from the set of monitored events
	fun del `{
		event_del(self);
	`}

	# Release the native event
	redef fun free `{
		event_free(self);
	`}
end
128
# Signal event
extern class NativeEvSignal
	super NativeEvent

	# Create an event on `base` for `signal`, notifying `handler.callback`
	#
	# The reference count of `handler` is incremented before it is handed
	# to libevent as the callback argument.
	new (base: NativeEventBase, signal: Int, handler: EventCallback)
	import EventCallback.callback `{
		struct event *signal_event;

		EventCallback_incr_ref(handler);
		signal_event = evsignal_new(base, signal, signal_cb, handler);
		return signal_event;
	`}
end
139
# Receiver of event callbacks
#
# Instances are passed to `NativeEvSignal` to be notified through `callback`.
interface EventCallback

	# Callback on an event, `events` holds the flags reported by libevent
	#
	# Does nothing by default.
	fun callback(events: Int) do end
end
146
# Spawned to manage a specific connection
#
# TODO, use polls
class Connection
	super Writer

	# Closing this connection has been requested, but may not yet be `closed`
	var close_requested = false

	# This connection is closed
	#
	# Once `true`, `native_buffer_event` has been freed and must not be used.
	var closed = false

	# The native libevent linked to `self`
	var native_buffer_event: NativeBufferEvent

	# Close this connection if possible, otherwise mark it to be closed later
	#
	# The connection is closed right away only if both the input and the
	# output buffers are empty. Otherwise `close_requested` is set and the
	# closing is attempted again from the read and write callbacks.
	redef fun close
	do
		if closed then return

		var i = native_buffer_event.input_buffer
		var o = native_buffer_event.output_buffer
		if i.length > 0 or o.length > 0 then
			close_requested = true
		else
			force_close
		end
	end

	# Force closing this connection and freeing `native_buffer_event`
	#
	# Does nothing if `self` is already `closed`.
	fun force_close
	do
		if closed then return

		native_buffer_event.free
		closed = true
	end

	# Callback method on a write event
	fun write_callback
	do
		if close_requested then close
	end

	# Drain the input buffer of `bev` and forward its content to `read_callback`
	private fun read_callback_native(bev: NativeBufferEvent)
	do
		var evbuffer = bev.input_buffer
		var len = evbuffer.length
		var buf = new CString(len)
		evbuffer.remove(buf, len)
		var str = buf.to_s_with_length(len)
		read_callback str
	end

	# Callback method when data is available to read
	fun read_callback(content: String)
	do
		if close_requested then close
	end

	# Callback method on events: EOF, user-defined timeout and unrecoverable errors
	#
	# On an error or EOF event, the connection is closed with `force_close`.
	#
	# Returns `true` if the native handles to `self` can be released.
	fun event_callback(events: Int): Bool
	do
		if events & bev_event_error != 0 or events & bev_event_eof != 0 then
			if events & bev_event_error != 0 then
				var sock_err = evutil_socket_error
				# Ignore some normal errors and print the others for debugging
				# NOTE(review): 110 and 104 are hard-coded errno values,
				# presumably the Linux ones; confirm on other platforms.
				if sock_err == 110 then
					# Connection timed out (ETIMEDOUT)
				else if sock_err == 104 then
					# Connection reset by peer (ECONNRESET)
				else
					print_error "libevent error event: {evutil_socket_error_to_string(sock_err)} ({sock_err})"
				end
			end
			force_close
			return true
		end

		return false
	end

	# Write a string to the connection
	#
	# Does nothing if closing was requested or if `self` is `closed`.
	redef fun write(str)
	do
		# `native_buffer_event` is freed once `closed`, do not touch it
		if close_requested or closed then return
		native_buffer_event.write(str.to_cstring, str.byte_length)
	end

	# Write a single byte to the connection
	#
	# Does nothing if closing was requested or if `self` is `closed`.
	redef fun write_byte(byte)
	do
		if close_requested or closed then return
		native_buffer_event.write_byte(byte)
	end

	# Write `len` bytes from `ns` to the connection
	#
	# Does nothing if closing was requested or if `self` is `closed`.
	redef fun write_bytes_from_cstring(ns, len)
	do
		if close_requested or closed then return
		native_buffer_event.write(ns, len)
	end

	# Write a file to the connection
	#
	# If the file at `path` cannot be opened, stat'ed or added to the
	# output buffer, `last_error` is set and the method returns.
	# Does nothing if closing was requested or if `self` is `closed`.
	fun write_file(path: String)
	do
		if close_requested or closed then return

		var file = new FileReader.open(path)
		if file.last_error != null then
			var error = new IOError("Failed to open file at '{path}'")
			error.cause = file.last_error
			self.last_error = error
			file.close
			return
		end

		var stat = file.file_stat
		if stat == null then
			last_error = new IOError("Failed to stat file at '{path}'")
			file.close
			return
		end

		# The file is only closed here on failure
		var err = native_buffer_event.output_buffer.add_file(file.fd, 0, stat.size)
		if err then
			last_error = new IOError("Failed to add file at '{path}'")
			file.close
		end
	end
end
280
# ---
# Event flags received by event callbacks

# An error was encountered while reading
fun bev_event_reading: Int `{
	return BEV_EVENT_READING;
`}

# An error was encountered while writing
fun bev_event_writing: Int `{
	return BEV_EVENT_WRITING;
`}

# The end of file was reached
fun bev_event_eof: Int `{
	return BEV_EVENT_EOF;
`}

# An unrecoverable error was encountered
fun bev_event_error: Int `{
	return BEV_EVENT_ERROR;
`}

# The user-specified timeout was reached
fun bev_event_timeout: Int `{
	return BEV_EVENT_TIMEOUT;
`}

# The connect operation finished
fun bev_event_connected: Int `{
	return BEV_EVENT_CONNECTED;
`}
301
# Global error code for the last socket operation on the calling thread
#
# Not idempotent on all platforms.
fun evutil_socket_error: Int `{
	int code = EVUTIL_SOCKET_ERROR();
	return code;
`}
308
# Get the string describing `error_code`, as returned by `evutil_socket_error`
fun evutil_socket_error_to_string(error_code: Int): CString `{ return evutil_socket_error_to_string(error_code); `}
313
# ---
# Options that can be specified when creating a `NativeBufferEvent`

# Close the underlying file descriptor/bufferevent/whatever when this bufferevent is freed
fun bev_opt_close_on_free: Int `{
	return BEV_OPT_CLOSE_ON_FREE;
`}

# If threading is enabled, protect the operations on this bufferevent with a lock
fun bev_opt_threadsafe: Int `{
	return BEV_OPT_THREADSAFE;
`}

# Run callbacks deferred in the event loop
fun bev_opt_defer_callbacks: Int `{
	return BEV_OPT_DEFER_CALLBACKS;
`}

# If set, callbacks are executed without locks being held on the bufferevent
fun bev_opt_unlock_callbacks: Int `{
	return BEV_OPT_UNLOCK_CALLBACKS;
`}
328
# ---
# Options for `NativeBufferEvent::enable`

# Flag of the read operation
fun ev_read: Int `{
	return EV_READ;
`}

# Flag of the write operation
fun ev_write: Int `{
	return EV_WRITE;
`}
337
338 # ---
339
# A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer
extern class NativeBufferEvent `{ struct bufferevent * `}

	# Socket-based `NativeBufferEvent` that reads and writes data onto a network
	#
	# `fd` is the socket to wrap and `options` a combination of the `bev_opt_*` flags.
	new socket(base: NativeEventBase, fd, options: Int) `{
		struct bufferevent *created = bufferevent_socket_new(base, fd, options);
		return created;
	`}

	# Enable `operation` on this bufferevent, see `ev_read` and `ev_write`
	fun enable(operation: Int) `{ bufferevent_enable(self, operation); `}

	# Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn`
	#
	# The reference count of `conn` is incremented as it is handed to
	# libevent as the callbacks argument.
	fun setcb(conn: Connection) import Connection.read_callback_native,
	Connection.write_callback, Connection.event_callback, CString `{
		bufferevent_data_cb on_read = (bufferevent_data_cb)c_read_cb;
		bufferevent_data_cb on_write = (bufferevent_data_cb)c_write_cb;
		bufferevent_event_cb on_event = (bufferevent_event_cb)c_event_cb;

		Connection_incr_ref(conn);
		bufferevent_setcb(self, on_read, on_write, on_event, conn);
	`}

	# Write `length` bytes of `line`
	#
	# Returns the status reported by `bufferevent_write`.
	fun write(line: CString, length: Int): Int `{ return bufferevent_write(self, line, length); `}

	# Write the byte `value`
	#
	# Returns the status reported by `bufferevent_write`.
	fun write_byte(value: Int): Int `{
		unsigned char single = (unsigned char)value;
		return bufferevent_write(self, &single, 1);
	`}

	# Release the native bufferevent
	redef fun free `{
		bufferevent_free(self);
	`}

	# The output buffer associated to `self`
	fun output_buffer: OutputNativeEvBuffer `{
		return bufferevent_get_output(self);
	`}

	# The input buffer associated to `self`
	fun input_buffer: InputNativeEvBuffer `{
		return bufferevent_get_input(self);
	`}

	# Read data from this bufferevent into `buf`
	fun read_buffer(buf: NativeEvBuffer): Int `{
		return bufferevent_read_buffer(self, buf);
	`}

	# Write the data of `buf` to this bufferevent
	fun write_buffer(buf: NativeEvBuffer): Int `{
		return bufferevent_write_buffer(self, buf);
	`}
end
388
# A single buffer
extern class NativeEvBuffer `{ struct evbuffer * `}

	# Number of bytes of data in this buffer
	fun length: Int `{
		return evbuffer_get_length(self);
	`}

	# Read up to `len` bytes into `buffer` and drain the bytes read
	#
	# The result of the native call is not reported.
	fun remove(buffer: CString, len: Int) `{ evbuffer_remove(self, buffer, len); `}
end
399
# An input buffer
extern class InputNativeEvBuffer
	super NativeEvBuffer

	# Empty/clear `length` bytes of data from the buffer
	fun drain(length: Int) `{
		evbuffer_drain(self, length);
	`}
end
407
# An output buffer
extern class OutputNativeEvBuffer
	super NativeEvBuffer

	# Add `length` bytes of the file `fd`, starting at `offset`, to the buffer
	#
	# Returns the status of `evbuffer_add_file` as a `Bool`; the caller in
	# this module treats a `true` result as an error.
	fun add_file(fd, offset, length: Int): Bool `{ return evbuffer_add_file(self, fd, offset, length); `}
end
417
# A listener acting on an interface and port, spawns `Connection` on new connections
extern class ConnectionListener `{ struct evconnlistener * `}

	# Listen on `address`:`port` and forward new connections to `factory`
	#
	# `address` is resolved with `gethostbyname`, only IPv4 results are
	# supported. The native pointer of the result is null if the resolution
	# or the binding failed.
	#
	# A reference on `factory` is taken only when the listener is
	# successfully created.
	private new bind_to(base: NativeEventBase, address: CString, port: Int, factory: ConnectionFactory)
	import ConnectionFactory.accept_connection, error_callback `{

		struct sockaddr_in sin;
		struct evconnlistener *listener;

		struct hostent *hostent = gethostbyname(address);

		if (!hostent) {
			return NULL;
		}

		// `sin` can only hold an IPv4 address, refuse anything else
		// instead of copying past the end of `sin.sin_addr`
		if (hostent->h_addrtype != AF_INET ||
		    hostent->h_addr == NULL ||
		    (size_t)hostent->h_length != sizeof(sin.sin_addr)) {
			return NULL;
		}

		memset(&sin, 0, sizeof(sin));
		sin.sin_family = AF_INET;
		sin.sin_port = htons(port);
		memcpy(&sin.sin_addr, (const void*)hostent->h_addr, sizeof(sin.sin_addr));

		listener = evconnlistener_new_bind(base,
			(evconnlistener_cb)accept_connection_cb, factory,
			LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
			(struct sockaddr*)&sin, sizeof(sin));

		if (listener == NULL) {
			return NULL;
		}

		// The listener now holds `factory` as its callback argument,
		// keep a reference on it. Nothing is leaked on the failure
		// paths above as the reference is only taken here.
		ConnectionFactory_incr_ref(factory);

		evconnlistener_set_error_cb(listener, (evconnlistener_errorcb)ConnectionListener_error_callback);

		return listener;
	`}

	# Get the `NativeEventBase` associated to `self`
	fun base: NativeEventBase `{ return evconnlistener_get_base(self); `}

	# Callback method on listening error
	#
	# Prints the message of the last socket error.
	fun error_callback do
		var cstr = evutil_socket_error_to_string(evutil_socket_error)
		print_error "libevent error: '{cstr}'"
	end
end
460
# Factory to listen on sockets and create new `Connection`
class ConnectionFactory
	# The `NativeEventBase` for the dispatch loop of this factory
	var event_base: NativeEventBase

	# Accept a connection on `listener`
	#
	# By default, it creates a new NativeBufferEvent and calls `spawn_connection`.
	# The new buffer event is then enabled for reading and writing, and its
	# callbacks are set to the spawned connection.
	fun accept_connection(listener: ConnectionListener, fd: Int, addrin: Pointer, socklen: Int)
	do
		var base = listener.base
		var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free)

		# Human representation of remote client address
		var addr_len = 46 # Longest possible IPv6 address + null byte
		var addr_buf = new CString(addr_len)
		addr_buf = addrin_to_address(addrin, addr_buf, addr_len)
		var addr = if addr_buf.address_is_null then
				"Unknown address"
			else addr_buf.to_s

		var conn = spawn_connection(bev, addr)
		bev.enable ev_read|ev_write
		bev.setcb conn
	end

	# Create a new `Connection` object for `buffer_event`
	#
	# `address` is the textual address of the remote client,
	# it is not used by the default implementation.
	fun spawn_connection(buffer_event: NativeBufferEvent, address: String): Connection
	do
		return new Connection(buffer_event)
	end

	# Listen on `address`:`port` for new connection, which will callback `spawn_connection`
	#
	# Returns `null` and prints a warning if the listener could not be opened.
	fun bind_to(address: String, port: Int): nullable ConnectionListener
	do
		var listener = new ConnectionListener.bind_to(event_base, address.to_cstring, port, self)
		if listener.address_is_null then
			sys.stderr.write "libevent warning: Opening {address}:{port} failed\n"
			# Do not hand out a listener wrapping a null pointer
			return null
		end
		return listener
	end

	# Put string representation of source `address` into `buf`
	#
	# Returns a null pointer if the conversion failed or if the address
	# family is neither `AF_INET` nor `AF_INET6`.
	private fun addrin_to_address(address: Pointer, buf: CString, buf_len: Int): CString `{
		struct sockaddr *addrin = (struct sockaddr*)address;

		if (addrin->sa_family == AF_INET) {
			struct in_addr *src = &((struct sockaddr_in*)addrin)->sin_addr;
			return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
		}
		else if (addrin->sa_family == AF_INET6) {
			struct in6_addr *src = &((struct sockaddr_in6*)addrin)->sin6_addr;
			return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
		}
		return NULL;
	`}
end
518
# Enable some relatively expensive debugging checks that would normally be turned off
fun enable_debug_mode `{
	event_enable_debug_mode();
`}
521
# Use Windows builtin locking and thread ID functions
#
# The result is the raw C status as a `Bool`, the fallback branch returns `-1`.
# NOTE(review): libevent documents 0 as success and -1 as failure,
# so `true` presumably signals a failure; confirm against callers.
# NOTE(review): `event2/thread.h` is not among the includes visible in this
# file, so the macro tested here may never be defined; confirm.
fun use_windows_threads: Bool `{
	int status;
#ifdef EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
	status = evthread_use_windows_threads();
#else
	status = -1;
#endif
	return status;
`}
530
# Use Pthreads locking and thread ID functions
#
# The result is the raw C status as a `Bool`, the fallback branch returns `-1`.
# NOTE(review): libevent documents 0 as success and -1 as failure,
# so `true` presumably signals a failure; confirm against callers.
# NOTE(review): `event2/thread.h` is not among the includes visible in this
# file, so the macro tested here may never be defined; confirm.
fun use_pthreads: Bool `{
	int status;
#ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED
	status = evthread_use_pthreads();
#else
	status = -1;
#endif
	return status;
`}