diff options
-rw-r--r-- | gzio.c | 34 | ||||
-rw-r--r-- | gzio.h | 3 | ||||
-rw-r--r-- | parser.c | 2 | ||||
-rw-r--r-- | stream.c | 126 | ||||
-rw-r--r-- | stream.h | 4 | ||||
-rw-r--r-- | tests/018/gzip.tl | 4 |
6 files changed, 118 insertions, 55 deletions
@@ -35,6 +35,9 @@ #include <errno.h> #include <zlib.h> #include "config.h" +#if HAVE_SYS_WAIT +#include <sys/wait.h> +#endif #include "alloca.h" #include "lib.h" #include "stream.h" @@ -57,6 +60,9 @@ struct gzio_handle { val err, errstr; char *buf; int fd; +#if HAVE_FORK_STUFF + pid_t pid; +#endif unsigned is_byte_oriented : 8; unsigned is_output : 8; }; @@ -283,7 +289,18 @@ static val gzio_close(val stream, val throw_on_error) gzio_maybe_error(stream, lit("closing")); return nil; } - return t; +#if HAVE_FORK_STUFF + if (h->pid != 0) + { + int status = 0; + val self = lit("close-stream"); + sig_save_enable; + while (waitpid(h->pid, &status, 0) == -1 && errno == EINTR) + ; + sig_restore_enable; + return pipe_close_status_helper(stream, throw_on_error, status, self); + } +#endif } return nil; } @@ -561,5 +578,20 @@ val make_gzio_stream(gzFile f, int fd, val descr, int is_output) h->buf = 0; h->is_byte_oriented = 0; h->is_output = is_output; +#if HAVE_FORK_STUFF + h->pid = 0; +#endif + return stream; +} + +#if HAVE_FORK_STUFF + +val make_gzio_pipe_stream(gzFile f, int fd, val descr, int is_output, pid_t pid) +{ + val stream = make_gzio_stream(f, fd, descr, is_output); + struct gzio_handle *h = coerce(struct gzio_handle *, stream->co.handle); + h->pid = pid; return stream; } + +#endif @@ -33,3 +33,6 @@ gzFile w_gzopen_mode(const wchar_t *wname, const wchar_t *wmode, gzFile w_gzdopen_mode(int fd, const wchar_t *wmode, const struct stdio_mode m, val self); val make_gzio_stream(gzFile f, int fd, val descr, int is_output); +#if HAVE_FORK_STUFF +val make_gzio_pipe_stream(gzFile f, int fd, val descr, int is_output, pid_t pid); +#endif @@ -1947,7 +1947,7 @@ static mem_t *lino_open8(const char *name_in, lino_file_mode_t mode_in) static mem_t *lino_fdopen(int fd, lino_file_mode_t mode_in) { val mode = static_str(lino_mode_str[mode_in]); - return coerce(mem_t *, open_fileno(num(fd), mode)); + return coerce(mem_t *, open_fileno(num(fd), mode, nil)); } static void lino_close(mem_t *stream) @@ -1373,8 +1373,47 @@ static int se_pclose(FILE *f) } #endif +val pipe_close_status_helper(val stream, val throw_on_error, + int status, val self) +{ + if (status < 0) { + if (throw_on_error) + uw_ethrowf(process_error_s, + lit("~a: unable to obtain status of command ~s: ~d/~s"), + self, stream, num(errno), errno_to_str(errno), nao); + return nil; + } else { +#if HAVE_SYS_WAIT + if (default_null_arg(throw_on_error)) { + if (WIFSIGNALED(status)) { + int termsig = WTERMSIG(status); + uw_throwf(process_error_s, lit("~a: pipe ~s terminated by signal ~a"), + self, stream, num(termsig), nao); +#ifndef WIFCONTINUED +#define WIFCONTINUED(X) 0 +#endif + } else if (WIFSTOPPED(status) || WIFCONTINUED(status)) { + uw_throwf(process_error_s, + lit("~s, processes of closed pipe ~s still running"), + self, stream, nao); + } + } + if (WIFEXITED(status)) { + int exitstatus = WEXITSTATUS(status); + return num(exitstatus); + } +#else + if (status != 0 && default_null_arg(throw_on_error)) + uw_throwf(process_error_s, lit("~a: closing pipe ~s failed"), + self, stream, nao); +#endif + return status == 0 ? zero : nil; + } +} + static val pipe_close(val stream, val throw_on_error) { + val self = lit("close-stream"); struct stdio_handle *h = coerce(struct stdio_handle *, stream->co.handle); if (h->f != 0) { @@ -1385,38 +1424,9 @@ static val pipe_close(val stream, val throw_on_error) #endif h->f = 0; - if (status < 0) { - if (throw_on_error) - uw_ethrowf(process_error_s, - lit("unable to obtain status of command ~s: ~d/~s"), - stream, num(errno), errno_to_str(errno), nao); - } else { -#if HAVE_SYS_WAIT - if (default_null_arg(throw_on_error)) { - if (WIFSIGNALED(status)) { - int termsig = WTERMSIG(status); - uw_throwf(process_error_s, lit("pipe ~s terminated by signal ~a"), - stream, num(termsig), nao); -#ifndef WIFCONTINUED -#define WIFCONTINUED(X) 0 -#endif - } else if (WIFSTOPPED(status) || WIFCONTINUED(status)) { - uw_throwf(process_error_s, - lit("processes of closed pipe ~s still running"), - stream, nao); - } - } - if (WIFEXITED(status)) { - int exitstatus = WEXITSTATUS(status); - return num(exitstatus); - } -#else - if (status != 0 && default_null_arg(throw_on_error)) - uw_throwf(process_error_s, lit("closing pipe ~s failed"), stream, nao); -#endif - return status == 0 ? zero : nil; - } + return pipe_close_status_helper(stream, throw_on_error, status, self); } + return nil; } @@ -4277,11 +4287,12 @@ error: } } -val open_fileno(val fd, val mode_str) +val open_fileno(val fd, val mode_str, val pid_opt) { val self = lit("open-fileno"); struct stdio_mode m, m_r = stdio_mode_init_r; val norm_mode = normalize_mode(&m, mode_str, m_r, self); + val pid = default_arg(pid_opt, nil); if (!m.gzip) { FILE *f = (errno = 0, w_fdopen(c_num(fd, self), @@ -4296,9 +4307,19 @@ val open_fileno(val fd, val mode_str) fd, num(eno), errno_to_str(eno), nao); } - return set_mode_props(m, make_stdio_stream(f, format(nil, - lit("fd ~d"), - fd, nao))); + { + val descr = format(nil, lit("fd ~d"), fd, nao); + +#if HAVE_FORK_STUFF + return set_mode_props(m, if3(pid, + make_pipevp_stream(f, descr, + c_num(pid, self)), + make_stdio_stream(f, descr))); +#else + return set_mode_props(m, make_stdio_stream(f, descr)); +#endif + } + } else { #if HAVE_ZLIB cnum fdn = c_num(fd, self); @@ -4479,8 +4500,7 @@ static val open_subprocess(val name, val mode_str, val args, val fun) { val self = lit("open-subprocess"); struct stdio_mode m, m_r = stdio_mode_init_r; - val mode = normalize_mode(&m, mode_str, m_r, self); - int input = m.read != 0; + int input = (normalize_mode(&m, mode_str, m_r, self), m.read != 0); int fd[2]; pid_t pid; char **argv = 0; @@ -4593,8 +4613,6 @@ static val open_subprocess(val name, val mode_str, val args, val fun) _exit(errno); } else { int whichfd; - char *utf8mode = utf8_dup_to(c_str(mode, self)); - FILE *f; if (input) { close(fd[1]); @@ -4614,20 +4632,24 @@ static val open_subprocess(val name, val mode_str, val args, val fun) fcntl(whichfd, F_SETFD, FD_CLOEXEC); #endif - if ((f = fdopen(whichfd, utf8mode)) == 0) { + uw_simple_catch_begin; + + ret = open_fileno(num(whichfd), mode_str, num(pid)); + + uw_unwind { int status; - kill(pid, SIGINT); - kill(pid, SIGTERM); - while (waitpid(pid, &status, 0) == -1 && errno == EINTR) - ; - free(utf8mode); - uw_ethrowf(file_error_s, lit("opening pipe ~s, fdopen failed: ~d/~s"), - name, num(errno), errno_to_str(errno), nao); + if (ret == nil) { + int eno = errno; + kill(pid, SIGINT); + kill(pid, SIGTERM); + while (waitpid(pid, &status, 0) == -1 && errno == EINTR) + ; + uw_ethrowf(file_error_s, lit("opening pipe ~s: ~d/~s"), + name, num(eno), errno_to_str(eno), nao); + } } - free(utf8mode); - /* TODO: catch potential OOM exception here and kill process. */ - ret = set_mode_props(m, make_pipevp_stream(f, name, pid)); + uw_catch_end; } return ret; @@ -5492,7 +5514,7 @@ val mkstemp_wrap(val prefix, val suffix) name = string_utf8(tmpl); free(tmpl); if (fd != -1) { - val stream = open_fileno(num(fd), lit("w+b")); + val stream = open_fileno(num(fd), lit("w+b"), nil); stream_set_prop(stream, name_k, name); return stream; } @@ -5658,7 +5680,7 @@ void stream_init(void) reg_fun(intern(lit("record-adapter"), user_package), func_n3o(record_adapter, 1)); reg_fun(intern(lit("open-directory"), user_package), func_n1(open_directory)); reg_fun(intern(lit("open-file"), user_package), func_n2o(open_file, 1)); - reg_fun(intern(lit("open-fileno"), user_package), func_n2o(open_fileno, 1)); + reg_fun(intern(lit("open-fileno"), user_package), func_n3o(open_fileno, 1)); reg_fun(intern(lit("open-tail"), user_package), func_n3o(open_tail, 1)); reg_fun(intern(lit("path-search"), user_package), func_n2o(path_search, 1)); reg_fun(intern(lit("open-command"), user_package), func_n2o(open_command, 1)); @@ -182,6 +182,8 @@ val make_tail_stream(FILE *, val descr); #if !HAVE_FORK_STUFF val make_pipe_stream(FILE *, val descr); #endif +val pipe_close_status_helper(val stream, val throw_on_error, + int status, val self); val stream_fd(val stream); #if HAVE_SOCKETS val make_sock_stream(FILE *f, val family, val type); @@ -246,7 +248,7 @@ struct strm_ctx *get_ctx(val stream); val get_string(val stream, val nchars, val close_after_p); val open_directory(val path); val open_file(val path, val mode_str); -val open_fileno(val fd, val mode_str); +val open_fileno(val fd, val mode_str, val pid_opt); val open_tail(val path, val mode_str, val seek_end_p); val open_command(val path, val mode_str); val path_search(val name, val path_in); diff --git a/tests/018/gzip.tl b/tests/018/gzip.tl index 3e7c9ef9..c6d5767e 100644 --- a/tests/018/gzip.tl +++ b/tests/018/gzip.tl @@ -51,3 +51,7 @@ (with-out-string-stream (*stdout*) (load "./test-file-combined.tlo.gz")) "a\nb\n")) + +(when %have-gzip% + (with-stream (s (open-command "echo abc | gzip -c" "z")) + (test (get-line s) "abc"))) |