X-Git-Url: http://nitlanguage.org diff --git a/lib/libevent.nit b/lib/libevent.nit index ddae7fe..c01d417 100644 --- a/lib/libevent.nit +++ b/lib/libevent.nit @@ -42,44 +42,21 @@ in "C" `{ // Callback forwarded to 'Connection.read_callback_native' static void c_read_cb(struct bufferevent *bev, Connection ctx) { - // TODO move to Nit code - struct evbuffer *input = bufferevent_get_input(bev); - size_t len = evbuffer_get_length(input); - char* cstr = malloc(len); - evbuffer_remove(input, cstr, len); - Connection_read_callback_native(ctx, cstr, len); + Connection_read_callback_native(ctx, bev); } // Callback forwarded to 'Connection.event_callback' static void c_event_cb(struct bufferevent *bev, short events, Connection ctx) { - Connection_event_callback(ctx, events); - - // TODO move to Nit code - if (events & BEV_EVENT_ERROR) - perror("Error from bufferevent"); - if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - bufferevent_free(bev); - Connection_decr_ref(ctx); - } + int release = Connection_event_callback(ctx, events); + if (release) Connection_decr_ref(ctx); } - // Callback fowarded to 'ConnectionFactory.spawn_connection' - static void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t fd, + // Callback forwarded to 'ConnectionFactory.accept_connection' + static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *address, int socklen, ConnectionFactory ctx) { - // TODO move to Nit code - struct event_base *base = evconnlistener_get_base(listener); - struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); - - Connection nit_con = ConnectionFactory_spawn_connection(ctx, bev); - Connection_incr_ref(nit_con); - - bufferevent_setcb(bev, - (bufferevent_data_cb)c_read_cb, - (bufferevent_data_cb)c_write_cb, - (bufferevent_event_cb)c_event_cb, nit_con); - bufferevent_enable(bev, EV_READ|EV_WRITE); + ConnectionFactory_accept_connection(ctx, listener, fd, address, socklen); } `} @@ -141,9 +118,14 @@ class Connection if close_requested then close end - private fun read_callback_native(cstr: NativeString, len: Int) + private fun read_callback_native(bev: NativeBufferEvent) do - read_callback(cstr.to_s_with_length(len)) + var evbuffer = bev.input_buffer + var len = evbuffer.length + var buf = new NativeString(len) + evbuffer.remove(buf, len) + var str = buf.to_s_with_length(len) + read_callback str end # Callback method when data is available to read @@ -152,8 +134,19 @@ class Connection if close_requested then close end - # Callback method on events - fun event_callback(events: Int) do end + # Callback method on events: EOF, user-defined timeout and unrecoverable errors + # + # Returns `true` if the native handles to `self` can be released. + fun event_callback(events: Int): Bool + do + if events & bev_event_error != 0 or events & bev_event_eof != 0 then + if events & bev_event_error != 0 then print_error "Error from bufferevent" + close + return true + end + + return false + end # Write a string to the connection redef fun write(str) @@ -205,8 +198,76 @@ class Connection end end +# --- +# Error code for event callbacks + +# error encountered while reading +fun bev_event_reading: Int `{ return BEV_EVENT_READING; `} + +# error encountered while writing +fun bev_event_writing: Int `{ return BEV_EVENT_WRITING; `} + +# eof file reached +fun bev_event_eof: Int `{ return BEV_EVENT_EOF; `} + +# unrecoverable error encountered +fun bev_event_error: Int `{ return BEV_EVENT_ERROR; `} + +# user-specified timeout reached +fun bev_event_timeout: Int `{ return BEV_EVENT_TIMEOUT; `} + +# connect operation finished. +fun bev_event_connected: Int `{ return BEV_EVENT_CONNECTED; `} + +# --- +# Options that can be specified when creating a `NativeBufferEvent` + +# Close the underlying file descriptor/bufferevent/whatever when this bufferevent is freed. +fun bev_opt_close_on_free: Int `{ return BEV_OPT_CLOSE_ON_FREE; `} + +# If threading is enabled, protect the operations on this bufferevent with a lock. +fun bev_opt_threadsafe: Int `{ return BEV_OPT_THREADSAFE; `} + +# Run callbacks deferred in the event loop. +fun bev_opt_defer_callbacks: Int `{ return BEV_OPT_DEFER_CALLBACKS; `} + +# If set, callbacks are executed without locks being held on the bufferevent. +fun bev_opt_unlock_callbacks: Int `{ return BEV_OPT_UNLOCK_CALLBACKS; `} + +# --- +# Options for `NativeBufferEvent::enable` + +# Read operation +fun ev_read: Int `{ return EV_READ; `} + +# Write operation +fun ev_write: Int `{ return EV_WRITE; `} + +# --- + # A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer extern class NativeBufferEvent `{ struct bufferevent * `} + + # Socket-based `NativeBufferEvent` that reads and writes data onto a network + new socket(base: NativeEventBase, fd, options: Int) `{ + return bufferevent_socket_new(base, fd, options); + `} + + # Enable a bufferevent. + fun enable(operation: Int) `{ + bufferevent_enable(self, operation); + `} + + # Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn` + fun setcb(conn: Connection) import Connection.read_callback_native, + Connection.write_callback, Connection.event_callback, NativeString `{ + Connection_incr_ref(conn); + bufferevent_setcb(self, + (bufferevent_data_cb)c_read_cb, + (bufferevent_data_cb)c_write_cb, + (bufferevent_event_cb)c_event_cb, conn); + `} + # Write `length` bytes of `line` fun write(line: NativeString, length: Int): Int `{ return bufferevent_write(self, line, length); @@ -248,6 +309,11 @@ end extern class NativeEvBuffer `{ struct evbuffer * `} # Length of data in this buffer fun length: Int `{ return evbuffer_get_length(self); `} + + # Read data from an evbuffer and drain the bytes read + fun remove(buffer: NativeString, len: Int) `{ + evbuffer_remove(self, buffer, len); + `} end # An input buffer @@ -272,8 +338,7 @@ end extern class ConnectionListener `{ struct evconnlistener * `} private new bind_to(base: NativeEventBase, address: NativeString, port: Int, factory: ConnectionFactory) - import ConnectionFactory.spawn_connection, error_callback, Connection.read_callback_native, - Connection.write_callback, Connection.event_callback `{ + import ConnectionFactory.accept_connection, error_callback `{ struct sockaddr_in sin; struct evconnlistener *listener; @@ -281,13 +346,17 @@ extern class ConnectionListener `{ struct evconnlistener * `} struct hostent *hostent = gethostbyname(address); + if (!hostent) { + return NULL; + } + memset(&sin, 0, sizeof(sin)); sin.sin_family = hostent->h_addrtype; sin.sin_port = htons(port); memcpy( &(sin.sin_addr.s_addr), (const void*)hostent->h_addr, hostent->h_length ); listener = evconnlistener_new_bind(base, - (evconnlistener_cb)accept_conn_cb, factory, + (evconnlistener_cb)accept_connection_cb, factory, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, (struct sockaddr*)&sin, sizeof(sin)); @@ -320,10 +389,22 @@ class ConnectionFactory # The `NativeEventBase` for the dispatch loop of this factory var event_base: NativeEventBase - # On new connection, create the handler `Connection` object - fun spawn_connection(nat_buf_ev: NativeBufferEvent): Connection + # Accept a connection on `listener` + # + # By default, it creates a new NativeBufferEvent and calls `spawn_connection`. + fun accept_connection(listener: ConnectionListener, fd: Int, address: Pointer, socklen: Int) + do + var base = listener.base + var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free) + var conn = spawn_connection(bev) + bev.enable ev_read|ev_write + bev.setcb conn + end + + # Create a new `Connection` object for `buffer_event` + fun spawn_connection(buffer_event: NativeBufferEvent): Connection do - return new Connection(nat_buf_ev) + return new Connection(buffer_event) end # Listen on `address`:`port` for new connection, which will callback `spawn_connection`