From cb95ac27e17231635811c802626991da4c5f78ec Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Wed, 18 Mar 2015 22:10:33 +0200 Subject: [PATCH] Async replys. --- bin/server.c | 79 ++++++++++++++++++++++++++++++---------------------- src/pi9.c | 60 ++++++++++++++++++++++++++------------- src/pi9.h | 11 ++++++-- 3 files changed, 96 insertions(+), 54 deletions(-) diff --git a/bin/server.c b/bin/server.c index 98cbb90..7fbc670 100644 --- a/bin/server.c +++ b/bin/server.c @@ -9,8 +9,9 @@ #include #include "pi9.h" -#include "chck/pool/pool.h" -#include "chck/lut/lut.h" +#include +#include +#include #define M(m) (m & 3) @@ -38,7 +39,7 @@ struct node { bool open; struct node_procs { - bool (*read)(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count); + bool (*read)(struct pi9 *pi9, struct node *node, uint16_t tag, uint64_t offset, uint32_t count); bool (*write)(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count, const void *data); uint64_t (*size)(struct pi9 *pi9, struct node *node); } procs; @@ -212,53 +213,66 @@ clunk_fid(struct fs *fs, uint32_t fid) } static bool -cb_read_qtdir(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count) +cb_read_qtdir(struct pi9 *pi9, struct node *node, uint16_t tag, uint64_t offset, uint32_t count) { (void)offset, (void)count; + struct pi9_reply reply; + pi9_reply_start(&reply, tag, pi9->stream); + // For directories, read returns an integral number of directory entries exactly as in stat (see stat(5)), // one for each member of the directory. The read request message must have offset equal to zero or the // value of offset in the previous read on the directory, plus the number of bytes returned in the previous read. // In other words, seeking other than to the beginning is illegal in a directory (see seek(2)). - struct pi9_stat stats[2]; - memcpy(&stats[0], &node->stat, sizeof(struct pi9_stat)); - memcpy(&stats[1], &node->stat, sizeof(struct pi9_stat)); - pi9_string_set_cstr_with_length(&stats[0].name, ".", 1, false); - pi9_string_set_cstr_with_length(&stats[1].name, "..", 2, false); + if (offset == 0) { + struct pi9_stat stats[2]; + memcpy(&stats[0], &node->stat, sizeof(struct pi9_stat)); + memcpy(&stats[1], &node->stat, sizeof(struct pi9_stat)); + stats[0].name = (struct pi9_string){ ".", 1, false }; + stats[1].name = (struct pi9_string){ "..", 2, false }; - for (uint32_t i = 0; i < 2; ++i) { - if (!pi9_write_stat(&stats[i], pi9->stream)) - return false; - } + for (uint32_t i = 0; i < 2; ++i) { + if (!pi9_write_stat(&stats[i], pi9->stream)) + return false; + } - for (size_t i = 0; i < node->childs.items.count; ++i) { - size_t *n = chck_iter_pool_get(&node->childs, i); - struct node *c = get_node(pi9->userdata, *n); - assert(c); + for (size_t i = 0; i < node->childs.items.count; ++i) { + size_t *n = chck_iter_pool_get(&node->childs, i); + struct node *c = get_node(pi9->userdata, *n); + assert(c); - // update size from callback - if (c->procs.size) - c->stat.length = c->procs.size(pi9, c); + // update size from callback + if (c->procs.size) + c->stat.length = c->procs.size(pi9, c); - if (!pi9_write_stat(&c->stat, pi9->stream)) - return false; + if (!pi9_write_stat(&c->stat, pi9->stream)) + return false; + } } - return true; + return pi9_reply_send(&reply, pi9->fd, pi9->stream); } static bool -cb_read_hello(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count) +cb_read_hello(struct pi9 *pi9, struct node *node, uint16_t tag, uint64_t offset, uint32_t count) { (void)node, (void)offset, (void)count; + struct pi9_reply reply; + pi9_reply_start(&reply, tag, pi9->stream); + // The read request asks for count bytes of data from the file identified by fid, // which must be opened for reading, starting offset bytes after the beginning of the file. // The bytes are returned with the read reply message. - // XXX: Just example here, we ignore offset and count - return (pi9_write("Hello World!", 1, sizeof("Hello World!"), pi9->stream) == sizeof("Hello World!")); + const size_t size = sizeof("Hello World!"); + offset = chck_minsz(size, offset); + count = chck_minsz(size - offset, count); + if (!(pi9_write("Hello World!" + offset, 1, count, pi9->stream)) == count) + return false; + + return pi9_reply_send(&reply, pi9->fd, pi9->stream); } static bool @@ -474,7 +488,7 @@ cb_walk(struct pi9 *pi9, uint16_t tag, uint32_t fid, uint32_t newfid, uint16_t n size_t *c; chck_iter_pool_for_each(&wnode->childs, c) { struct node *n; - if (!(n = get_node(pi9->userdata, *c)) || !pi9_string_eq(&walks[i], &n->stat.name)) + if (!(n = get_node(pi9->userdata, *c)) || pi9_string_eq(&walks[i], &n->stat.name)) continue; qids[*out_nwqid] = &n->stat.qid; @@ -624,7 +638,7 @@ cb_read(struct pi9 *pi9, uint16_t tag, uint32_t fid, uint64_t offset, uint32_t c if (M(f->omode) != PI9_OREAD) goto err_not_allowed; - if (f->procs.read && !f->procs.read(pi9, f, offset, count)) + if (f->procs.read && !f->procs.read(pi9, f, tag, offset, count)) goto err_write; return true; @@ -865,7 +879,7 @@ sock_unix(char *address, struct sockaddr_un *sa, socklen_t *salen) strncpy(sa->sun_path, address, sizeof(sa->sun_path)); *salen = SUN_LEN(sa); - int32_t fd; + int fd; if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) return -1; @@ -877,7 +891,7 @@ announce_unix(char *file) { assert(file); - int32_t fd; + int fd; socklen_t salen; struct sockaddr_un sa; if ((fd = sock_unix(file, &sa, &salen)) < 0) @@ -941,14 +955,13 @@ main(int argc, char *argv[]) int32_t clients = 0; while (running) { - int32_t ret; - if ((ret = poll(fds, 1 + clients, 500)) <= 0) + if (poll(fds, 1 + clients, 500) <= 0) continue; for (int32_t i = 0; i < 1 + clients; ++i) { if (fds[i].revents & POLLIN) { if (i == 0) { - int32_t fd; + int fd; if ((fd = accept(fds[i].fd, NULL, NULL)) >= 0) { if (clients > 0) { fprintf(stderr, "Rejected client\n"); diff --git a/src/pi9.c b/src/pi9.c index f5a6122..4218699 100644 --- a/src/pi9.c +++ b/src/pi9.c @@ -6,7 +6,7 @@ #include #include "pi9.h" -#include "chck/buffer/buffer.h" +#include #define VERBOSE false #define MAXWELEM 16 @@ -519,19 +519,7 @@ op_Tread(struct pi9 *pi9, uint16_t tag, struct pi9_stream *stream) fprintf(stderr, "Tread %u %u %"PRIu64" %u\n", tag, fid, offset, count); #endif - chck_buffer_seek(&stream->out, HDRSZ + sizeof(uint32_t), SEEK_SET); - void *start = stream->out.curpos; - if (pi9->procs.read && !pi9->procs.read(pi9, tag, fid, offset, count)) - return false; - - const uint32_t sbufsz = (offset != 0 ? 0 : (stream->out.curpos - start)); - const uint32_t size = HDRSZ + sizeof(sbufsz) + sbufsz; - chck_buffer_seek(&stream->out, 0, SEEK_SET); - if (!chck_buffer_write_int(&size, sizeof(size), &stream->out) || - !chck_buffer_write_int((uint8_t[]){Rread}, sizeof(uint8_t), &stream->out) || - !chck_buffer_write_int(&tag, sizeof(tag), &stream->out) || - !chck_buffer_write_int(&sbufsz, sizeof(sbufsz), &stream->out)) goto err_write; return true; @@ -743,7 +731,7 @@ err_unknown_op: } static bool -read_msg(struct pi9 *pi9, int32_t fd, struct pi9_stream *stream) +read_msg(struct pi9 *pi9, int fd, struct pi9_stream *stream) { assert(pi9 && fd >= 0); @@ -779,9 +767,13 @@ read_msg(struct pi9 *pi9, int32_t fd, struct pi9_stream *stream) } static inline bool -write_msg(int32_t fd, struct pi9_stream *stream) +write_msg(int fd, struct pi9_stream *stream) { assert(fd >= 0 && stream->out.buffer); + + if ((size_t)(stream->out.curpos - stream->out.buffer) < sizeof(uint32_t)) + return true; + const uint32_t size = *(uint32_t*)stream->out.buffer; #if VERBOSE fprintf(stderr, "Write message of size: %u\n", size); @@ -789,6 +781,34 @@ write_msg(int32_t fd, struct pi9_stream *stream) return write(fd, stream->out.buffer, size) == size; } +void +pi9_reply_start(struct pi9_reply *reply, uint16_t tag, struct pi9_stream *stream) +{ + assert(reply && stream); + memset(reply, 0, sizeof(struct pi9_reply)); + + chck_buffer_seek(&stream->out, HDRSZ + sizeof(uint32_t), SEEK_SET); + reply->start = stream->out.curpos; + reply->tag = tag; +} + +bool +pi9_reply_send(struct pi9_reply *reply, int fd, struct pi9_stream *stream) +{ + assert(reply && stream); + + const uint32_t sbufsz = (stream->out.curpos - reply->start); + const uint32_t size = HDRSZ + sizeof(sbufsz) + sbufsz; + chck_buffer_seek(&stream->out, 0, SEEK_SET); + if (!chck_buffer_write_int(&size, sizeof(size), &stream->out) || + !chck_buffer_write_int((uint8_t[]){Rread}, sizeof(uint8_t), &stream->out) || + !chck_buffer_write_int(&reply->tag, sizeof(reply->tag), &stream->out) || + !chck_buffer_write_int(&sbufsz, sizeof(sbufsz), &stream->out)) + return false; + + return (stream->in.buffer == stream->in.curpos ? write_msg(fd, stream) : true); +} + bool pi9_write_stat(struct pi9_stat *stat, struct pi9_stream *stream) { @@ -831,12 +851,10 @@ pi9_stat_release(struct pi9_stat *stat) } bool -pi9_process(struct pi9 *pi9, int32_t fd) +pi9_process(struct pi9 *pi9, int fd) { assert(pi9 && fd >= 0 && pi9->stream); - - chck_buffer_seek(&pi9->stream->in, 0, SEEK_SET); - chck_buffer_seek(&pi9->stream->out, 0, SEEK_SET); + pi9->fd = fd; bool ret = true; if (!read_msg(pi9, fd, pi9->stream)) { @@ -851,6 +869,9 @@ pi9_process(struct pi9 *pi9, int32_t fd) ret = false; } + chck_buffer_seek(&pi9->stream->in, 0, SEEK_SET); + chck_buffer_seek(&pi9->stream->out, 0, SEEK_SET); + pi9->fd = -1; return ret; } @@ -874,6 +895,7 @@ pi9_init(struct pi9 *pi9, uint32_t msize, struct pi9_procs *procs, void *userdat memcpy(&pi9->procs, procs, sizeof(struct pi9_procs)); pi9->msize = (msize > 0 ? msize : 8192); pi9->userdata = userdata; + pi9->fd = -1; if (!(pi9->stream = calloc(1, sizeof(struct pi9_stream)))) goto fail; diff --git a/src/pi9.h b/src/pi9.h index 13718b1..d28d7cd 100644 --- a/src/pi9.h +++ b/src/pi9.h @@ -8,6 +8,11 @@ static const uint32_t PI9_NOFID = (uint32_t)~0; struct pi9_stream; +struct pi9_reply { + const void *start; + uint16_t tag; +}; + struct pi9_qid { uint8_t type; uint32_t vers; @@ -31,8 +36,8 @@ struct pi9_stat { struct pi9 { void *userdata; struct pi9_stream *stream; - uint32_t msize; + int fd; // current fd struct pi9_procs { bool (*auth)(struct pi9 *pi9, uint16_t tag, uint32_t afid, const struct pi9_string *uname, const struct pi9_string *aname, struct pi9_qid **qid); @@ -105,11 +110,13 @@ enum pi9_error { PI9_ERR_LAST, }; +void pi9_reply_start(struct pi9_reply *r, uint16_t tag, struct pi9_stream *stream); +bool pi9_reply_send(struct pi9_reply *r, int fd, struct pi9_stream *stream); bool pi9_write_stat(struct pi9_stat *stat, struct pi9_stream *stream); void pi9_write_error(uint16_t tag, enum pi9_error error, struct pi9_stream *stream); size_t pi9_write(const void *src, size_t size, size_t nmemb, struct pi9_stream *stream); void pi9_stat_release(struct pi9_stat *stat); -bool pi9_process(struct pi9 *pi9, int32_t fd); +bool pi9_process(struct pi9 *pi9, int fd); bool pi9_init(struct pi9 *pi9, uint32_t msize, struct pi9_procs *procs, void *userdata); void pi9_release(struct pi9 *pi9);