X-Git-Url: http://nitlanguage.org diff --git a/lib/libevent.nit b/lib/libevent.nit index 9cbe38e..fdd84f8 100644 --- a/lib/libevent.nit +++ b/lib/libevent.nit @@ -22,18 +22,25 @@ module libevent is pkgconfig("libevent") in "C header" `{ - #include - #include - #include - #include - #include - #include #include #include `} in "C" `{ + #include + #include + #include + #include + #include + + #include + #include + #include + #include + +// Protect callbacks for compatibility with light FFI +#ifdef Connection_decr_ref // Callback forwarded to 'Connection.write_callback' static void c_write_cb(struct bufferevent *bev, Connection ctx) { Connection_write_callback(ctx); @@ -54,10 +61,20 @@ in "C" `{ // Callback forwarded to 'ConnectionFactory.accept_connection' static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd, - struct sockaddr *address, int socklen, ConnectionFactory ctx) + struct sockaddr *addrin, int socklen, ConnectionFactory ctx) + { + ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen); + } +#endif + +#ifdef EventCallback_incr_ref + // Callback forwarded to 'EventCallback.callback' + static void signal_cb(evutil_socket_t fd, short events, void *data) { - ConnectionFactory_accept_connection(ctx, listener, fd, address, socklen); + EventCallback handler = data; + EventCallback_callback(handler, events); } +#endif `} # Structure to hold information and state for a Libevent dispatch loop. @@ -76,16 +93,47 @@ extern class NativeEventBase `{ struct event_base * `} # Event dispatching loop # # This loop will run the event base until either there are no more added - # events, or until something calls `exit_loop`. + # events, or until something calls `loopexit`. fun dispatch `{ event_base_dispatch(self); `} # Exit the event loop # # TODO support timer - fun exit_loop `{ event_base_loopexit(self, NULL); `} + fun loopexit `{ event_base_loopexit(self, NULL); `} + + redef fun free `{ event_base_free(self); `} +end + +# Event, libevent's basic unit of operation +extern class NativeEvent `{ struct event * `} + + # Add to the set of pending events + # + # TODO support timeout + fun add `{ event_add(self, NULL); `} - # Destroy this instance - fun destroy `{ event_base_free(self); `} + # Remove from the set of monitored events + fun del `{ event_del(self); `} + + redef fun free `{ event_free(self); `} +end + +# Signal event +extern class NativeEvSignal + super NativeEvent + + new (base: NativeEventBase, signal: Int, handler: EventCallback) + import EventCallback.callback `{ + EventCallback_incr_ref(handler); + return evsignal_new(base, signal, signal_cb, handler); + `} +end + +# Receiver of event callbacks +interface EventCallback + + # Callback on an event + fun callback(events: Int) do end end # Spawned to manage a specific connection @@ -107,9 +155,23 @@ class Connection redef fun close do if closed then return - var success = native_buffer_event.destroy - close_requested = true - closed = success + + var i = native_buffer_event.input_buffer + var o = native_buffer_event.output_buffer + if i.length > 0 or o.length > 0 then + close_requested = true + else + force_close + end + end + + # Force closing this connection and freeing `native_buffer_event` + fun force_close + do + if closed then return + + native_buffer_event.free + closed = true end # Callback method on a write event @@ -122,7 +184,7 @@ class Connection do var evbuffer = bev.input_buffer var len = evbuffer.length - var buf = new NativeString(len) + var buf = new CString(len) evbuffer.remove(buf, len) var str = buf.to_s_with_length(len) read_callback str @@ -140,8 +202,18 @@ class Connection fun event_callback(events: Int): Bool do if events & bev_event_error != 0 or events & bev_event_eof != 0 then - if events & bev_event_error != 0 then print_error "Error from bufferevent" - close + if events & bev_event_error != 0 then + var sock_err = evutil_socket_error + # Ignore some normal errors and print the others for debugging + if sock_err == 110 then + # Connection timed out (ETIMEDOUT) + else if sock_err == 104 then + # Connection reset by peer (ECONNRESET) + else + print_error "libevent error event: {evutil_socket_error_to_string(sock_err)} ({sock_err})" + end + end + force_close return true end @@ -152,7 +224,7 @@ class Connection redef fun write(str) do if close_requested then return - native_buffer_event.write(str.to_cstring, str.bytelen) + native_buffer_event.write(str.to_cstring, str.byte_length) end redef fun write_byte(byte) @@ -219,6 +291,18 @@ fun bev_event_timeout: Int `{ return BEV_EVENT_TIMEOUT; `} # connect operation finished. fun bev_event_connected: Int `{ return BEV_EVENT_CONNECTED; `} +# Global error code for the last socket operation on the calling thread +# +# Not idempotent on all platforms. +fun evutil_socket_error: Int `{ + return EVUTIL_SOCKET_ERROR(); +`} + +# Convert an error code from `evutil_socket_error` to a string +fun evutil_socket_error_to_string(error_code: Int): CString `{ + return evutil_socket_error_to_string(error_code); +`} + # --- # Options that can be specified when creating a `NativeBufferEvent` @@ -260,7 +344,7 @@ extern class NativeBufferEvent `{ struct bufferevent * `} # Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn` fun setcb(conn: Connection) import Connection.read_callback_native, - Connection.write_callback, Connection.event_callback, NativeString `{ + Connection.write_callback, Connection.event_callback, CString `{ Connection_incr_ref(conn); bufferevent_setcb(self, (bufferevent_data_cb)c_read_cb, @@ -269,7 +353,7 @@ extern class NativeBufferEvent `{ struct bufferevent * `} `} # Write `length` bytes of `line` - fun write(line: NativeString, length: Int): Int `{ + fun write(line: CString, length: Int): Int `{ return bufferevent_write(self, line, length); `} @@ -279,18 +363,7 @@ extern class NativeBufferEvent `{ struct bufferevent * `} return bufferevent_write(self, &byt, 1); `} - # Check if we have anything left in our buffers. If so, we set our connection to be closed - # on a callback. Otherwise we close it and free it right away. - fun destroy: Bool `{ - struct evbuffer* out = bufferevent_get_output(self); - struct evbuffer* in = bufferevent_get_input(self); - if(evbuffer_get_length(in) > 0 || evbuffer_get_length(out) > 0) { - return 0; - } else { - bufferevent_free(self); - return 1; - } - `} + redef fun free `{ bufferevent_free(self); `} # The output buffer associated to `self` fun output_buffer: OutputNativeEvBuffer `{ return bufferevent_get_output(self); `} @@ -311,7 +384,7 @@ extern class NativeEvBuffer `{ struct evbuffer * `} fun length: Int `{ return evbuffer_get_length(self); `} # Read data from an evbuffer and drain the bytes read - fun remove(buffer: NativeString, len: Int) `{ + fun remove(buffer: CString, len: Int) `{ evbuffer_remove(self, buffer, len); `} end @@ -337,7 +410,7 @@ end # A listener acting on an interface and port, spawns `Connection` on new connections extern class ConnectionListener `{ struct evconnlistener * `} - private new bind_to(base: NativeEventBase, address: NativeString, port: Int, factory: ConnectionFactory) + private new bind_to(base: NativeEventBase, address: CString, port: Int, factory: ConnectionFactory) import ConnectionFactory.accept_connection, error_callback `{ struct sockaddr_in sin; @@ -346,6 +419,10 @@ extern class ConnectionListener `{ struct evconnlistener * `} struct hostent *hostent = gethostbyname(address); + if (!hostent) { + return NULL; + } + memset(&sin, 0, sizeof(sin)); sin.sin_family = hostent->h_addrtype; sin.sin_port = htons(port); @@ -368,16 +445,9 @@ extern class ConnectionListener `{ struct evconnlistener * `} # Callback method on listening error fun error_callback do - var cstr = socket_error - sys.stderr.write "libevent error: '{cstr}'" + var cstr = evutil_socket_error_to_string(evutil_socket_error) + print_error "libevent error: '{cstr}'" end - - # Error with sockets - fun socket_error: NativeString `{ - // TODO move to Nit and maybe NativeEventBase - int err = EVUTIL_SOCKET_ERROR(); - return evutil_socket_error_to_string(err); - `} end # Factory to listen on sockets and create new `Connection` @@ -388,17 +458,26 @@ class ConnectionFactory # Accept a connection on `listener` # # By default, it creates a new NativeBufferEvent and calls `spawn_connection`. - fun accept_connection(listener: ConnectionListener, fd: Int, address: Pointer, socklen: Int) + fun accept_connection(listener: ConnectionListener, fd: Int, addrin: Pointer, socklen: Int) do var base = listener.base var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free) - var conn = spawn_connection(bev) + + # Human representation of remote client address + var addr_len = 46 # Longest possible IPv6 address + null byte + var addr_buf = new CString(addr_len) + addr_buf = addrin_to_address(addrin, addr_buf, addr_len) + var addr = if addr_buf.address_is_null then + "Unknown address" + else addr_buf.to_s + + var conn = spawn_connection(bev, addr) bev.enable ev_read|ev_write bev.setcb conn end # Create a new `Connection` object for `buffer_event` - fun spawn_connection(buffer_event: NativeBufferEvent): Connection + fun spawn_connection(buffer_event: NativeBufferEvent, address: String): Connection do return new Connection(buffer_event) end @@ -412,4 +491,22 @@ class ConnectionFactory end return listener end + + # Put string representation of source `address` into `buf` + private fun addrin_to_address(address: Pointer, buf: CString, buf_len: Int): CString `{ + struct sockaddr *addrin = (struct sockaddr*)address; + + if (addrin->sa_family == AF_INET) { + struct in_addr *src = &((struct sockaddr_in*)addrin)->sin_addr; + return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len); + } + else if (addrin->sa_family == AF_INET6) { + struct in6_addr *src = &((struct sockaddr_in6*)addrin)->sin6_addr; + return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len); + } + return NULL; + `} end + +# Enable some relatively expensive debugging checks that would normally be turned off +fun enable_debug_mode `{ event_enable_debug_mode(); `}