From e396824ed2d6514a62302c5e5591cacfbb8cd29b Mon Sep 17 00:00:00 2001 From: Kaz Kylheku Date: Sun, 6 Mar 2016 16:36:58 -0800 Subject: Special implementation of dgram socket streams. * socket.c (struct dgram_stream): New struct. (make_dgram_sock_stream, dgram_print, dgram_mark, dgram_destroy, dgram_overflow, dgram_put_byte_callback, dgram_put_string, dgram_put_char, dgram_put_byte, dgram_get_byte_callback, dgram_get_char, dgram_get_byte, dgram_unget_char, dgram_unget_byte, dgram_flush, dgram_close, dgram_get_prop, dgram_get_fd, dgram_get_sock_family, dgram_get_sock_type, dgram_get_sock_peer, dgram_set_sock_peer, dgram_get_error, dgram_get_error_str, dgram_clear_error): New static functions. (dgram_strm_ops): New static structure. (sock_listen): Check against datagram sockets which have a peer; otherwise a no-op for datagram sockets. (sock_accept): Special logic for dgram sockets. (open_sockfd): Function moved here from stream.c, and augmented to open dgram stream for dgram sockets. (open_socket): Function moved here from stream.c. (sock_load_init): Fill in some operations in dgram_strm_ops. * stream.c (generic_get_line, make_sock_stream, errno_to_string): Statics become external. (open_sockfd, open_socket): Functions removed from here, moved to socket.c. * stream.h (generic_get_line, make_sock_stream, errno_to_string): Declared. --- socket.c | 516 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- stream.c | 35 +---- stream.h | 3 + 3 files changed, 494 insertions(+), 60 deletions(-) diff --git a/socket.c b/socket.c index 49b531a8..e4bbb61f 100644 --- a/socket.c +++ b/socket.c @@ -33,9 +33,10 @@ #include #include #include -#include "config.h" +#include #include #include +#include "config.h" #include ALLOCA_H #include "lib.h" #include "stream.h" @@ -49,6 +50,23 @@ #include "arith.h" #include "socket.h" +struct dgram_stream { + struct strm_base a; + val stream; + val family; + val peer; + val unget_c; + utf8_decoder_t ud; + struct sockaddr_storage peer_addr; + socklen_t pa_len; + int fd; + int err; + mem_t *rx_buf; + mem_t *tx_buf; + int rx_size, rx_pos; + int tx_pos; +}; + val sockaddr_in_s, sockaddr_in6_s, sockaddr_un_s, addrinfo_s; val flags_s, family_s, socktype_s, protocol_s, addr_s, canonname_s; val port_s, flow_info_s, scope_id_s, path_s; @@ -91,7 +109,6 @@ static void ipv6_scope_id_from_num(struct sockaddr_in6 *dst, val scope) scope, nao); } - static val sockaddr_in_out(struct sockaddr_in *src) { args_decl(args, ARGS_MIN); @@ -235,6 +252,347 @@ static void sockaddr_in(val sockaddr, val family, } } +static_forward(struct strm_ops dgram_strm_ops); + +static val make_dgram_sock_stream(int fd, val family, val peer, + mem_t *dgram, int dgram_size, + struct sockaddr *peer_addr, socklen_t pa_len) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, + chk_malloc(sizeof *d)); + val stream; + + strm_base_init(&d->a); + d->stream = nil; + d->fd = fd; + d->family = d->peer = d->unget_c = nil; + d->err = 0; + d->rx_buf = dgram; + d->rx_size = dgram_size; + d->rx_pos = 0; + d->tx_buf = 0; + d->tx_pos = 0; + utf8_decoder_init(&d->ud); + if (peer_addr != 0) + memcpy(&d->peer_addr, peer_addr, pa_len); + d->pa_len = pa_len; + stream = cobj(coerce(mem_t *, d), stream_s, &dgram_strm_ops.cobj_ops); + d->stream = stream; + d->family = family; + d->peer = peer; + return stream; +} + +static void dgram_print(val stream, val out, val pretty) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + struct strm_ops *ops = coerce(struct strm_ops *, stream->co.ops); + val name = static_str(ops->name); + + (void) pretty; + + format(out, lit("#<~a ~s ~p>"), name, num(d->fd), stream, nao); +} + +static void dgram_mark(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + strm_base_mark(&d->a); + /* h->stream == stream and so no need to mark h->stream */ + gc_mark(d->family); + gc_mark(d->peer); + gc_mark(d->unget_c); +} + +static void dgram_destroy(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + free(d->rx_buf); + free(d->tx_buf); + d->rx_buf = d->tx_buf = 0; +} + +static void dgram_overflow(val stream) +{ + /* + * Not called under present logic, because dgram_put_byte_callback keeps + * increasing the datagram size. + */ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + d->err = ENOBUFS; + uw_throwf(socket_error_s, lit("dgram write overflow on ~s"), stream, nao); +} + +static int dgram_put_byte_callback(int b, mem_t *ctx) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, ctx); + d->tx_buf = chk_manage_vec(d->tx_buf, d->tx_pos, d->tx_pos + 1, 1, 0); + d->tx_buf[d->tx_pos++] = b; + return 1; +} + +static val dgram_put_string(val stream, val str) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + const wchar_t *s = c_str(str); + + while (*s) { + if (!utf8_encode(*s++, dgram_put_byte_callback, coerce(mem_t *, d))) + dgram_overflow(stream); + } + + return t; +} + +static val dgram_put_char(val stream, val ch) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + if (!utf8_encode(c_chr(ch), dgram_put_byte_callback, coerce(mem_t *, d))) + dgram_overflow(stream); + return t; +} + +static val dgram_put_byte(val stream, int b) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + if (!dgram_put_byte_callback(b, coerce(mem_t *, d))) + dgram_overflow(stream); + return t; +} + +static int dgram_get_byte_callback(mem_t *ctx) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, ctx); + if (d->rx_buf) { + return (d->rx_pos < d->rx_size) ? d->rx_buf[d->rx_pos++] : EOF; + } else { + const int dgram_size = 65536; + mem_t *dgram = chk_malloc(dgram_size); + ssize_t nbytes = -1; + + uw_simple_catch_begin; + + sig_save_enable; + + nbytes = recv(d->fd, dgram, dgram_size, 0); + + sig_restore_enable; + + if (nbytes == -1) { + d->err = errno; + uw_throwf(socket_error_s, + lit("get-byte: recv on ~s failed: ~d/~s"), + d->stream, num(errno), string_utf8(strerror(errno)), nao); + } + + uw_unwind { + if (nbytes == -1) + free(dgram); + } + + uw_catch_end; + + d->rx_buf = chk_realloc(dgram, nbytes); + + if (!d->rx_buf) + d->rx_buf = dgram; + + d->rx_size = nbytes; + + return dgram_get_byte_callback(ctx); + } +} + +static val dgram_get_char(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + val uc = d->unget_c; + if (uc) { + d->unget_c = nil; + return uc; + } else { + wint_t ch = utf8_decode(&d->ud, dgram_get_byte_callback, + coerce(mem_t *, d)); + return (ch != WEOF) ? chr(ch) : nil; + } +} + +static val dgram_get_byte(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + int b = dgram_get_byte_callback(coerce(mem_t *, d)); + return b == EOF ? nil : num_fast(b); +} + +static val dgram_unget_char(val stream, val ch) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + + if (d->unget_c){ + d->err = EOVERFLOW; + uw_throwf(file_error_s, lit("unget-char overflow on ~a: "), stream, nao); + } + + d->unget_c = ch; + return ch; +} + +static val dgram_unget_byte(val stream, int byte) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + + if (d->rx_pos <= 0) { + d->err = EOVERFLOW; + uw_throwf(file_error_s, + lit("unget-byte: cannot push back past start of stream ~s"), + stream, nao); + } + + d->rx_buf[--d->rx_pos] = byte; + return num_fast(byte); +} + +static val dgram_flush(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + if (d->fd != -1 && d->tx_buf) { + if (d->peer) { + int nwrit = sendto(d->fd, d->tx_buf, d->tx_pos, 0, + coerce(struct sockaddr *, &d->peer_addr), d->pa_len); + + if (nwrit != d->tx_pos) { + d->err = (nwrit < 0) ? errno : ENOBUFS; + uw_throwf(socket_error_s, + lit("flush-stream: sendto on ~s ~a: ~d/~s"), + stream, + (nwrit < 0) ? lit("failed") : lit("truncated"), + num(errno), string_utf8(strerror(errno)), nao); + } + + free(d->tx_buf); + d->tx_buf = 0; + d->tx_pos = 0; + } else { + d->err = ENOTCONN; + uw_throwf(socket_error_s, + lit("flush-stream: cannot transmit on ~s: peer not set"), + stream, nao); + } + } + return t; +} + +static val dgram_close(val stream, val throw_on_error) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + if (d->fd != -1) { + dgram_flush(stream); + close(d->fd); + d->fd = -1; + d->err = 0; + } + + return t; +} + +static val dgram_get_prop(val stream, val ind) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + + if (ind == fd_k) + return num(d->fd); + + return nil; +} + +static val dgram_get_error(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + if (d->err) + return num(d->err); + if (d->rx_buf && d->rx_pos == d->rx_size) + return t; + return nil; +} + +static val dgram_get_error_str(val stream) +{ + return errno_to_string(dgram_get_error(stream)); +} + +static val dgram_clear_error(val stream) +{ + val ret = dgram_get_error(stream); + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + + d->err = 0; + + if (d->rx_pos == d->rx_size) { + d->rx_pos = d->rx_size = 0; + free(d->rx_buf); + d->rx_buf = 0; + } + + return ret; +} + +static val dgram_get_fd(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + return num(d->fd); +} + +static val dgram_get_sock_family(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + return d->family; +} + +static val dgram_get_sock_type(val stream) +{ + (void) stream; + return num_fast(SOCK_DGRAM); +} + +static val dgram_get_sock_peer(val stream) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + return d->peer; +} + +static val dgram_set_sock_peer(val stream, val peer) +{ + struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle); + sockaddr_in(peer, d->family, &d->peer_addr, &d->pa_len); + return d->peer = peer; +} + +static_def(struct strm_ops dgram_strm_ops = + strm_ops_init(cobj_ops_init(eq, + dgram_print, + dgram_destroy, + dgram_mark, + cobj_hash_op), + wli("dgram-stream"), + dgram_put_string, + dgram_put_char, + dgram_put_byte, + generic_get_line, + dgram_get_char, + dgram_get_byte, + dgram_unget_char, + dgram_unget_byte, + dgram_close, + dgram_flush, + 0, + 0, + dgram_get_prop, + 0, + dgram_get_error, + dgram_get_error_str, + dgram_clear_error, + dgram_get_fd)); + static val sock_bind(val sock, val sockaddr) { int sfd = c_num(stream_fd(sock)); @@ -274,49 +632,119 @@ static val sock_connect(val sock, val sockaddr) static val sock_listen(val sock, val backlog) { - val sfd = stream_fd(sock); + if (sock_type(sock) == num_fast(SOCK_DGRAM)) { + if (sock_peer(sock)) { + errno = EISCONN; + goto failed; + } + } else { + val sfd = stream_fd(sock); - if (listen(c_num(sfd), c_num(default_arg(backlog, num_fast(16))))) - uw_throwf(socket_error_s, lit("sock-listen failed: ~d/~s"), - num(errno), string_utf8(strerror(errno)), nao); + if (listen(c_num(sfd), c_num(default_arg(backlog, num_fast(16))))) + goto failed; + } return t; +failed: + uw_throwf(socket_error_s, lit("sock-listen failed: ~d/~s"), + num(errno), string_utf8(strerror(errno)), nao); } static val sock_accept(val sock, val mode_str) { val sfd = stream_fd(sock); val family = sock_family(sock); + val type = sock_type(sock); struct sockaddr_storage sa; socklen_t salen; - int afd; - val peer; + val peer = nil; + + if (type == num_fast(SOCK_DGRAM)) { + ssize_t nbytes = -1; + const int dgram_size = 65536; + mem_t *dgram = chk_malloc(dgram_size); - sig_save_enable; + if (sock_peer(sock)) { + free(dgram); + errno = EISCONN; + goto failed; + } - afd = accept(c_num(sfd), coerce(struct sockaddr *, &sa), &salen); + uw_simple_catch_begin; - sig_restore_enable; + sig_save_enable; - if (afd < 0) - uw_throwf(socket_error_s, lit("accept failed: ~d/~s"), - num(errno), string_utf8(strerror(errno)), nao); + nbytes = recvfrom(c_num(sfd), dgram, dgram_size, 0, + coerce(struct sockaddr *, &sa), &salen); + + sig_restore_enable; + + if (family == num_fast(AF_INET)) + peer = sockaddr_in_out(coerce(struct sockaddr_in *, &sa)); + else if (family == num_fast(AF_INET6)) + peer = sockaddr_in6_out(coerce(struct sockaddr_in6 *, &sa)); + else if (family == num_fast(AF_UNIX)) + peer = unix_sockaddr_out(coerce(struct sockaddr_un *, &sa)); + else { + free(dgram); + dgram = 0; + uw_throwf(socket_error_s, lit("sock-accept: ~s isn't a supported socket family"), + family, nao); + } - if (family == num_fast(AF_INET)) - peer = sockaddr_in_out(coerce(struct sockaddr_in *, &sa)); - else if (family == num_fast(AF_INET6)) - peer = sockaddr_in6_out(coerce(struct sockaddr_in6 *, &sa)); - else if (family == num_fast(AF_UNIX)) - peer = unix_sockaddr_out(coerce(struct sockaddr_un *, &sa)); - else - uw_throwf(socket_error_s, lit("accept: ~s isn't a supported socket family"), - family, nao); - - { - val stream = open_sockfd(num(afd), family, num_fast(SOCK_STREAM), mode_str); - sock_set_peer(stream, peer); - return stream; + uw_unwind { + if (nbytes == -1) + free(dgram); + } + + uw_catch_end; + + { + int afd = dup(c_num(sfd)); + mem_t *shrink = chk_realloc(dgram, nbytes); + if (shrink) + dgram = shrink; + + if (afd == -1) { + free(dgram); + dgram = 0; + uw_throwf(socket_error_s, lit("sock-accept: unable to "), + family, nao); + } + + return make_dgram_sock_stream(afd, family, peer, dgram, nbytes, + coerce(struct sockaddr *, &sa), salen); + } + } else { + int afd = -1; + sig_save_enable; + + afd = accept(c_num(sfd), coerce(struct sockaddr *, &sa), &salen); + + sig_restore_enable; + + if (afd < 0) + goto failed; + + if (family == num_fast(AF_INET)) + peer = sockaddr_in_out(coerce(struct sockaddr_in *, &sa)); + else if (family == num_fast(AF_INET6)) + peer = sockaddr_in6_out(coerce(struct sockaddr_in6 *, &sa)); + else if (family == num_fast(AF_UNIX)) + peer = unix_sockaddr_out(coerce(struct sockaddr_un *, &sa)); + else + uw_throwf(socket_error_s, lit("accept: ~s isn't a supported socket family"), + family, nao); + + { + val stream = open_sockfd(num(afd), family, num_fast(SOCK_STREAM), mode_str); + sock_set_peer(stream, peer); + return stream; + } } +failed: + uw_throwf(socket_error_s, lit("accept failed: ~d/~s"), + num(errno), string_utf8(strerror(errno)), nao); } static val sock_shutdown(val sock, val how) @@ -332,6 +760,32 @@ static val sock_shutdown(val sock, val how) return t; } +val open_sockfd(val fd, val family, val type, val mode_str_in) +{ + if (type == num_fast(SOCK_DGRAM)) { + return make_dgram_sock_stream(c_num(fd), family, nil, 0, 0, 0, 0); + } else { + struct stdio_mode m; + val mode_str = default_arg(mode_str_in, lit("r+")); + FILE *f = (errno = 0, w_fdopen(c_num(fd), c_str(normalize_mode(&m, mode_str)))); + + if (!f) { + close(c_num(fd)); + uw_throwf(file_error_s, lit("error creating stream for socket ~a: ~d/~s"), + fd, num(errno), string_utf8(strerror(errno)), nao); + } + + setvbuf(f, (char *) NULL, _IOLBF, 0); + return set_mode_props(m, make_sock_stream(f, family, type)); + } +} + +val open_socket(val family, val type, val mode_str) +{ + int fd = socket(c_num(family), c_num(type), 0); + return open_sockfd(num(fd), family, type, mode_str); +} + void sock_load_init(void) { sockaddr_in_s = intern(lit("sockaddr-in"), user_package); @@ -384,4 +838,10 @@ void sock_load_init(void) reg_fun(intern(lit("sock-listen"), user_package), func_n2o(sock_listen, 1)); reg_fun(intern(lit("sock-accept"), user_package), func_n2o(sock_accept, 1)); reg_fun(intern(lit("sock-shutdown"), user_package), func_n2o(sock_shutdown, 1)); + + fill_stream_ops(&dgram_strm_ops); + dgram_strm_ops.get_sock_family = dgram_get_sock_family; + dgram_strm_ops.get_sock_type = dgram_get_sock_type; + dgram_strm_ops.get_sock_peer = dgram_get_sock_peer; + dgram_strm_ops.set_sock_peer = dgram_set_sock_peer; } diff --git a/stream.c b/stream.c index cbb6821a..df1faa2d 100644 --- a/stream.c +++ b/stream.c @@ -406,7 +406,7 @@ static void stdio_stream_mark(val stream) #endif } -static val errno_to_string(val err) +val errno_to_string(val err) { if (err == zero) return lit("unspecified error"); @@ -608,7 +608,7 @@ static val stdio_get_fd(val stream) return h->f ? num(fileno(h->f)) : nil; } -static val generic_get_line(val stream) +val generic_get_line(val stream) { struct strm_ops *ops = coerce(struct strm_ops *, cobj_ops(stream, stream_s)); const size_t min_size = 512; @@ -1233,7 +1233,7 @@ val make_pipe_stream(FILE *f, val descr) } #if HAVE_SOCKETS -static val make_sock_stream(FILE *f, val family, val type) +val make_sock_stream(FILE *f, val family, val type) { val s = make_stdio_stream_common(f, lit("socket"), &stdio_sock_ops.cobj_ops); struct stdio_handle *h = coerce(struct stdio_handle *, s->co.handle); @@ -3249,35 +3249,6 @@ val open_fileno(val fd, val mode_str) fd, nao))); } -#if HAVE_SOCKETS -val open_sockfd(val fd, val family, val type, val mode_str_in) -{ - struct stdio_mode m; - val mode_str = default_arg(mode_str_in, lit("r+")); - FILE *f = (errno = 0, w_fdopen(c_num(fd), c_str(normalize_mode(&m, mode_str)))); - - if (!f) { - close(c_num(fd)); - uw_throwf(file_error_s, lit("error creating stream for socket ~a: ~d/~s"), - fd, num(errno), string_utf8(strerror(errno)), nao); - } - - if (type == num_fast(SOCK_DGRAM)) - setvbuf(f, (char *) NULL, _IOFBF, 65536); - else - setvbuf(f, (char *) NULL, _IOLBF, 0); - - return set_mode_props(m, make_sock_stream(f, family, type)); -} - -val open_socket(val family, val type, val mode_str) -{ - int fd = socket(c_num(family), c_num(type), 0); - return open_sockfd(num(fd), family, type, mode_str); -} -#endif - - val open_tail(val path, val mode_str, val seek_end_p) { struct stdio_mode m; diff --git a/stream.h b/stream.h index c874d96e..011fcb38 100644 --- a/stream.h +++ b/stream.h @@ -125,12 +125,15 @@ void stream_mark_op(val stream); void stream_destroy_op(val stream); val normalize_mode(struct stdio_mode *m, val mode_str); val set_mode_props(const struct stdio_mode m, val stream); +val generic_get_line(val stream); +val errno_to_string(val err); val make_null_stream(void); val make_stdio_stream(FILE *, val descr); val make_tail_stream(FILE *, val descr); val make_pipe_stream(FILE *, val descr); val stream_fd(val stream); #if HAVE_SOCKETS +val make_sock_stream(FILE *f, val family, val type); val sock_family(val stream); val sock_type(val stream); val sock_peer(val stream); -- cgit v1.2.3