module libevent is pkgconfig("libevent")
in "C header" `{
- #include <sys/stat.h>
- #include <sys/types.h>
- #include <fcntl.h>
- #include <errno.h>
- #include <sys/socket.h>
-
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
`}
in "C" `{
+ #include <sys/stat.h>
+ #include <sys/types.h>
+ #include <fcntl.h>
+ #include <errno.h>
+ #include <string.h>
+
+ #include <sys/socket.h>
+ #include <arpa/inet.h>
+ #include <netinet/in.h>
+ #include <netinet/ip.h>
+
+// Protect callbacks for compatibility with light FFI
+#ifdef Connection_decr_ref
// Callback forwarded to 'Connection.write_callback'
static void c_write_cb(struct bufferevent *bev, Connection ctx) {
Connection_write_callback(ctx);
// Callback forwarded to 'Connection.read_callback_native'
static void c_read_cb(struct bufferevent *bev, Connection ctx)
{
- // TODO move to Nit code
- struct evbuffer *input = bufferevent_get_input(bev);
- size_t len = evbuffer_get_length(input);
- char* cstr = malloc(len);
- evbuffer_remove(input, cstr, len);
- Connection_read_callback_native(ctx, cstr, len);
+ Connection_read_callback_native(ctx, bev);
}
// Callback forwarded to 'Connection.event_callback'
static void c_event_cb(struct bufferevent *bev, short events, Connection ctx)
{
- Connection_event_callback(ctx, events);
-
- // TODO move to Nit code
- if (events & BEV_EVENT_ERROR)
- perror("Error from bufferevent");
- if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
- bufferevent_free(bev);
- Connection_decr_ref(ctx);
- }
+ int release = Connection_event_callback(ctx, events);
+ if (release) Connection_decr_ref(ctx);
}
- // Callback fowarded to 'ConnectionFactory.spawn_connection'
- static void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t fd,
- struct sockaddr *address, int socklen, ConnectionFactory ctx)
+ // Callback forwarded to 'ConnectionFactory.accept_connection'
+ static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd,
+ struct sockaddr *addrin, int socklen, ConnectionFactory ctx)
{
- // TODO move to Nit code
- struct event_base *base = evconnlistener_get_base(listener);
- struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
-
- Connection nit_con = ConnectionFactory_spawn_connection(ctx, bev);
- Connection_incr_ref(nit_con);
-
- bufferevent_setcb(bev,
- (bufferevent_data_cb)c_read_cb,
- (bufferevent_data_cb)c_write_cb,
- (bufferevent_event_cb)c_event_cb, nit_con);
- bufferevent_enable(bev, EV_READ|EV_WRITE);
+ ConnectionFactory_accept_connection(ctx, listener, fd, addrin, socklen);
}
+#endif
+
`}
# Structure to hold information and state for a Libevent dispatch loop.
#
# This loop will run the event base until either there are no more added
# events, or until something calls `exit_loop`.
- fun dispatch `{ event_base_dispatch(recv); `}
+ fun dispatch `{ event_base_dispatch(self); `}
# Exit the event loop
#
# TODO support timer
- fun exit_loop `{ event_base_loopexit(recv, NULL); `}
+ fun exit_loop `{ event_base_loopexit(self, NULL); `}
# Destroy this instance
- fun destroy `{ event_base_free(recv); `}
+ fun destroy `{ event_base_free(self); `}
end
# Spawned to manage a specific connection
#
# TODO, use polls
class Connection
+ super Writer
+
# Closing this connection has been requested, but may not yet be `closed`
var close_requested = false
var native_buffer_event: NativeBufferEvent
# Close this connection if possible, otherwise mark it to be closed later
- fun close
+ redef fun close
do
+ if closed then return
var success = native_buffer_event.destroy
close_requested = true
closed = success
# Callback method on a write event
fun write_callback
do
- if close_requested and not closed then close
+ if close_requested then close
end
- private fun read_callback_native(cstr: NativeString, len: Int)
+ private fun read_callback_native(bev: NativeBufferEvent)
do
- read_callback(cstr.to_s_with_length(len))
+ var evbuffer = bev.input_buffer
+ var len = evbuffer.length
+ var buf = new NativeString(len)
+ evbuffer.remove(buf, len)
+ var str = buf.to_s_with_length(len)
+ read_callback str
end
# Callback method when data is available to read
fun read_callback(content: String)
do
- if close_requested and not closed then close
+ if close_requested then close
end
- # Callback method on events
- fun event_callback(events: Int) do end
+ # Callback method on events: EOF, user-defined timeout and unrecoverable errors
+ #
+ # Returns `true` if the native handles to `self` can be released.
+ fun event_callback(events: Int): Bool
+ do
+ if events & bev_event_error != 0 or events & bev_event_eof != 0 then
+ if events & bev_event_error != 0 then print_error "Error from bufferevent"
+ close
+ return true
+ end
+
+ return false
+ end
# Write a string to the connection
- fun write(str: String)
+ redef fun write(str)
do
- native_buffer_event.write(str.to_cstring, str.length)
+ if close_requested then return
+ native_buffer_event.write(str.to_cstring, str.bytelen)
+ end
+
+ redef fun write_byte(byte)
+ do
+ if close_requested then return
+ native_buffer_event.write_byte(byte)
+ end
+
+ redef fun write_bytes(bytes)
+ do
+ if close_requested then return
+ native_buffer_event.write(bytes.items, bytes.length)
end
# Write a file to the connection
#
- # require: `path.file_exists`
+ # If `not path.file_exists`, the method returns.
fun write_file(path: String)
do
- assert path.file_exists
+ if close_requested then return
+
+ var file = new FileReader.open(path)
+ if file.last_error != null then
+ var error = new IOError("Failed to open file at '{path}'")
+ error.cause = file.last_error
+ self.last_error = error
+ file.close
+ return
+ end
- var file = new IFStream.open(path)
- var output = native_buffer_event.output_buffer
- var fd = file.fd
- var length = file.file_stat.size
+ var stat = file.file_stat
+ if stat == null then
+ last_error = new IOError("Failed to stat file at '{path}'")
+ file.close
+ return
+ end
- output.add_file(fd, 0, length)
+ var err = native_buffer_event.output_buffer.add_file(file.fd, 0, stat.size)
+ if err then
+ last_error = new IOError("Failed to add file at '{path}'")
+ file.close
+ end
end
end
+# ---
+# Error code for event callbacks
+
+# error encountered while reading
+fun bev_event_reading: Int `{ return BEV_EVENT_READING; `}
+
+# error encountered while writing
+fun bev_event_writing: Int `{ return BEV_EVENT_WRITING; `}
+
+# eof file reached
+fun bev_event_eof: Int `{ return BEV_EVENT_EOF; `}
+
+# unrecoverable error encountered
+fun bev_event_error: Int `{ return BEV_EVENT_ERROR; `}
+
+# user-specified timeout reached
+fun bev_event_timeout: Int `{ return BEV_EVENT_TIMEOUT; `}
+
+# connect operation finished.
+fun bev_event_connected: Int `{ return BEV_EVENT_CONNECTED; `}
+
+# ---
+# Options that can be specified when creating a `NativeBufferEvent`
+
+# Close the underlying file descriptor/bufferevent/whatever when this bufferevent is freed.
+fun bev_opt_close_on_free: Int `{ return BEV_OPT_CLOSE_ON_FREE; `}
+
+# If threading is enabled, protect the operations on this bufferevent with a lock.
+fun bev_opt_threadsafe: Int `{ return BEV_OPT_THREADSAFE; `}
+
+# Run callbacks deferred in the event loop.
+fun bev_opt_defer_callbacks: Int `{ return BEV_OPT_DEFER_CALLBACKS; `}
+
+# If set, callbacks are executed without locks being held on the bufferevent.
+fun bev_opt_unlock_callbacks: Int `{ return BEV_OPT_UNLOCK_CALLBACKS; `}
+
+# ---
+# Options for `NativeBufferEvent::enable`
+
+# Read operation
+fun ev_read: Int `{ return EV_READ; `}
+
+# Write operation
+fun ev_write: Int `{ return EV_WRITE; `}
+
+# ---
+
# A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer
extern class NativeBufferEvent `{ struct bufferevent * `}
+
+ # Socket-based `NativeBufferEvent` that reads and writes data onto a network
+ new socket(base: NativeEventBase, fd, options: Int) `{
+ return bufferevent_socket_new(base, fd, options);
+ `}
+
+ # Enable a bufferevent.
+ fun enable(operation: Int) `{
+ bufferevent_enable(self, operation);
+ `}
+
+ # Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn`
+ fun setcb(conn: Connection) import Connection.read_callback_native,
+ Connection.write_callback, Connection.event_callback, NativeString `{
+ Connection_incr_ref(conn);
+ bufferevent_setcb(self,
+ (bufferevent_data_cb)c_read_cb,
+ (bufferevent_data_cb)c_write_cb,
+ (bufferevent_event_cb)c_event_cb, conn);
+ `}
+
# Write `length` bytes of `line`
fun write(line: NativeString, length: Int): Int `{
- return bufferevent_write(recv, line, length);
+ return bufferevent_write(self, line, length);
+ `}
+
+ # Write the byte `value`
+ fun write_byte(value: Byte): Int `{
+ unsigned char byt = (unsigned char)value;
+ return bufferevent_write(self, &byt, 1);
`}
# Check if we have anything left in our buffers. If so, we set our connection to be closed
# on a callback. Otherwise we close it and free it right away.
fun destroy: Bool `{
- struct evbuffer* out = bufferevent_get_output(recv);
- struct evbuffer* in = bufferevent_get_input(recv);
+ struct evbuffer* out = bufferevent_get_output(self);
+ struct evbuffer* in = bufferevent_get_input(self);
if(evbuffer_get_length(in) > 0 || evbuffer_get_length(out) > 0) {
return 0;
} else {
- bufferevent_free(recv);
+ bufferevent_free(self);
return 1;
}
`}
# The output buffer associated to `self`
- fun output_buffer: OutputNativeEvBuffer `{ return bufferevent_get_output(recv); `}
+ fun output_buffer: OutputNativeEvBuffer `{ return bufferevent_get_output(self); `}
# The input buffer associated to `self`
- fun input_buffer: InputNativeEvBuffer `{ return bufferevent_get_input(recv); `}
+ fun input_buffer: InputNativeEvBuffer `{ return bufferevent_get_input(self); `}
+
+ # Read data from this buffer
+ fun read_buffer(buf: NativeEvBuffer): Int `{ return bufferevent_read_buffer(self, buf); `}
+
+ # Write data to this buffer
+ fun write_buffer(buf: NativeEvBuffer): Int `{ return bufferevent_write_buffer(self, buf); `}
end
# A single buffer
extern class NativeEvBuffer `{ struct evbuffer * `}
# Length of data in this buffer
- fun length: Int `{ return evbuffer_get_length(recv); `}
+ fun length: Int `{ return evbuffer_get_length(self); `}
+
+ # Read data from an evbuffer and drain the bytes read
+ fun remove(buffer: NativeString, len: Int) `{
+ evbuffer_remove(self, buffer, len);
+ `}
end
# An input buffer
super NativeEvBuffer
# Empty/clear `length` data from buffer
- fun drain(length: Int) `{ evbuffer_drain(recv, length); `}
+ fun drain(length: Int) `{ evbuffer_drain(self, length); `}
end
# An output buffer
# Add file to buffer
fun add_file(fd, offset, length: Int): Bool `{
- return evbuffer_add_file(recv, fd, offset, length);
+ return evbuffer_add_file(self, fd, offset, length);
`}
end
extern class ConnectionListener `{ struct evconnlistener * `}
private new bind_to(base: NativeEventBase, address: NativeString, port: Int, factory: ConnectionFactory)
- import ConnectionFactory.spawn_connection, error_callback, Connection.read_callback_native,
- Connection.write_callback, Connection.event_callback `{
+ import ConnectionFactory.accept_connection, error_callback `{
struct sockaddr_in sin;
struct evconnlistener *listener;
struct hostent *hostent = gethostbyname(address);
+ if (!hostent) {
+ return NULL;
+ }
+
memset(&sin, 0, sizeof(sin));
sin.sin_family = hostent->h_addrtype;
sin.sin_port = htons(port);
memcpy( &(sin.sin_addr.s_addr), (const void*)hostent->h_addr, hostent->h_length );
listener = evconnlistener_new_bind(base,
- (evconnlistener_cb)accept_conn_cb, factory,
+ (evconnlistener_cb)accept_connection_cb, factory,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
(struct sockaddr*)&sin, sizeof(sin));
`}
# Get the `NativeEventBase` associated to `self`
- fun base: NativeEventBase `{ return evconnlistener_get_base(recv); `}
+ fun base: NativeEventBase `{ return evconnlistener_get_base(self); `}
# Callback method on listening error
fun error_callback do
# The `NativeEventBase` for the dispatch loop of this factory
var event_base: NativeEventBase
- # On new connection, create the handler `Connection` object
- fun spawn_connection(nat_buf_ev: NativeBufferEvent): Connection
+ # Accept a connection on `listener`
+ #
+ # By default, it creates a new NativeBufferEvent and calls `spawn_connection`.
+ fun accept_connection(listener: ConnectionListener, fd: Int, addrin: Pointer, socklen: Int)
do
- return new Connection(nat_buf_ev)
+ var base = listener.base
+ var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free)
+
+ # Human representation of remote client address
+ var addr_len = 46 # Longest possible IPv6 address + null byte
+ var addr_buf = new NativeString(addr_len)
+ addr_buf = addrin_to_address(addrin, addr_buf, addr_len)
+ var addr = if addr_buf.address_is_null then
+ "Unknown address"
+ else addr_buf.to_s
+
+ var conn = spawn_connection(bev, addr)
+ bev.enable ev_read|ev_write
+ bev.setcb conn
+ end
+
+ # Create a new `Connection` object for `buffer_event`
+ fun spawn_connection(buffer_event: NativeBufferEvent, address: String): Connection
+ do
+ return new Connection(buffer_event)
end
# Listen on `address`:`port` for new connection, which will callback `spawn_connection`
end
return listener
end
+
+ # Put string representation of source `address` into `buf`
+ private fun addrin_to_address(address: Pointer, buf: NativeString, buf_len: Int): NativeString `{
+ struct sockaddr *addrin = (struct sockaddr*)address;
+
+ if (addrin->sa_family == AF_INET) {
+ struct in_addr *src = &((struct sockaddr_in*)addrin)->sin_addr;
+ return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
+ }
+ else if (addrin->sa_family == AF_INET6) {
+ struct in6_addr *src = &((struct sockaddr_in6*)addrin)->sin6_addr;
+ return (char *)inet_ntop(addrin->sa_family, src, buf, buf_len);
+ }
+ return NULL;
+ `}
end