X-Git-Url: http://nitlanguage.org diff --git a/lib/libevent.nit b/lib/libevent.nit index ff9a9f5..2dfb046 100644 --- a/lib/libevent.nit +++ b/lib/libevent.nit @@ -22,18 +22,25 @@ module libevent is pkgconfig("libevent") in "C header" `{ - #include - #include - #include - #include - #include - #include #include #include `} in "C" `{ + #include + #include + #include + #include + #include + + #include + #include + #include + #include + +// Protect callbacks for compatibility with light FFI +#ifdef Connection_decr_ref // Callback forwarded to 'Connection.write_callback' static void c_write_cb(struct bufferevent *bev, Connection ctx) { Connection_write_callback(ctx); @@ -42,45 +49,32 @@ in "C" `{ // Callback forwarded to 'Connection.read_callback_native' static void c_read_cb(struct bufferevent *bev, Connection ctx) { - // TODO move to Nit code - struct evbuffer *input = bufferevent_get_input(bev); - size_t len = evbuffer_get_length(input); - char* cstr = malloc(len); - evbuffer_remove(input, cstr, len); - Connection_read_callback_native(ctx, cstr, len); + Connection_read_callback_native(ctx, bev); } // Callback forwarded to 'Connection.event_callback' static void c_event_cb(struct bufferevent *bev, short events, Connection ctx) { - Connection_event_callback(ctx, events); - - // TODO move to Nit code - if (events & BEV_EVENT_ERROR) - perror("Error from bufferevent"); - if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - bufferevent_free(bev); - Connection_decr_ref(ctx); - } + int release = Connection_event_callback(ctx, events); + if (release) Connection_decr_ref(ctx); } - // Callback fowarded to 'ConnectionFactory.spawn_connection' - static void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t fd, - struct sockaddr *address, int socklen, ConnectionFactory ctx) + // Callback forwarded to 'ConnectionFactory.accept_connection' + static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd, + struct sockaddr *addrin, int socklen, ConnectionFactory ctx) { - // TODO move to Nit code - struct event_base *base = evconnlistener_get_base(listener); - struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); - - Connection nit_con = ConnectionFactory_spawn_connection(ctx, bev); - Connection_incr_ref(nit_con); + ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen); + } +#endif - bufferevent_setcb(bev, - (bufferevent_data_cb)c_read_cb, - (bufferevent_data_cb)c_write_cb, - (bufferevent_event_cb)c_event_cb, nit_con); - bufferevent_enable(bev, EV_READ|EV_WRITE); +#ifdef EventCallback_incr_ref + // Callback forwarded to 'EventCallback.callback' + static void signal_cb(evutil_socket_t fd, short events, void *data) + { + EventCallback handler = data; + EventCallback_callback(handler, events); } +#endif `} # Structure to hold information and state for a Libevent dispatch loop. @@ -96,25 +90,66 @@ extern class NativeEventBase `{ struct event_base * `} # Has `self` been correctly initialized? fun is_valid: Bool do return not address_is_null + # Reinitialize the event base after a fork + # + # Some event mechanisms do not survive across fork. + # The event base needs to be reinitialized with the `reinit` method. + # + # Returns `true` if some events could not be re-added. + fun reinit: Bool `{ return event_reinit(self); `} + # Event dispatching loop # # This loop will run the event base until either there are no more added - # events, or until something calls `exit_loop`. - fun dispatch `{ event_base_dispatch(recv); `} + # events, or until something calls `loopexit`. + fun dispatch `{ event_base_dispatch(self); `} # Exit the event loop # # TODO support timer - fun exit_loop `{ event_base_loopexit(recv, NULL); `} + fun loopexit `{ event_base_loopexit(self, NULL); `} + + redef fun free `{ event_base_free(self); `} +end + +# Event, libevent's basic unit of operation +extern class NativeEvent `{ struct event * `} + + # Add to the set of pending events + # + # TODO support timeout + fun add `{ event_add(self, NULL); `} + + # Remove from the set of monitored events + fun del `{ event_del(self); `} - # Destroy this instance - fun destroy `{ event_base_free(recv); `} + redef fun free `{ event_free(self); `} +end + +# Signal event +extern class NativeEvSignal + super NativeEvent + + new (base: NativeEventBase, signal: Int, handler: EventCallback) + import EventCallback.callback `{ + EventCallback_incr_ref(handler); + return evsignal_new(base, signal, signal_cb, handler); + `} +end + +# Receiver of event callbacks +interface EventCallback + + # Callback on an event + fun callback(events: Int) do end end # Spawned to manage a specific connection # # TODO, use polls class Connection + super Writer + # Closing this connection has been requested, but may not yet be `closed` var close_requested = false @@ -125,86 +160,241 @@ class Connection var native_buffer_event: NativeBufferEvent # Close this connection if possible, otherwise mark it to be closed later - fun close + redef fun close do - var success = native_buffer_event.destroy - close_requested = true - closed = success + if closed then return + + var i = native_buffer_event.input_buffer + var o = native_buffer_event.output_buffer + if i.length > 0 or o.length > 0 then + close_requested = true + else + force_close + end + end + + # Force closing this connection and freeing `native_buffer_event` + fun force_close + do + if closed then return + + native_buffer_event.free + closed = true end # Callback method on a write event fun write_callback do - if close_requested and not closed then close + if close_requested then close end - private fun read_callback_native(cstr: NativeString, len: Int) + private fun read_callback_native(bev: NativeBufferEvent) do - read_callback(cstr.to_s_with_length(len)) + var evbuffer = bev.input_buffer + var len = evbuffer.length + var buf = new CString(len) + evbuffer.remove(buf, len) + var str = buf.to_s_with_length(len) + read_callback str end # Callback method when data is available to read fun read_callback(content: String) do - if close_requested and not closed then close + if close_requested then close end - # Callback method on events - fun event_callback(events: Int) do end + # Callback method on events: EOF, user-defined timeout and unrecoverable errors + # + # Returns `true` if the native handles to `self` can be released. + fun event_callback(events: Int): Bool + do + if events & bev_event_error != 0 or events & bev_event_eof != 0 then + if events & bev_event_error != 0 then + var sock_err = evutil_socket_error + # Ignore some normal errors and print the others for debugging + if sock_err == 110 then + # Connection timed out (ETIMEDOUT) + else if sock_err == 104 then + # Connection reset by peer (ECONNRESET) + else + print_error "libevent error event: {evutil_socket_error_to_string(sock_err)} ({sock_err})" + end + end + force_close + return true + end + + return false + end # Write a string to the connection - fun write(str: String) + redef fun write(str) + do + if close_requested then return + native_buffer_event.write(str.to_cstring, str.byte_length) + end + + redef fun write_byte(byte) do - native_buffer_event.write(str.to_cstring, str.length) + if close_requested then return + native_buffer_event.write_byte(byte) + end + + redef fun write_bytes(bytes) + do + if close_requested then return + native_buffer_event.write(bytes.items, bytes.length) end # Write a file to the connection # - # require: `path.file_exists` + # If `not path.file_exists`, the method returns. fun write_file(path: String) do - assert path.file_exists + if close_requested then return var file = new FileReader.open(path) - var output = native_buffer_event.output_buffer - var fd = file.fd - var length = file.file_stat.size + if file.last_error != null then + var error = new IOError("Failed to open file at '{path}'") + error.cause = file.last_error + self.last_error = error + file.close + return + end + + var stat = file.file_stat + if stat == null then + last_error = new IOError("Failed to stat file at '{path}'") + file.close + return + end - output.add_file(fd, 0, length) + var err = native_buffer_event.output_buffer.add_file(file.fd, 0, stat.size) + if err then + last_error = new IOError("Failed to add file at '{path}'") + file.close + end end end +# --- +# Error code for event callbacks + +# error encountered while reading +fun bev_event_reading: Int `{ return BEV_EVENT_READING; `} + +# error encountered while writing +fun bev_event_writing: Int `{ return BEV_EVENT_WRITING; `} + +# eof file reached +fun bev_event_eof: Int `{ return BEV_EVENT_EOF; `} + +# unrecoverable error encountered +fun bev_event_error: Int `{ return BEV_EVENT_ERROR; `} + +# user-specified timeout reached +fun bev_event_timeout: Int `{ return BEV_EVENT_TIMEOUT; `} + +# connect operation finished. +fun bev_event_connected: Int `{ return BEV_EVENT_CONNECTED; `} + +# Global error code for the last socket operation on the calling thread +# +# Not idempotent on all platforms. +fun evutil_socket_error: Int `{ + return EVUTIL_SOCKET_ERROR(); +`} + +# Convert an error code from `evutil_socket_error` to a string +fun evutil_socket_error_to_string(error_code: Int): CString `{ + return evutil_socket_error_to_string(error_code); +`} + +# --- +# Options that can be specified when creating a `NativeBufferEvent` + +# Close the underlying file descriptor/bufferevent/whatever when this bufferevent is freed. +fun bev_opt_close_on_free: Int `{ return BEV_OPT_CLOSE_ON_FREE; `} + +# If threading is enabled, protect the operations on this bufferevent with a lock. +fun bev_opt_threadsafe: Int `{ return BEV_OPT_THREADSAFE; `} + +# Run callbacks deferred in the event loop. +fun bev_opt_defer_callbacks: Int `{ return BEV_OPT_DEFER_CALLBACKS; `} + +# If set, callbacks are executed without locks being held on the bufferevent. +fun bev_opt_unlock_callbacks: Int `{ return BEV_OPT_UNLOCK_CALLBACKS; `} + +# --- +# Options for `NativeBufferEvent::enable` + +# Read operation +fun ev_read: Int `{ return EV_READ; `} + +# Write operation +fun ev_write: Int `{ return EV_WRITE; `} + +# --- + # A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer extern class NativeBufferEvent `{ struct bufferevent * `} + + # Socket-based `NativeBufferEvent` that reads and writes data onto a network + new socket(base: NativeEventBase, fd, options: Int) `{ + return bufferevent_socket_new(base, fd, options); + `} + + # Enable a bufferevent. + fun enable(operation: Int) `{ + bufferevent_enable(self, operation); + `} + + # Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn` + fun setcb(conn: Connection) import Connection.read_callback_native, + Connection.write_callback, Connection.event_callback, CString `{ + Connection_incr_ref(conn); + bufferevent_setcb(self, + (bufferevent_data_cb)c_read_cb, + (bufferevent_data_cb)c_write_cb, + (bufferevent_event_cb)c_event_cb, conn); + `} + # Write `length` bytes of `line` - fun write(line: NativeString, length: Int): Int `{ - return bufferevent_write(recv, line, length); + fun write(line: CString, length: Int): Int `{ + return bufferevent_write(self, line, length); `} - # Check if we have anything left in our buffers. If so, we set our connection to be closed - # on a callback. Otherwise we close it and free it right away. - fun destroy: Bool `{ - struct evbuffer* out = bufferevent_get_output(recv); - struct evbuffer* in = bufferevent_get_input(recv); - if(evbuffer_get_length(in) > 0 || evbuffer_get_length(out) > 0) { - return 0; - } else { - bufferevent_free(recv); - return 1; - } + # Write the byte `value` + fun write_byte(value: Byte): Int `{ + unsigned char byt = (unsigned char)value; + return bufferevent_write(self, &byt, 1); `} + redef fun free `{ bufferevent_free(self); `} + # The output buffer associated to `self` - fun output_buffer: OutputNativeEvBuffer `{ return bufferevent_get_output(recv); `} + fun output_buffer: OutputNativeEvBuffer `{ return bufferevent_get_output(self); `} # The input buffer associated to `self` - fun input_buffer: InputNativeEvBuffer `{ return bufferevent_get_input(recv); `} + fun input_buffer: InputNativeEvBuffer `{ return bufferevent_get_input(self); `} + + # Read data from this buffer + fun read_buffer(buf: NativeEvBuffer): Int `{ return bufferevent_read_buffer(self, buf); `} + + # Write data to this buffer + fun write_buffer(buf: NativeEvBuffer): Int `{ return bufferevent_write_buffer(self, buf); `} end # A single buffer extern class NativeEvBuffer `{ struct evbuffer * `} # Length of data in this buffer - fun length: Int `{ return evbuffer_get_length(recv); `} + fun length: Int `{ return evbuffer_get_length(self); `} + + # Read data from an evbuffer and drain the bytes read + fun remove(buffer: CString, len: Int) `{ + evbuffer_remove(self, buffer, len); + `} end # An input buffer @@ -212,7 +402,7 @@ extern class InputNativeEvBuffer super NativeEvBuffer # Empty/clear `length` data from buffer - fun drain(length: Int) `{ evbuffer_drain(recv, length); `} + fun drain(length: Int) `{ evbuffer_drain(self, length); `} end # An output buffer @@ -221,16 +411,15 @@ extern class OutputNativeEvBuffer # Add file to buffer fun add_file(fd, offset, length: Int): Bool `{ - return evbuffer_add_file(recv, fd, offset, length); + return evbuffer_add_file(self, fd, offset, length); `} end # A listener acting on an interface and port, spawns `Connection` on new connections extern class ConnectionListener `{ struct evconnlistener * `} - private new bind_to(base: NativeEventBase, address: NativeString, port: Int, factory: ConnectionFactory) - import ConnectionFactory.spawn_connection, error_callback, Connection.read_callback_native, - Connection.write_callback, Connection.event_callback `{ + private new bind_to(base: NativeEventBase, address: CString, port: Int, factory: ConnectionFactory) + import ConnectionFactory.accept_connection, error_callback `{ struct sockaddr_in sin; struct evconnlistener *listener; @@ -238,13 +427,17 @@ extern class ConnectionListener `{ struct evconnlistener * `} struct hostent *hostent = gethostbyname(address); + if (!hostent) { + return NULL; + } + memset(&sin, 0, sizeof(sin)); sin.sin_family = hostent->h_addrtype; sin.sin_port = htons(port); memcpy( &(sin.sin_addr.s_addr), (const void*)hostent->h_addr, hostent->h_length ); listener = evconnlistener_new_bind(base, - (evconnlistener_cb)accept_conn_cb, factory, + (evconnlistener_cb)accept_connection_cb, factory, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, (struct sockaddr*)&sin, sizeof(sin)); @@ -256,20 +449,13 @@ extern class ConnectionListener `{ struct evconnlistener * `} `} # Get the `NativeEventBase` associated to `self` - fun base: NativeEventBase `{ return evconnlistener_get_base(recv); `} + fun base: NativeEventBase `{ return evconnlistener_get_base(self); `} # Callback method on listening error fun error_callback do - var cstr = socket_error - sys.stderr.write "libevent error: '{cstr}'" + var cstr = evutil_socket_error_to_string(evutil_socket_error) + print_error "libevent error: '{cstr}'" end - - # Error with sockets - fun socket_error: NativeString `{ - // TODO move to Nit and maybe NativeEventBase - int err = EVUTIL_SOCKET_ERROR(); - return evutil_socket_error_to_string(err); - `} end # Factory to listen on sockets and create new `Connection` @@ -277,10 +463,31 @@ class ConnectionFactory # The `NativeEventBase` for the dispatch loop of this factory var event_base: NativeEventBase - # On new connection, create the handler `Connection` object - fun spawn_connection(nat_buf_ev: NativeBufferEvent): Connection + # Accept a connection on `listener` + # + # By default, it creates a new NativeBufferEvent and calls `spawn_connection`. + fun accept_connection(listener: ConnectionListener, fd: Int, addrin: Pointer, socklen: Int) do - return new Connection(nat_buf_ev) + var base = listener.base + var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free) + + # Human representation of remote client address + var addr_len = 46 # Longest possible IPv6 address + null byte + var addr_buf = new CString(addr_len) + addr_buf = addrin_to_address(addrin, addr_buf, addr_len) + var addr = if addr_buf.address_is_null then + "Unknown address" + else addr_buf.to_s + + var conn = spawn_connection(bev, addr) + bev.enable ev_read|ev_write + bev.setcb conn + end + + # Create a new `Connection` object for `buffer_event` + fun spawn_connection(buffer_event: NativeBufferEvent, address: String): Connection + do + return new Connection(buffer_event) end # Listen on `address`:`port` for new connection, which will callback `spawn_connection` @@ -292,4 +499,40 @@ class ConnectionFactory end return listener end + + # Put string representation of source `address` into `buf` + private fun addrin_to_address(address: Pointer, buf: CString, buf_len: Int): CString `{ + struct sockaddr *addrin = (struct sockaddr*)address; + + if (addrin->sa_family == AF_INET) { + struct in_addr *src = &((struct sockaddr_in*)addrin)->sin_addr; + return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len); + } + else if (addrin->sa_family == AF_INET6) { + struct in6_addr *src = &((struct sockaddr_in6*)addrin)->sin6_addr; + return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len); + } + return NULL; + `} end + +# Enable some relatively expensive debugging checks that would normally be turned off +fun enable_debug_mode `{ event_enable_debug_mode(); `} + +# Use Windows builtin locking and thread ID functions +fun use_windows_threads: Bool `{ +#ifdef EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED + return evthread_use_windows_threads(); +#else + return -1; +#endif +`} + +# Use Pthreads locking and thread ID functions +fun use_pthreads: Bool `{ +#ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED + return evthread_use_pthreads(); +#else + return -1; +#endif +`}