summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--gzio.c34
-rw-r--r--gzio.h3
-rw-r--r--parser.c2
-rw-r--r--stream.c126
-rw-r--r--stream.h4
-rw-r--r--tests/018/gzip.tl4
6 files changed, 118 insertions, 55 deletions
diff --git a/gzio.c b/gzio.c
index 11991aa1..1edb04a4 100644
--- a/gzio.c
+++ b/gzio.c
@@ -35,6 +35,9 @@
#include <errno.h>
#include <zlib.h>
#include "config.h"
+#if HAVE_SYS_WAIT
+#include <sys/wait.h>
+#endif
#include "alloca.h"
#include "lib.h"
#include "stream.h"
@@ -57,6 +60,9 @@ struct gzio_handle {
val err, errstr;
char *buf;
int fd;
+#if HAVE_FORK_STUFF
+ pid_t pid;
+#endif
unsigned is_byte_oriented : 8;
unsigned is_output : 8;
};
@@ -283,7 +289,18 @@ static val gzio_close(val stream, val throw_on_error)
gzio_maybe_error(stream, lit("closing"));
return nil;
}
- return t;
+#if HAVE_FORK_STUFF
+ if (h->pid != 0)
+ {
+ int status = 0;
+ val self = lit("close-stream");
+ sig_save_enable;
+ while (waitpid(h->pid, &status, 0) == -1 && errno == EINTR)
+ ;
+ sig_restore_enable;
+ return pipe_close_status_helper(stream, throw_on_error, status, self);
+ }
+#endif
}
return nil;
}
@@ -561,5 +578,20 @@ val make_gzio_stream(gzFile f, int fd, val descr, int is_output)
h->buf = 0;
h->is_byte_oriented = 0;
h->is_output = is_output;
+#if HAVE_FORK_STUFF
+ h->pid = 0;
+#endif
+ return stream;
+}
+
+#if HAVE_FORK_STUFF
+
+val make_gzio_pipe_stream(gzFile f, int fd, val descr, int is_output, pid_t pid)
+{
+ val stream = make_gzio_stream(f, fd, descr, is_output);
+ struct gzio_handle *h = coerce(struct gzio_handle *, stream->co.handle);
+ h->pid = pid;
return stream;
}
+
+#endif
diff --git a/gzio.h b/gzio.h
index 4b5e9ee9..ae888728 100644
--- a/gzio.h
+++ b/gzio.h
@@ -33,3 +33,6 @@ gzFile w_gzopen_mode(const wchar_t *wname, const wchar_t *wmode,
gzFile w_gzdopen_mode(int fd, const wchar_t *wmode,
const struct stdio_mode m, val self);
val make_gzio_stream(gzFile f, int fd, val descr, int is_output);
+#if HAVE_FORK_STUFF
+val make_gzio_pipe_stream(gzFile f, int fd, val descr, int is_output, pid_t pid);
+#endif
diff --git a/parser.c b/parser.c
index 1a375bd7..1ed3ca78 100644
--- a/parser.c
+++ b/parser.c
@@ -1947,7 +1947,7 @@ static mem_t *lino_open8(const char *name_in, lino_file_mode_t mode_in)
static mem_t *lino_fdopen(int fd, lino_file_mode_t mode_in)
{
val mode = static_str(lino_mode_str[mode_in]);
- return coerce(mem_t *, open_fileno(num(fd), mode));
+ return coerce(mem_t *, open_fileno(num(fd), mode, nil));
}
static void lino_close(mem_t *stream)
diff --git a/stream.c b/stream.c
index 0b75c65a..e3ba0510 100644
--- a/stream.c
+++ b/stream.c
@@ -1373,8 +1373,47 @@ static int se_pclose(FILE *f)
}
#endif
+val pipe_close_status_helper(val stream, val throw_on_error,
+ int status, val self)
+{
+ if (status < 0) {
+ if (throw_on_error)
+ uw_ethrowf(process_error_s,
+ lit("~a: unable to obtain status of command ~s: ~d/~s"),
+ self, stream, num(errno), errno_to_str(errno), nao);
+ return nil;
+ } else {
+#if HAVE_SYS_WAIT
+ if (default_null_arg(throw_on_error)) {
+ if (WIFSIGNALED(status)) {
+ int termsig = WTERMSIG(status);
+ uw_throwf(process_error_s, lit("~a: pipe ~s terminated by signal ~a"),
+ self, stream, num(termsig), nao);
+#ifndef WIFCONTINUED
+#define WIFCONTINUED(X) 0
+#endif
+ } else if (WIFSTOPPED(status) || WIFCONTINUED(status)) {
+ uw_throwf(process_error_s,
+ lit("~s, processes of closed pipe ~s still running"),
+ self, stream, nao);
+ }
+ }
+ if (WIFEXITED(status)) {
+ int exitstatus = WEXITSTATUS(status);
+ return num(exitstatus);
+ }
+#else
+ if (status != 0 && default_null_arg(throw_on_error))
+ uw_throwf(process_error_s, lit("~a: closing pipe ~s failed"),
+ self, stream, nao);
+#endif
+ return status == 0 ? zero : nil;
+ }
+}
+
static val pipe_close(val stream, val throw_on_error)
{
+ val self = lit("close-stream");
struct stdio_handle *h = coerce(struct stdio_handle *, stream->co.handle);
if (h->f != 0) {
@@ -1385,38 +1424,9 @@ static val pipe_close(val stream, val throw_on_error)
#endif
h->f = 0;
- if (status < 0) {
- if (throw_on_error)
- uw_ethrowf(process_error_s,
- lit("unable to obtain status of command ~s: ~d/~s"),
- stream, num(errno), errno_to_str(errno), nao);
- } else {
-#if HAVE_SYS_WAIT
- if (default_null_arg(throw_on_error)) {
- if (WIFSIGNALED(status)) {
- int termsig = WTERMSIG(status);
- uw_throwf(process_error_s, lit("pipe ~s terminated by signal ~a"),
- stream, num(termsig), nao);
-#ifndef WIFCONTINUED
-#define WIFCONTINUED(X) 0
-#endif
- } else if (WIFSTOPPED(status) || WIFCONTINUED(status)) {
- uw_throwf(process_error_s,
- lit("processes of closed pipe ~s still running"),
- stream, nao);
- }
- }
- if (WIFEXITED(status)) {
- int exitstatus = WEXITSTATUS(status);
- return num(exitstatus);
- }
-#else
- if (status != 0 && default_null_arg(throw_on_error))
- uw_throwf(process_error_s, lit("closing pipe ~s failed"), stream, nao);
-#endif
- return status == 0 ? zero : nil;
- }
+ return pipe_close_status_helper(stream, throw_on_error, status, self);
}
+
return nil;
}
@@ -4277,11 +4287,12 @@ error:
}
}
-val open_fileno(val fd, val mode_str)
+val open_fileno(val fd, val mode_str, val pid_opt)
{
val self = lit("open-fileno");
struct stdio_mode m, m_r = stdio_mode_init_r;
val norm_mode = normalize_mode(&m, mode_str, m_r, self);
+ val pid = default_arg(pid_opt, nil);
if (!m.gzip) {
FILE *f = (errno = 0, w_fdopen(c_num(fd, self),
@@ -4296,9 +4307,19 @@ val open_fileno(val fd, val mode_str)
fd, num(eno), errno_to_str(eno), nao);
}
- return set_mode_props(m, make_stdio_stream(f, format(nil,
- lit("fd ~d"),
- fd, nao)));
+ {
+ val descr = format(nil, lit("fd ~d"), fd, nao);
+
+#if HAVE_FORK_STUFF
+ return set_mode_props(m, if3(pid,
+ make_pipevp_stream(f, descr,
+ c_num(pid, self)),
+ make_stdio_stream(f, descr)));
+#else
+ return set_mode_props(m, make_stdio_stream(f, descr));
+#endif
+ }
+
} else {
#if HAVE_ZLIB
cnum fdn = c_num(fd, self);
@@ -4479,8 +4500,7 @@ static val open_subprocess(val name, val mode_str, val args, val fun)
{
val self = lit("open-subprocess");
struct stdio_mode m, m_r = stdio_mode_init_r;
- val mode = normalize_mode(&m, mode_str, m_r, self);
- int input = m.read != 0;
+ int input = (normalize_mode(&m, mode_str, m_r, self), m.read != 0);
int fd[2];
pid_t pid;
char **argv = 0;
@@ -4593,8 +4613,6 @@ static val open_subprocess(val name, val mode_str, val args, val fun)
_exit(errno);
} else {
int whichfd;
- char *utf8mode = utf8_dup_to(c_str(mode, self));
- FILE *f;
if (input) {
close(fd[1]);
@@ -4614,20 +4632,24 @@ static val open_subprocess(val name, val mode_str, val args, val fun)
fcntl(whichfd, F_SETFD, FD_CLOEXEC);
#endif
- if ((f = fdopen(whichfd, utf8mode)) == 0) {
+ uw_simple_catch_begin;
+
+ ret = open_fileno(num(whichfd), mode_str, num(pid));
+
+ uw_unwind {
int status;
- kill(pid, SIGINT);
- kill(pid, SIGTERM);
- while (waitpid(pid, &status, 0) == -1 && errno == EINTR)
- ;
- free(utf8mode);
- uw_ethrowf(file_error_s, lit("opening pipe ~s, fdopen failed: ~d/~s"),
- name, num(errno), errno_to_str(errno), nao);
+ if (ret == nil) {
+ int eno = errno;
+ kill(pid, SIGINT);
+ kill(pid, SIGTERM);
+ while (waitpid(pid, &status, 0) == -1 && errno == EINTR)
+ ;
+ uw_ethrowf(file_error_s, lit("opening pipe ~s: ~d/~s"),
+ name, num(eno), errno_to_str(eno), nao);
+ }
}
- free(utf8mode);
- /* TODO: catch potential OOM exception here and kill process. */
- ret = set_mode_props(m, make_pipevp_stream(f, name, pid));
+ uw_catch_end;
}
return ret;
@@ -5492,7 +5514,7 @@ val mkstemp_wrap(val prefix, val suffix)
name = string_utf8(tmpl);
free(tmpl);
if (fd != -1) {
- val stream = open_fileno(num(fd), lit("w+b"));
+ val stream = open_fileno(num(fd), lit("w+b"), nil);
stream_set_prop(stream, name_k, name);
return stream;
}
@@ -5658,7 +5680,7 @@ void stream_init(void)
reg_fun(intern(lit("record-adapter"), user_package), func_n3o(record_adapter, 1));
reg_fun(intern(lit("open-directory"), user_package), func_n1(open_directory));
reg_fun(intern(lit("open-file"), user_package), func_n2o(open_file, 1));
- reg_fun(intern(lit("open-fileno"), user_package), func_n2o(open_fileno, 1));
+ reg_fun(intern(lit("open-fileno"), user_package), func_n3o(open_fileno, 1));
reg_fun(intern(lit("open-tail"), user_package), func_n3o(open_tail, 1));
reg_fun(intern(lit("path-search"), user_package), func_n2o(path_search, 1));
reg_fun(intern(lit("open-command"), user_package), func_n2o(open_command, 1));
diff --git a/stream.h b/stream.h
index b735dd07..d54da38c 100644
--- a/stream.h
+++ b/stream.h
@@ -182,6 +182,8 @@ val make_tail_stream(FILE *, val descr);
#if !HAVE_FORK_STUFF
val make_pipe_stream(FILE *, val descr);
#endif
+val pipe_close_status_helper(val stream, val throw_on_error,
+ int status, val self);
val stream_fd(val stream);
#if HAVE_SOCKETS
val make_sock_stream(FILE *f, val family, val type);
@@ -246,7 +248,7 @@ struct strm_ctx *get_ctx(val stream);
val get_string(val stream, val nchars, val close_after_p);
val open_directory(val path);
val open_file(val path, val mode_str);
-val open_fileno(val fd, val mode_str);
+val open_fileno(val fd, val mode_str, val pid_opt);
val open_tail(val path, val mode_str, val seek_end_p);
val open_command(val path, val mode_str);
val path_search(val name, val path_in);
diff --git a/tests/018/gzip.tl b/tests/018/gzip.tl
index 3e7c9ef9..c6d5767e 100644
--- a/tests/018/gzip.tl
+++ b/tests/018/gzip.tl
@@ -51,3 +51,7 @@
(with-out-string-stream (*stdout*)
(load "./test-file-combined.tlo.gz"))
"a\nb\n"))
+
+(when %have-gzip%
+ (with-stream (s (open-command "echo abc | gzip -c" "z"))
+ (test (get-line s) "abc")))