summaryrefslogtreecommitdiffstats
path: root/stream.c
diff options
context:
space:
mode:
authorKaz Kylheku <kaz@kylheku.com>2016-01-01 12:23:00 -0800
committerKaz Kylheku <kaz@kylheku.com>2016-01-01 12:23:00 -0800
commit27e0a161c083222ef78bcf6192b931aa815583b3 (patch)
treeebb045f0417307f3c7763ce900cbb4b50781e581 /stream.c
parentbccdfcb523d7b0315c20a41dec43a6b0cf302a73 (diff)
downloadtxr-27e0a161c083222ef78bcf6192b931aa815583b3.tar.gz
txr-27e0a161c083222ef78bcf6192b931aa815583b3.tar.bz2
txr-27e0a161c083222ef78bcf6192b931aa815583b3.zip
Record-delimiting stream adapter.
* regex.c (read_until_match): New function. (regex_init): Registered read-until-match intrinsic. * regex.h (read_until_match): Declared. * stream.c (struct delegate_base): New struct type. (delegate_base_mark, delegate_put_string, delegate_put_char, delegate_put_byte, delegate_get_char, delegate_get_byte, delegate_unget_char, delegate_unget_byte, delegate_close, delegate_flush, delegate_seek, delegate_truncate, delegate_get_prop, delegate_set_prop, delegate_get_error, delegate_get_error_str, delegate_clear_error, make_delegate_stream): New static functions. (struct record_adapter_base): New struct type. (record_adapter_base_mark, record_adapter_mark_op, record_adapter_get_line): New static functions. (record_adapter_ops): New static structure. (record_adapter): New function. (stream_init): Registered record-adapter intrinsic. * stream.h (record_adapter): Declared. * txr.1: Documented read-until-match and record-adapter.
Diffstat (limited to 'stream.c')
-rw-r--r--stream.c181
1 files changed, 181 insertions, 0 deletions
diff --git a/stream.c b/stream.c
index 1798d901..2d6e1b0b 100644
--- a/stream.c
+++ b/stream.c
@@ -1913,6 +1913,186 @@ val catenated_stream_push(val new_stream, val cat_stream)
}
}
+struct delegate_base {
+ struct strm_base a;
+ val target_stream;
+ struct strm_ops *target_ops;
+};
+
+static void delegate_base_mark(struct delegate_base *db)
+{
+ strm_base_mark(&db->a);
+ gc_mark(db->target_stream);
+}
+
+static val delegate_put_string(val stream, val str)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->put_string(s->target_stream, str);
+}
+
+static val delegate_put_char(val stream, val ch)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->put_char(s->target_stream, ch);
+}
+
+static val delegate_put_byte(val stream, int byte)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->put_byte(s->target_stream, byte);
+}
+
+static val delegate_get_char(val stream)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->get_char(s->target_stream);
+}
+
+static val delegate_get_byte(val stream)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->get_byte(s->target_stream);
+}
+
+static val delegate_unget_char(val stream, val ch)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->unget_char(s->target_stream, ch);
+}
+
+static val delegate_unget_byte(val stream, int byte)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->unget_byte(s->target_stream, byte);
+}
+
+static val delegate_close(val stream, val throw_on_error)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->close(s->target_stream, throw_on_error);
+}
+
+static val delegate_flush(val stream)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->flush(s->target_stream);
+}
+
+static val delegate_seek(val stream, val off, enum strm_whence whence)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->seek(s->target_stream, off, whence);
+}
+
+static val delegate_truncate(val stream, val len)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->truncate(s->target_stream, len);
+}
+
+static val delegate_get_prop(val stream, val ind)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->get_prop(s->target_stream, ind);
+}
+
+static val delegate_set_prop(val stream, val ind, val value)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->set_prop(s->target_stream, ind, value);
+}
+
+static val delegate_get_error(val stream)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->get_error(s->target_stream);
+}
+
+static val delegate_get_error_str(val stream)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->get_error_str(s->target_stream);
+}
+
+static val delegate_clear_error(val stream)
+{
+ struct delegate_base *s = coerce(struct delegate_base *, stream->co.handle);
+ return s->target_ops->clear_error(s->target_stream);
+}
+
+static val make_delegate_stream(val orig_stream, size_t handle_size,
+ struct cobj_ops *ops)
+{
+ struct strm_ops *orig_ops = coerce(struct strm_ops *,
+ cobj_ops(orig_stream, stream_s));
+ struct delegate_base *db = coerce(struct delegate_base *,
+ chk_calloc(1, handle_size));
+ val delegate_stream;
+
+ strm_base_init(&db->a);
+ db->target_stream = nil;
+ db->target_ops = orig_ops;
+
+ delegate_stream = cobj(coerce(mem_t *, db), stream_s, ops);
+
+ db->target_stream = orig_stream;
+
+ return delegate_stream;
+}
+
+struct record_adapter_base {
+ struct delegate_base db;
+ val regex;
+};
+
+static void record_adapter_base_mark(struct record_adapter_base *rb)
+{
+ delegate_base_mark(&rb->db);
+ gc_mark(rb->regex);
+}
+
+static void record_adapter_mark_op(val stream)
+{
+ struct record_adapter_base *rb = coerce(struct record_adapter_base *,
+ stream->co.handle);
+ record_adapter_base_mark(rb);
+}
+
+static val record_adapter_get_line(val stream)
+{
+ struct record_adapter_base *rb = coerce(struct record_adapter_base *,
+ stream->co.handle);
+ return read_until_match(rb->regex, rb->db.target_stream);
+}
+
+static struct strm_ops record_adapter_ops =
+ strm_ops_init(cobj_ops_init(eq,
+ stream_print_op,
+ stream_destroy_op,
+ record_adapter_mark_op,
+ cobj_hash_op),
+ wli("record-adapter"),
+ delegate_put_string, delegate_put_char, delegate_put_byte,
+ record_adapter_get_line, delegate_get_char, delegate_get_byte,
+ delegate_unget_char, delegate_unget_byte,
+ delegate_close, delegate_flush, delegate_seek,
+ delegate_truncate, delegate_get_prop, delegate_set_prop,
+ delegate_get_error, delegate_get_error_str,
+ delegate_clear_error);
+
+val record_adapter(val regex, val stream)
+{
+ val rec_adapter = make_delegate_stream(default_arg(stream, std_input),
+ sizeof (struct record_adapter_base),
+ &record_adapter_ops.cobj_ops);
+ struct record_adapter_base *rb = coerce(struct record_adapter_base *,
+ rec_adapter->co.handle);
+
+ rb->regex = regex;
+ return rec_adapter;
+}
+
val streamp(val obj)
{
return typeof(obj) == stream_s ? t : nil;
@@ -3365,6 +3545,7 @@ void stream_init(void)
reg_fun(intern(lit("cat-streams"), user_package), func_n1(make_catenated_stream));
reg_fun(intern(lit("catenated-stream-p"), user_package), func_n1(catenated_stream_p));
reg_fun(intern(lit("catenated-stream-push"), user_package), func_n2(catenated_stream_push));
+ reg_fun(intern(lit("record-adapter"), user_package), func_n2o(record_adapter, 1));
reg_fun(intern(lit("open-directory"), user_package), func_n1(open_directory));
reg_fun(intern(lit("open-file"), user_package), func_n2o(open_file, 1));
reg_fun(intern(lit("open-fileno"), user_package), func_n2o(open_fileno, 1));