Browse Source

Async replys.

master
Jari Vetoniemi 10 years ago
parent
commit
cb95ac27e1
3 changed files with 96 additions and 54 deletions
  1. +46
    -33
      bin/server.c
  2. +41
    -19
      src/pi9.c
  3. +9
    -2
      src/pi9.h

+ 46
- 33
bin/server.c View File

@ -9,8 +9,9 @@
#include <assert.h> #include <assert.h>
#include "pi9.h" #include "pi9.h"
#include "chck/pool/pool.h"
#include "chck/lut/lut.h"
#include <chck/pool/pool.h>
#include <chck/lut/lut.h>
#include <chck/math/math.h>
#define M(m) (m & 3) #define M(m) (m & 3)
@ -38,7 +39,7 @@ struct node {
bool open; bool open;
struct node_procs { struct node_procs {
bool (*read)(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count);
bool (*read)(struct pi9 *pi9, struct node *node, uint16_t tag, uint64_t offset, uint32_t count);
bool (*write)(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count, const void *data); bool (*write)(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count, const void *data);
uint64_t (*size)(struct pi9 *pi9, struct node *node); uint64_t (*size)(struct pi9 *pi9, struct node *node);
} procs; } procs;
@ -212,53 +213,66 @@ clunk_fid(struct fs *fs, uint32_t fid)
} }
static bool static bool
cb_read_qtdir(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count)
cb_read_qtdir(struct pi9 *pi9, struct node *node, uint16_t tag, uint64_t offset, uint32_t count)
{ {
(void)offset, (void)count; (void)offset, (void)count;
struct pi9_reply reply;
pi9_reply_start(&reply, tag, pi9->stream);
// For directories, read returns an integral number of directory entries exactly as in stat (see stat(5)), // For directories, read returns an integral number of directory entries exactly as in stat (see stat(5)),
// one for each member of the directory. The read request message must have offset equal to zero or the // one for each member of the directory. The read request message must have offset equal to zero or the
// value of offset in the previous read on the directory, plus the number of bytes returned in the previous read. // value of offset in the previous read on the directory, plus the number of bytes returned in the previous read.
// In other words, seeking other than to the beginning is illegal in a directory (see seek(2)). // In other words, seeking other than to the beginning is illegal in a directory (see seek(2)).
struct pi9_stat stats[2];
memcpy(&stats[0], &node->stat, sizeof(struct pi9_stat));
memcpy(&stats[1], &node->stat, sizeof(struct pi9_stat));
pi9_string_set_cstr_with_length(&stats[0].name, ".", 1, false);
pi9_string_set_cstr_with_length(&stats[1].name, "..", 2, false);
if (offset == 0) {
struct pi9_stat stats[2];
memcpy(&stats[0], &node->stat, sizeof(struct pi9_stat));
memcpy(&stats[1], &node->stat, sizeof(struct pi9_stat));
stats[0].name = (struct pi9_string){ ".", 1, false };
stats[1].name = (struct pi9_string){ "..", 2, false };
for (uint32_t i = 0; i < 2; ++i) {
if (!pi9_write_stat(&stats[i], pi9->stream))
return false;
}
for (uint32_t i = 0; i < 2; ++i) {
if (!pi9_write_stat(&stats[i], pi9->stream))
return false;
}
for (size_t i = 0; i < node->childs.items.count; ++i) {
size_t *n = chck_iter_pool_get(&node->childs, i);
struct node *c = get_node(pi9->userdata, *n);
assert(c);
for (size_t i = 0; i < node->childs.items.count; ++i) {
size_t *n = chck_iter_pool_get(&node->childs, i);
struct node *c = get_node(pi9->userdata, *n);
assert(c);
// update size from callback
if (c->procs.size)
c->stat.length = c->procs.size(pi9, c);
// update size from callback
if (c->procs.size)
c->stat.length = c->procs.size(pi9, c);
if (!pi9_write_stat(&c->stat, pi9->stream))
return false;
if (!pi9_write_stat(&c->stat, pi9->stream))
return false;
}
} }
return b">true;
return f">pi9_reply_send(&reply, pi9->fd, pi9->stream);
} }
static bool static bool
cb_read_hello(struct pi9 *pi9, struct node *node, uint64_t offset, uint32_t count)
cb_read_hello(struct pi9 *pi9, struct node *node, uint16_t tag, uint64_t offset, uint32_t count)
{ {
(void)node, (void)offset, (void)count; (void)node, (void)offset, (void)count;
struct pi9_reply reply;
pi9_reply_start(&reply, tag, pi9->stream);
// The read request asks for count bytes of data from the file identified by fid, // The read request asks for count bytes of data from the file identified by fid,
// which must be opened for reading, starting offset bytes after the beginning of the file. // which must be opened for reading, starting offset bytes after the beginning of the file.
// The bytes are returned with the read reply message. // The bytes are returned with the read reply message.
// XXX: Just example here, we ignore offset and count
return (pi9_write("Hello World!", 1, sizeof("Hello World!"), pi9->stream) == sizeof("Hello World!"));
const size_t size = sizeof("Hello World!");
offset = chck_minsz(size, offset);
count = chck_minsz(size - offset, count);
if (!(pi9_write("Hello World!" + offset, 1, count, pi9->stream)) == count)
return false;
return pi9_reply_send(&reply, pi9->fd, pi9->stream);
} }
static bool static bool
@ -474,7 +488,7 @@ cb_walk(struct pi9 *pi9, uint16_t tag, uint32_t fid, uint32_t newfid, uint16_t n
size_t *c; size_t *c;
chck_iter_pool_for_each(&wnode->childs, c) { chck_iter_pool_for_each(&wnode->childs, c) {
struct node *n; struct node *n;
if (!(n = get_node(pi9->userdata, *c)) || o">!pi9_string_eq(&walks[i], &n->stat.name))
if (!(n = get_node(pi9->userdata, *c)) || pi9_string_eq(&walks[i], &n->stat.name))
continue; continue;
qids[*out_nwqid] = &n->stat.qid; qids[*out_nwqid] = &n->stat.qid;
@ -624,7 +638,7 @@ cb_read(struct pi9 *pi9, uint16_t tag, uint32_t fid, uint64_t offset, uint32_t c
if (M(f->omode) != PI9_OREAD) if (M(f->omode) != PI9_OREAD)
goto err_not_allowed; goto err_not_allowed;
if (f->procs.read && !f->procs.read(pi9, f, offset, count))
if (f->procs.read && !f->procs.read(pi9, f, tag, offset, count))
goto err_write; goto err_write;
return true; return true;
@ -865,7 +879,7 @@ sock_unix(char *address, struct sockaddr_un *sa, socklen_t *salen)
strncpy(sa->sun_path, address, sizeof(sa->sun_path)); strncpy(sa->sun_path, address, sizeof(sa->sun_path));
*salen = SUN_LEN(sa); *salen = SUN_LEN(sa);
n">int32_t fd;
kt">int fd;
if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
return -1; return -1;
@ -877,7 +891,7 @@ announce_unix(char *file)
{ {
assert(file); assert(file);
n">int32_t fd;
kt">int fd;
socklen_t salen; socklen_t salen;
struct sockaddr_un sa; struct sockaddr_un sa;
if ((fd = sock_unix(file, &sa, &salen)) < 0) if ((fd = sock_unix(file, &sa, &salen)) < 0)
@ -941,14 +955,13 @@ main(int argc, char *argv[])
int32_t clients = 0; int32_t clients = 0;
while (running) { while (running) {
int32_t ret;
if ((ret = poll(fds, 1 + clients, 500)) <= 0)
if (poll(fds, 1 + clients, 500) <= 0)
continue; continue;
for (int32_t i = 0; i < 1 + clients; ++i) { for (int32_t i = 0; i < 1 + clients; ++i) {
if (fds[i].revents & POLLIN) { if (fds[i].revents & POLLIN) {
if (i == 0) { if (i == 0) {
n">int32_t fd;
kt">int fd;
if ((fd = accept(fds[i].fd, NULL, NULL)) >= 0) { if ((fd = accept(fds[i].fd, NULL, NULL)) >= 0) {
if (clients > 0) { if (clients > 0) {
fprintf(stderr, "Rejected client\n"); fprintf(stderr, "Rejected client\n");

+ 41
- 19
src/pi9.c View File

@ -6,7 +6,7 @@
#include <inttypes.h> #include <inttypes.h>
#include "pi9.h" #include "pi9.h"
#include &#34;chck/buffer/buffer.h";
#include &lt;chck/buffer/buffer.h>;
#define VERBOSE false #define VERBOSE false
#define MAXWELEM 16 #define MAXWELEM 16
@ -519,19 +519,7 @@ op_Tread(struct pi9 *pi9, uint16_t tag, struct pi9_stream *stream)
fprintf(stderr, "Tread %u %u %"PRIu64" %u\n", tag, fid, offset, count); fprintf(stderr, "Tread %u %u %"PRIu64" %u\n", tag, fid, offset, count);
#endif #endif
chck_buffer_seek(&stream->out, HDRSZ + sizeof(uint32_t), SEEK_SET);
void *start = stream->out.curpos;
if (pi9->procs.read && !pi9->procs.read(pi9, tag, fid, offset, count)) if (pi9->procs.read && !pi9->procs.read(pi9, tag, fid, offset, count))
return false;
const uint32_t sbufsz = (offset != 0 ? 0 : (stream->out.curpos - start));
const uint32_t size = HDRSZ + sizeof(sbufsz) + sbufsz;
chck_buffer_seek(&stream->out, 0, SEEK_SET);
if (!chck_buffer_write_int(&size, sizeof(size), &stream->out) ||
!chck_buffer_write_int((uint8_t[]){Rread}, sizeof(uint8_t), &stream->out) ||
!chck_buffer_write_int(&tag, sizeof(tag), &stream->out) ||
!chck_buffer_write_int(&sbufsz, sizeof(sbufsz), &stream->out))
goto err_write; goto err_write;
return true; return true;
@ -743,7 +731,7 @@ err_unknown_op:
} }
static bool static bool
read_msg(struct pi9 *pi9, n">int32_t fd, struct pi9_stream *stream)
read_msg(struct pi9 *pi9, kt">int fd, struct pi9_stream *stream)
{ {
assert(pi9 && fd >= 0); assert(pi9 && fd >= 0);
@ -779,9 +767,13 @@ read_msg(struct pi9 *pi9, int32_t fd, struct pi9_stream *stream)
} }
static inline bool static inline bool
write_msg(n">int32_t fd, struct pi9_stream *stream)
write_msg(kt">int fd, struct pi9_stream *stream)
{ {
assert(fd >= 0 && stream->out.buffer); assert(fd >= 0 && stream->out.buffer);
if ((size_t)(stream->out.curpos - stream->out.buffer) < sizeof(uint32_t))
return true;
const uint32_t size = *(uint32_t*)stream->out.buffer; const uint32_t size = *(uint32_t*)stream->out.buffer;
#if VERBOSE #if VERBOSE
fprintf(stderr, "Write message of size: %u\n", size); fprintf(stderr, "Write message of size: %u\n", size);
@ -789,6 +781,34 @@ write_msg(int32_t fd, struct pi9_stream *stream)
return write(fd, stream->out.buffer, size) == size; return write(fd, stream->out.buffer, size) == size;
} }
void
pi9_reply_start(struct pi9_reply *reply, uint16_t tag, struct pi9_stream *stream)
{
assert(reply && stream);
memset(reply, 0, sizeof(struct pi9_reply));
chck_buffer_seek(&stream->out, HDRSZ + sizeof(uint32_t), SEEK_SET);
reply->start = stream->out.curpos;
reply->tag = tag;
}
bool
pi9_reply_send(struct pi9_reply *reply, int fd, struct pi9_stream *stream)
{
assert(reply && stream);
const uint32_t sbufsz = (stream->out.curpos - reply->start);
const uint32_t size = HDRSZ + sizeof(sbufsz) + sbufsz;
chck_buffer_seek(&stream->out, 0, SEEK_SET);
if (!chck_buffer_write_int(&size, sizeof(size), &stream->out) ||
!chck_buffer_write_int((uint8_t[]){Rread}, sizeof(uint8_t), &stream->out) ||
!chck_buffer_write_int(&reply->tag, sizeof(reply->tag), &stream->out) ||
!chck_buffer_write_int(&sbufsz, sizeof(sbufsz), &stream->out))
return false;
return (stream->in.buffer == stream->in.curpos ? write_msg(fd, stream) : true);
}
bool bool
pi9_write_stat(struct pi9_stat *stat, struct pi9_stream *stream) pi9_write_stat(struct pi9_stat *stat, struct pi9_stream *stream)
{ {
@ -831,12 +851,10 @@ pi9_stat_release(struct pi9_stat *stat)
} }
bool bool
pi9_process(struct pi9 *pi9, n">int32_t fd)
pi9_process(struct pi9 *pi9, kt">int fd)
{ {
assert(pi9 && fd >= 0 && pi9->stream); assert(pi9 && fd >= 0 && pi9->stream);
chck_buffer_seek(&pi9->stream->in, 0, SEEK_SET);
chck_buffer_seek(&pi9->stream->out, 0, SEEK_SET);
pi9->fd = fd;
bool ret = true; bool ret = true;
if (!read_msg(pi9, fd, pi9->stream)) { if (!read_msg(pi9, fd, pi9->stream)) {
@ -851,6 +869,9 @@ pi9_process(struct pi9 *pi9, int32_t fd)
ret = false; ret = false;
} }
chck_buffer_seek(&pi9->stream->in, 0, SEEK_SET);
chck_buffer_seek(&pi9->stream->out, 0, SEEK_SET);
pi9->fd = -1;
return ret; return ret;
} }
@ -874,6 +895,7 @@ pi9_init(struct pi9 *pi9, uint32_t msize, struct pi9_procs *procs, void *userdat
memcpy(&pi9->procs, procs, sizeof(struct pi9_procs)); memcpy(&pi9->procs, procs, sizeof(struct pi9_procs));
pi9->msize = (msize > 0 ? msize : 8192); pi9->msize = (msize > 0 ? msize : 8192);
pi9->userdata = userdata; pi9->userdata = userdata;
pi9->fd = -1;
if (!(pi9->stream = calloc(1, sizeof(struct pi9_stream)))) if (!(pi9->stream = calloc(1, sizeof(struct pi9_stream))))
goto fail; goto fail;

+ 9
- 2
src/pi9.h View File

@ -8,6 +8,11 @@ static const uint32_t PI9_NOFID = (uint32_t)~0;
struct pi9_stream; struct pi9_stream;
struct pi9_reply {
const void *start;
uint16_t tag;
};
struct pi9_qid { struct pi9_qid {
uint8_t type; uint8_t type;
uint32_t vers; uint32_t vers;
@ -31,8 +36,8 @@ struct pi9_stat {
struct pi9 { struct pi9 {
void *userdata; void *userdata;
struct pi9_stream *stream; struct pi9_stream *stream;
uint32_t msize; uint32_t msize;
int fd; // current fd
struct pi9_procs { struct pi9_procs {
bool (*auth)(struct pi9 *pi9, uint16_t tag, uint32_t afid, const struct pi9_string *uname, const struct pi9_string *aname, struct pi9_qid **qid); bool (*auth)(struct pi9 *pi9, uint16_t tag, uint32_t afid, const struct pi9_string *uname, const struct pi9_string *aname, struct pi9_qid **qid);
@ -105,11 +110,13 @@ enum pi9_error {
PI9_ERR_LAST, PI9_ERR_LAST,
}; };
void pi9_reply_start(struct pi9_reply *r, uint16_t tag, struct pi9_stream *stream);
bool pi9_reply_send(struct pi9_reply *r, int fd, struct pi9_stream *stream);
bool pi9_write_stat(struct pi9_stat *stat, struct pi9_stream *stream); bool pi9_write_stat(struct pi9_stat *stat, struct pi9_stream *stream);
void pi9_write_error(uint16_t tag, enum pi9_error error, struct pi9_stream *stream); void pi9_write_error(uint16_t tag, enum pi9_error error, struct pi9_stream *stream);
size_t pi9_write(const void *src, size_t size, size_t nmemb, struct pi9_stream *stream); size_t pi9_write(const void *src, size_t size, size_t nmemb, struct pi9_stream *stream);
void pi9_stat_release(struct pi9_stat *stat); void pi9_stat_release(struct pi9_stat *stat);
bool pi9_process(struct pi9 *pi9, n">int32_t fd);
bool pi9_process(struct pi9 *pi9, kt">int fd);
bool pi9_init(struct pi9 *pi9, uint32_t msize, struct pi9_procs *procs, void *userdata); bool pi9_init(struct pi9 *pi9, uint32_t msize, struct pi9_procs *procs, void *userdata);
void pi9_release(struct pi9 *pi9); void pi9_release(struct pi9 *pi9);

Loading…
Cancel
Save