module libevent is pkgconfig("libevent")
in "C header" `{
- #include <sys/stat.h>
- #include <sys/types.h>
- #include <fcntl.h>
- #include <errno.h>
- #include <sys/socket.h>
-
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
`}
in "C" `{
+ #include <sys/stat.h>
+ #include <sys/types.h>
+ #include <fcntl.h>
+ #include <errno.h>
+ #include <string.h>
+
+ #include <sys/socket.h>
+ #include <arpa/inet.h>
+ #include <netinet/in.h>
+ #include <netinet/ip.h>
+
+// Protect callbacks for compatibility with light FFI
+#ifdef Connection_decr_ref
// Callback forwarded to 'Connection.write_callback'
static void c_write_cb(struct bufferevent *bev, Connection ctx) {
Connection_write_callback(ctx);
// Callback forwarded to 'ConnectionFactory.accept_connection'
static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd,
- struct sockaddr *address, int socklen, ConnectionFactory ctx)
+ struct sockaddr *addrin, int socklen, ConnectionFactory ctx)
+ {
+ ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen);
+ }
+#endif
+
+#ifdef EventCallback_incr_ref
+ // Callback forwarded to 'EventCallback.callback'
+ static void signal_cb(evutil_socket_t fd, short events, void *data)
{
- ConnectionFactory_accept_connection(ctx, listener, fd, address, socklen);
+ EventCallback handler = data;
+ EventCallback_callback(handler, events);
}
+#endif
`}
# Structure to hold information and state for a Libevent dispatch loop.
# Has `self` been correctly initialized?
fun is_valid: Bool do return not address_is_null
+ # Reinitialize the event base after a fork
+ #
+ # Some event mechanisms do not survive across fork.
+ # The event base needs to be reinitialized with the `reinit` method.
+ #
+ # Returns `true` if some events could not be re-added.
+ #
+ # The raw result code of `event_reinit` is returned as the `Bool`,
+ # so any non-zero code reads as `true`.
+ fun reinit: Bool `{ return event_reinit(self); `}
+
# Event dispatching loop
#
# This loop will run the event base until either there are no more added
- # events, or until something calls `exit_loop`.
+ # events, or until something calls `loopexit`.
+ #
+ # The result code of `event_base_dispatch` is discarded.
fun dispatch `{ event_base_dispatch(self); `}
# Exit the event loop
#
# TODO support timer
+ #
+ # No delay is requested: `NULL` is passed as the timeout of `event_base_loopexit`.
+ # The result code of `event_base_loopexit` is discarded.
- fun exit_loop `{ event_base_loopexit(self, NULL); `}
+ fun loopexit `{ event_base_loopexit(self, NULL); `}
+
+ redef fun free `{ event_base_free(self); `}
+end
+
+# Event, libevent's basic unit of operation
+#
+# Wraps a C `struct event *`.
+extern class NativeEvent `{ struct event * `}
+
+ # Add to the set of pending events
+ #
+ # No timeout is set (`NULL` is passed to `event_add`)
+ # and the result code of `event_add` is discarded.
+ #
+ # TODO support timeout
+ fun add `{ event_add(self, NULL); `}
+
+ # Remove from the set of monitored events
+ #
+ # The result code of `event_del` is discarded.
+ fun del `{ event_del(self); `}
+
+ # Release the underlying C event with `event_free`
+ redef fun free `{ event_free(self); `}
+end
+
+# Signal event
+extern class NativeEvSignal
+ super NativeEvent
+
+ # Create an event on `base` for `signal`, forwarded to `handler.callback`
+ #
+ # The event is created with `evsignal_new`, its result is returned as is.
+ # NOTE(review): presumably the event must still be registered with `add`
+ # before it fires, and the result may be null on failure — confirm.
+ #
+ # The reference count of `handler` is incremented here.
+ # NOTE(review): no matching decrement is visible in this file section — confirm.
+ new (base: NativeEventBase, signal: Int, handler: EventCallback)
+ import EventCallback.callback `{
+ // libevent receives `handler` as the raw data pointer given to `signal_cb`
+ EventCallback_incr_ref(handler);
+ return evsignal_new(base, signal, signal_cb, handler);
+ `}
+end
+
+# Receiver of event callbacks
+#
+# Instances are passed as `handler` to `NativeEvSignal`.
+interface EventCallback
- # Destroy this instance
- fun destroy `{ event_base_free(self); `}
+ # Callback on an event
+ #
+ # `events` holds the flags received from libevent by the C callback.
+ # Does nothing by default, redefine it to react to the event.
+ fun callback(events: Int) do end
end
# Spawned to manage a specific connection
redef fun close
do
if closed then return
- var success = native_buffer_event.destroy
- close_requested = true
- closed = success
+
+ var i = native_buffer_event.input_buffer
+ var o = native_buffer_event.output_buffer
+ # Data is still pending in at least one buffer: only record the request.
+ # NOTE(review): presumably a later callback completes the close
+ # once the buffers are empty — confirm.
+ if i.length > 0 or o.length > 0 then
+ close_requested = true
+ else
+ # Both buffers are empty, close right away
+ force_close
+ end
+ end
+
+ # Force closing this connection and freeing `native_buffer_event`
+ #
+ # Unlike `close`, the buffers are not checked for pending data.
+ # Does nothing if `closed` is already set.
+ fun force_close
+ do
+ # Guard against freeing `native_buffer_event` twice
+ if closed then return
+
+ native_buffer_event.free
+ closed = true
end
# Callback method on a write event
do
var evbuffer = bev.input_buffer
var len = evbuffer.length
- var buf = new NativeString(len)
+ var buf = new CString(len)
evbuffer.remove(buf, len)
var str = buf.to_s_with_length(len)
read_callback str
fun event_callback(events: Int): Bool
do
if events & bev_event_error != 0 or events & bev_event_eof != 0 then
- if events & bev_event_error != 0 then print_error "Error from bufferevent"
- close
+ if events & bev_event_error != 0 then
+ var sock_err = evutil_socket_error
+ # Ignore some normal errors and print the others for debugging
+ if sock_err == 110 then
+ # Connection timed out (ETIMEDOUT)
+ else if sock_err == 104 then
+ # Connection reset by peer (ECONNRESET)
+ else
+ print_error "libevent error event: {evutil_socket_error_to_string(sock_err)} ({sock_err})"
+ end
+ end
+ force_close
return true
end
redef fun write(str)
do
+ # Once a close is requested, `str` is silently dropped
if close_requested then return
- native_buffer_event.write(str.to_cstring, str.bytelen)
+ # The length is given in bytes, the result of the native `write` is discarded
+ native_buffer_event.write(str.to_cstring, str.byte_length)
end
redef fun write_byte(byte)
# connect operation finished.
fun bev_event_connected: Int `{ return BEV_EVENT_CONNECTED; `}
+# Global error code for the last socket operation on the calling thread
+#
+# Not idempotent on all platforms.
+# Read it once and keep the value if it is needed more than once.
+#
+# The code can be converted to a message with `evutil_socket_error_to_string`.
+fun evutil_socket_error: Int `{
+ return EVUTIL_SOCKET_ERROR();
+`}
+
+# Convert an error code from `evutil_socket_error` to a string
+#
+# Forwards to the C `evutil_socket_error_to_string` and returns its result as is.
+# NOTE(review): the returned `CString` presumably points to memory owned
+# by the C library and should not be freed or modified — confirm.
+fun evutil_socket_error_to_string(error_code: Int): CString `{
+ return evutil_socket_error_to_string(error_code);
+`}
+
# ---
# Options that can be specified when creating a `NativeBufferEvent`
# Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn`
fun setcb(conn: Connection) import Connection.read_callback_native,
- Connection.write_callback, Connection.event_callback, NativeString `{
+ Connection.write_callback, Connection.event_callback, CString `{
Connection_incr_ref(conn);
bufferevent_setcb(self,
(bufferevent_data_cb)c_read_cb,
`}
# Write `length` bytes of `line`
+ #
+ # Returns the result code of `bufferevent_write`.
- fun write(line: NativeString, length: Int): Int `{
+ fun write(line: CString, length: Int): Int `{
return bufferevent_write(self, line, length);
`}
return bufferevent_write(self, &byt, 1);
`}
- # Check if we have anything left in our buffers. If so, we set our connection to be closed
- # on a callback. Otherwise we close it and free it right away.
- fun destroy: Bool `{
- struct evbuffer* out = bufferevent_get_output(self);
- struct evbuffer* in = bufferevent_get_input(self);
- if(evbuffer_get_length(in) > 0 || evbuffer_get_length(out) > 0) {
- return 0;
- } else {
- bufferevent_free(self);
- return 1;
- }
- `}
+ redef fun free `{ bufferevent_free(self); `}
# The output buffer associated to `self`
fun output_buffer: OutputNativeEvBuffer `{ return bufferevent_get_output(self); `}
fun length: Int `{ return evbuffer_get_length(self); `}
# Read data from an evbuffer and drain the bytes read
+ #
+ # `buffer` receives the data and `len` is the size requested from `evbuffer_remove`.
+ # The result of `evbuffer_remove` is discarded.
+ # NOTE(review): `buffer` presumably must hold at least `len` bytes — confirm.
- fun remove(buffer: NativeString, len: Int) `{
+ fun remove(buffer: CString, len: Int) `{
evbuffer_remove(self, buffer, len);
`}
end
# A listener acting on an interface and port, spawns `Connection` on new connections
extern class ConnectionListener `{ struct evconnlistener * `}
- private new bind_to(base: NativeEventBase, address: NativeString, port: Int, factory: ConnectionFactory)
+ private new bind_to(base: NativeEventBase, address: CString, port: Int, factory: ConnectionFactory)
import ConnectionFactory.accept_connection, error_callback `{
struct sockaddr_in sin;
# Callback method on listening error
+ #
+ # Reads the current socket error code with `evutil_socket_error`,
+ # converts it to a message and reports it with `print_error`.
fun error_callback do
- var cstr = socket_error
- sys.stderr.write "libevent error: '{cstr}'"
+ var cstr = evutil_socket_error_to_string(evutil_socket_error)
+ print_error "libevent error: '{cstr}'"
end
-
- # Error with sockets
- fun socket_error: NativeString `{
- // TODO move to Nit and maybe NativeEventBase
- int err = EVUTIL_SOCKET_ERROR();
- return evutil_socket_error_to_string(err);
- `}
end
# Factory to listen on sockets and create new `Connection`
# Accept a connection on `listener`
#
# By default, it creates a new NativeBufferEvent and calls `spawn_connection`.
+ #
+ # `fd` is the socket given to the new `NativeBufferEvent`,
+ # which is created with `bev_opt_close_on_free`.
+ # `addrin` is the raw address of the remote client, it is converted
+ # to a string with `addrin_to_address` and passed to `spawn_connection`.
+ # `socklen` is not used by this implementation.
- fun accept_connection(listener: ConnectionListener, fd: Int, address: Pointer, socklen: Int)
+ fun accept_connection(listener: ConnectionListener, fd: Int, addrin: Pointer, socklen: Int)
do
var base = listener.base
var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free)
- var conn = spawn_connection(bev)
+
+ # Human representation of remote client address
+ var addr_len = 46 # Longest possible IPv6 address + null byte
+ var addr_buf = new CString(addr_len)
+ # A null result means the address could not be converted
+ addr_buf = addrin_to_address(addrin, addr_buf, addr_len)
+ var addr = if addr_buf.address_is_null then
+ "Unknown address"
+ else addr_buf.to_s
+
+ var conn = spawn_connection(bev, addr)
bev.enable ev_read|ev_write
bev.setcb conn
end
# Create a new `Connection` object for `buffer_event`
+ #
+ # `address` is the string passed by `accept_connection` for the remote client.
+ # It is not used by this default implementation.
- fun spawn_connection(buffer_event: NativeBufferEvent): Connection
+ fun spawn_connection(buffer_event: NativeBufferEvent, address: String): Connection
do
return new Connection(buffer_event)
end
end
return listener
end
+
+ # Put string representation of source `address` into `buf`
+ #
+ # `address` is read as a `struct sockaddr *` and `buf_len` is the size
+ # given to `inet_ntop` for `buf`.
+ #
+ # Returns the result of `inet_ntop`, or a null `CString` if `address` is null
+ # or if its family is neither `AF_INET` nor `AF_INET6`.
+ private fun addrin_to_address(address: Pointer, buf: CString, buf_len: Int): CString `{
+ struct sockaddr *addrin = (struct sockaddr*)address;
+
+ // Nothing to convert, the caller handles a null result
+ if (addrin == NULL) return NULL;
+
+ if (addrin->sa_family == AF_INET) {
+ // IPv4: the address is in `sin_addr`
+ struct in_addr *src = &((struct sockaddr_in*)addrin)->sin_addr;
+ return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
+ }
+ else if (addrin->sa_family == AF_INET6) {
+ // IPv6: the address is in `sin6_addr`
+ struct in6_addr *src = &((struct sockaddr_in6*)addrin)->sin6_addr;
+ return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
+ }
+
+ // Unsupported address family
+ return NULL;
+ `}
end
+
+# Enable some relatively expensive debugging checks that would normally be turned off
+#
+# Forwards to `event_enable_debug_mode`.
+# NOTE(review): libevent presumably requires this to be called before
+# any event or event base is created — confirm.
+fun enable_debug_mode `{ event_enable_debug_mode(); `}
+
+# Use Windows builtin locking and thread ID functions
+#
+# Returns the result code of `evthread_use_windows_threads` as a `Bool`,
+# or `-1` (read as `true`) when `EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED` is not defined.
+# NOTE(review): libevent presumably reports 0 on success and -1 on failure,
+# so `true` would mean failure — confirm against callers.
+fun use_windows_threads: Bool `{
+#ifdef EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
+ return evthread_use_windows_threads();
+#else
+ return -1;
+#endif
+`}
+
+# Use Pthreads locking and thread ID functions
+#
+# Returns the result code of `evthread_use_pthreads` as a `Bool`,
+# or `-1` (read as `true`) when `EVTHREAD_USE_PTHREADS_IMPLEMENTED` is not defined.
+# NOTE(review): libevent presumably reports 0 on success and -1 on failure,
+# so `true` would mean failure — confirm against callers.
+fun use_pthreads: Bool `{
+#ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED
+ return evthread_use_pthreads();
+#else
+ return -1;
+#endif
+`}