module stream
intrude import ropes
+import error
in "C" `{
#include <unistd.h>
- #include <poll.h>
- #include <errno.h>
#include <string.h>
#include <signal.h>
`}
+# Any kind of error that could be produced by an operation on Streams
+class IOError
+ super Error
+end
+
# Abstract stream class
-interface IOS
+abstract class IOS
+ # Error produced by the file stream
+ #
+ # var ifs = new IFStream.open("donotmakethisfile.binx")
+ # ifs.read_all
+ # ifs.close
+ # assert ifs.last_error != null
+ var last_error: nullable IOError = null
+
# close the stream
fun close is abstract
end
# Abstract input streams
-interface IStream
+abstract class IStream
super IOS
# Read a character. Return its ASCII value, -1 on EOF or timeout
fun read_char: Int is abstract
# Read at most i bytes
fun read(i: Int): String
do
+ if last_error != null then return ""
var s = new FlatBuffer.with_capacity(i)
while i > 0 and not eof do
var c = read_char
# NOTE: Use `append_line_to` if the line terminator needs to be preserved.
fun read_line: String
do
+ if last_error != null then return ""
if eof then return ""
var s = new FlatBuffer
append_line_to(s)
return res
end
+ # Return an iterator that read each line.
+ #
+ # The line terminator '\n' and `\r\n` is removed in each line,
+ # The line are read with `read_line`. See this method for details.
+ #
+ # ~~~
+ # var txt = "Hello\n\nWorld\n"
+ # var i = new StringIStream(txt)
+ # assert i.each_line.to_a == ["Hello", "", "World"]
+ # ~~~
+ #
+ # Unlike `read_lines` that read all lines at the call, `each_line` is lazy.
+ # Therefore, the stream should no be closed until the end of the stream.
+ #
+ # ~~~
+ # i = new StringIStream(txt)
+ # var el = i.each_line
+ #
+ # assert el.item == "Hello"
+ # el.next
+ # assert el.item == ""
+ # el.next
+ #
+ # i.close
+ #
+ # assert not el.is_ok
+ # # closed before "world" is read
+ # ~~~
+ fun each_line: LineIterator do return new LineIterator(self)
+
# Read all the stream until the eof.
+ #
+ # The content of the file is returned verbatim.
+ #
+ # ~~~
+ # var txt = "Hello\n\nWorld\n"
+ # var i = new StringIStream(txt)
+ # assert i.read_all == txt
+ # ~~~
fun read_all: String
do
+ if last_error != null then return ""
var s = new FlatBuffer
while not eof do
var c = read_char
# Therefore CARRIAGE RETURN & LINE FEED (`\r\n`) is also recognized.
fun append_line_to(s: Buffer)
do
+ if last_error != null then return
loop
var x = read_char
if x == -1 then
fun eof: Bool is abstract
end
+# Iterator returned by `IStream::each_line`.
+# See the aforementioned method for details.
+class LineIterator
+ super Iterator[String]
+
+ # The original stream
+ var stream: IStream
+
+ redef fun is_ok
+ do
+ var res = not stream.eof
+ if not res and close_on_finish then stream.close
+ return res
+ end
+
+ redef fun item
+ do
+ var line = self.line
+ if line == null then
+ line = stream.read_line
+ end
+ self.line = line
+ return line
+ end
+
+ # The last line read (cache)
+ private var line: nullable String = null
+
+ redef fun next
+ do
+ # force the read
+ if line == null then item
+ # drop the line
+ line = null
+ end
+
+ # Close the stream when the stream is at the EOF.
+ #
+ # Default is false.
+ var close_on_finish = false is writable
+
+ redef fun finish
+ do
+ if close_on_finish then stream.close
+ end
+end
+
# IStream capable of declaring if readable without blocking
-interface PollableIStream
+abstract class PollableIStream
super IStream
# Is there something to read? (without blocking)
end
# Abstract output stream
-interface OStream
+abstract class OStream
super IOS
# write a string
fun write(s: Text) is abstract
super IStream
redef fun read_char
do
- if _buffer_pos >= _buffer.length then
- fill_buffer
- end
- if _buffer_pos >= _buffer.length then
+ if last_error != null then return -1
+ if eof then
+ last_error = new IOError("Stream has reached eof")
return -1
end
var c = _buffer.chars[_buffer_pos]
redef fun read(i)
do
+ if last_error != null then return ""
if _buffer.length == _buffer_pos then
if not eof then
return read(i)
redef fun read_all
do
+ if last_error != null then return ""
var s = new FlatBuffer
while not eof do
var j = _buffer_pos
end
# An Input/Output Stream
-interface IOStream
+abstract class IOStream
super IStream
super OStream
end
-##############################################################"
-
-# A File Descriptor Stream.
-abstract class FDStream
- super IOS
- # File description
- var fd: Int
-
- redef fun close do native_close(fd)
-
- private fun native_close(i: Int): Int is extern "stream_FDStream_FDStream_native_close_1"
- private fun native_read_char(i: Int): Int is extern "stream_FDStream_FDStream_native_read_char_1"
- private fun native_read(i: Int, buf: NativeString, len: Int): Int is extern "stream_FDStream_FDStream_native_read_3"
- private fun native_write(i: Int, buf: NativeString, len: Int): Int is extern "stream_FDStream_FDStream_native_write_3"
- private fun native_write_char(i: Int, c: Char): Int is extern "stream_FDStream_FDStream_native_write_char_2"
-end
-
-# An Input File Descriptor Stream.
-class FDIStream
- super FDStream
- super IStream
- redef var eof: Bool = false
-
- redef fun read_char
- do
- var nb = native_read_char(fd)
- if nb == -1 then eof = true
- return nb
- end
-end
-
-# An Output File Descriptor Stream.
-class FDOStream
- super FDStream
- super OStream
- redef var is_writable = true
-
- redef fun write(s)
- do
- var nb = native_write(fd, s.to_cstring, s.length)
- if nb < s.length then is_writable = false
- end
-end
-
-# An Input/Output File Descriptor Stream.
-class FDIOStream
- super FDIStream
- super FDOStream
- super IOStream
-end
-
-redef interface Object
- # returns first available stream to read or write to
- # return null on interruption (possibly a signal)
- protected fun poll( streams : Sequence[FDStream] ) : nullable FDStream
- do
- var in_fds = new Array[Int]
- var out_fds = new Array[Int]
- var fd_to_stream = new HashMap[Int,FDStream]
- for s in streams do
- var fd = s.fd
- if s isa FDIStream then in_fds.add( fd )
- if s isa FDOStream then out_fds.add( fd )
-
- fd_to_stream[fd] = s
- end
-
- var polled_fd = intern_poll( in_fds, out_fds )
-
- if polled_fd == null then
- return null
- else
- return fd_to_stream[polled_fd]
- end
- end
-
- private fun intern_poll(in_fds: Array[Int], out_fds: Array[Int]) : nullable Int is extern import Array[Int].length, Array[Int].[], Int.as(nullable Int) `{
- int in_len, out_len, total_len;
- struct pollfd *c_fds;
- sigset_t sigmask;
- int i;
- int first_polled_fd = -1;
- int result;
-
- in_len = Array_of_Int_length( in_fds );
- out_len = Array_of_Int_length( out_fds );
- total_len = in_len + out_len;
- c_fds = malloc( sizeof(struct pollfd) * total_len );
-
- /* input streams */
- for ( i=0; i<in_len; i ++ ) {
- int fd;
- fd = Array_of_Int__index( in_fds, i );
-
- c_fds[i].fd = fd;
- c_fds[i].events = POLLIN;
- }
-
- /* output streams */
- for ( i=0; i<out_len; i ++ ) {
- int fd;
- fd = Array_of_Int__index( out_fds, i );
-
- c_fds[i].fd = fd;
- c_fds[i].events = POLLOUT;
- }
-
- /* poll all fds, unlimited timeout */
- result = poll( c_fds, total_len, -1 );
-
- if ( result > 0 ) {
- /* analyse results */
- for ( i=0; i<total_len; i++ )
- if ( c_fds[i].revents & c_fds[i].events || /* awaited event */
- c_fds[i].revents & POLLHUP ) /* closed */
- {
- first_polled_fd = c_fds[i].fd;
- break;
- }
-
- return Int_as_nullable( first_polled_fd );
- }
- else if ( result < 0 )
- fprintf( stderr, "Error in Stream:poll: %s\n", strerror( errno ) );
-
- return null_Int();
- `}
-end
-
# Stream to a String.
#
# Mainly used for compatibility with OStream type and tests.