#include #include #include #include "queue.h" /* * IO queues from /sys/src/9/qio.c */ /* Block.flag values */ enum { BINTR = (1<<0), BFREE = (1<<1), Bipck = (1<<2), /* ip checksum */ Budpck = (1<<3), /* udp checksum */ Btcpck = (1<<4), /* tcp checksum */ Bpktck = (1<<5), /* packet checksum */ }; typedef struct Block Block; struct Block { Block* next; Block* list; uchar* rp; /* first unconsumed byte */ uchar* wp; /* first empty byte */ uchar* lim; /* 1 past the end of the buffer */ uchar* base; /* start of the buffer */ void (*free)(Block*); ushort flag; ushort checksum; /* IP checksum of complete packet (minus media header) */ }; #define BLEN(s) ((s)->wp - (s)->rp) #define BALLOC(s) ((s)->lim - (s)->base) #define ROUND(s, sz) (((s)+((sz)-1))&~((sz)-1)) #define BLOCKALIGN 8 enum { Hdrspc = 64, /* leave room for high-level headers */ Bdead = 0x51494F42, /* "QIOB" */ }; static Block* allocb(int size) { Block *b; uintptr addr; if((b = mallocz(sizeof(*b)+size+Hdrspc, 1)) == nil) return nil; b->next = nil; b->list = nil; b->free = nil; b->flag = 0; /* align start of data portion by rounding up */ addr = (uintptr)b; addr = ROUND(addr + sizeof(Block), BLOCKALIGN); b->base = (uchar*)addr; /* align end of data portion by rounding down */ b->lim = (uchar*)b + msize(b); addr = (uintptr)b->lim; addr &= ~(BLOCKALIGN-1); b->lim = (uchar*)addr; /* leave sluff at beginning for added headers */ b->rp = b->lim - ROUND(size, BLOCKALIGN); if(b->rp < b->base) sysfatal("allocb"); b->wp = b->rp; return b; } void freeb(Block *b) { void *dead = (void*)Bdead; if(b == nil) return; /* * drivers which perform non cache coherent DMA manage their own buffer * pool of uncached buffers and provide their own free routine. */ if(b->free != nil) { b->free(b); return; } if(b->flag & BINTR) { /* TODO(mischief): fix me */ /* ilock(&ialloc); ialloc.bytes -= b->lim - b->base; iunlock(&ialloc); */ } /* poison the block in case someone is still holding onto it */ b->next = dead; b->rp = dead; b->wp = dead; b->lim = dead; b->base = dead; free(b); } typedef struct Queue Queue; struct Queue { QLock; Block* bfirst; /* buffer */ Block* blast; int len; /* bytes allocated to queue */ int dlen; /* data bytes in queue */ int limit; /* max bytes in queue */ int inilim; /* initial limit */ int state; int noblock; /* true if writes return immediately when q full */ int eof; /* number of eofs read by user */ void (*kick)(void*); /* restart output */ void (*bypass)(void*, Block*); /* bypass queue altogether */ void* arg; /* argument to kick */ QLock rlock; /* mutex for reading processes */ Rendez rr; /* process waiting to read */ QLock wlock; /* mutex for writing processes */ Rendez wr; /* process waiting to write */ char err[ERRMAX]; }; enum { Maxatomic = 64*1024, }; uint qiomaxatomic = Maxatomic; /* * free a list of blocks */ static void freeblist(Block *b) { Block *next; for(; b != nil; b = next){ next = b->next; b->next = nil; freeb(b); } } /* * called by non-interrupt code */ Queue* qopen(int limit, int msg, void (*kick)(void*), void *arg) { Queue *q; q = mallocz(sizeof(Queue), 1); if(q == nil) return nil; q->limit = q->inilim = limit; q->kick = kick; q->arg = arg; q->state = msg; q->state |= Qstarve; q->eof = 0; q->noblock = 0; q->rr.l = &q->rlock; q->wr.l = &q->wlock; return q; } char Ehungup[] = "i/o hangup"; static int notempty(void *a) { Queue *q = a; return (q->state & Qclosed) || q->bfirst != nil; } /* * wait for the queue to be non-empty or closed. * called with q qlocked. */ static int qwait(Queue *q) { /* wait for data */ for(;;){ if(q->bfirst != nil) break; if(q->state & Qclosed){ if(++q->eof > 3) return -1; if(*q->err && strcmp(q->err, Ehungup) != 0) return -1; return 0; } q->state |= Qstarve; /* flag requesting producer to wake me */ qunlock(q); while(notempty(q) == 0){ rsleep(&q->rr); fprint(2, "qwait wakeup\n"); } qlock(q); } return 1; } /* * called with q qlocked */ Block* qremove(Queue *q) { Block *b; b = q->bfirst; if(b == nil) return nil; q->bfirst = b->next; b->next = nil; q->dlen -= BLEN(b); q->len -= BALLOC(b); //QDEBUG checkb(b, "qremove"); return b; } /* * copy the contents of a string of blocks into * memory. emptied blocks are freed. return * pointer to first unconsumed block. */ Block* bl2mem(uchar *p, Block *b, int n) { int i; Block *next; for(; b != nil; b = next){ i = BLEN(b); if(i > n){ memmove(p, b->rp, n); b->rp += n; return b; } memmove(p, b->rp, i); n -= i; p += i; b->rp += i; next = b->next; freeb(b); } return nil; } /* * put a block back to the front of the queue * called with q qlocked */ void qputback(Queue *q, Block *b) { b->next = q->bfirst; if(q->bfirst == nil) q->blast = b; q->bfirst = b; q->len += BALLOC(b); q->dlen += BLEN(b); } /* * flow control, get producer going again * called with q qlocked */ static void qwakeup_iunlock(Queue *q) { int dowakeup = 0; /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } qunlock(q); /* wakeup flow controlled writers */ if(dowakeup){ /* XXX(mischief): should copy kick/arg first? */ if(q->kick != nil) q->kick(q->arg); qlock(&q->wlock); rwakeup(&q->wr); qunlock(&q->wlock); } } /* * read a queue. if no data is queued, post a Block * and wait on its Rendez. */ long qread(Queue *q, void *vp, int len) { Block *b, *first, **l; int m, n; qlock(&q->rlock); qlock(q); again: switch(qwait(q)){ case 0: /* queue closed */ werrstr("%s", q->err); qunlock(q); qunlock(&q->rlock); return 0; case -1: /* multiple reads on a closed queue */ werrstr("%s", q->err); qunlock(q); return -1; } /* if we get here, there's at least one block in the queue */ if(q->state & Qcoalesce){ /* when coalescing, 0 length blocks just go away */ b = q->bfirst; m = BLEN(b); if(m <= 0){ freeb(qremove(q)); goto again; } /* grab the first block plus as many * following blocks as will partially * fit in the read. */ n = 0; l = &first; for(;;) { *l = qremove(q); l = &b->next; n += m; if(n >= len || (b = q->bfirst) == nil) break; m = BLEN(b); } } else { first = qremove(q); n = BLEN(first); } /* copy to user space outside of the ilock */ qunlock(q); b = bl2mem(vp, first, len); qlock(q); /* take care of any left over partial block */ if(b != nil){ n -= BLEN(b); if(q->state & Qmsg) freeb(b); else qputback(q, b); } /* restart producer */ qwakeup_iunlock(q); qunlock(&q->rlock); return n; } static int qnotfull(void *a) { Queue *q = a; return q->len < q->limit || (q->state & Qclosed); } /* * flow control, wait for queue to get below the limit */ static void qflow(Queue *q) { for(;;){ if(q->noblock || qnotfull(q)) break; qlock(q); q->state |= Qflow; qunlock(q); qlock(&q->wlock); while(qnotfull(q) == 0) rsleep(&q->wr); qunlock(&q->wlock); } } /* * add a block to a queue obeying flow control */ long qbwrite(Queue *q, Block *b) { int n, dowakeup; n = BLEN(b); if(q->bypass != nil){ (*q->bypass)(q->arg, b); return n; } dowakeup = 0; qlock(q); /* give up if the queue is closed */ if(q->state & Qclosed){ qunlock(q); return -1; } /* don't queue over the limit */ if(q->len >= q->limit && q->noblock){ qunlock(q); freeb(b); return n; } /* queue the block */ if(q->bfirst != nil) q->blast->next = b; else q->bfirst = b; q->blast = b; b->next = nil; q->len += BALLOC(b); q->dlen += n; //QDEBUG checkb(b, "qbwrite"); /* make sure other end gets awakened */ if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } qunlock(q); /* get output going again */ if(q->kick != nil && (dowakeup || (q->state&Qkick))) q->kick(q->arg); /* wakeup anyone consuming at the other end */ if(dowakeup){ qlock(&q->rlock); rwakeup(&q->rr); qunlock(&q->rlock); } /* * flow control, before allowing the process to continue and * queue more. We do this here so that postnote can only * interrupt us after the data has been queued. This means that * things like 9p flushes and ssl messages will not be disrupted * by software interrupts. */ qflow(q); return n; } /* * write to a queue. only Maxatomic bytes at a time is atomic. */ int qwrite(Queue *q, void *vp, int len) { int n, sofar; long r; Block *b; uchar *p = vp; //QDEBUG if(!islo()) // print("qwrite hi %#p\n", getcallerpc(&q)); /* stop queue bloat before allocating blocks */ if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){ qflow(q); } sofar = 0; do { n = len-sofar; if(n > Maxatomic) n = Maxatomic; b = allocb(n); setmalloctag(b, getcallerpc(&q)); memmove(b->wp, p+sofar, n); b->wp += n; /* TODO(mischief): errstr? */ r = qbwrite(q, b); if(r == -1) return -1; sofar += n; } while(sofar < len && (q->state & Qmsg) == 0); return len; } /* * be extremely careful when calling this, * as there is no reference accounting */ void qfree(Queue *q) { qclose(q); free(q); } /* * Mark a queue as closed. No further IO is permitted. * All blocks are released. */ void qclose(Queue *q) { Block *bfirst; if(q == nil) return; /* mark it */ qlock(q); q->state |= Qclosed; q->state &= ~(Qflow|Qstarve); strncpy(q->err, Ehungup, ERRMAX); bfirst = q->bfirst; q->bfirst = nil; q->len = 0; q->dlen = 0; q->noblock = 0; qunlock(q); /* free queued blocks */ freeblist(bfirst); /* wake up readers/writers */ qlock(&q->rlock); rwakeup(&q->rr); qunlock(&q->rlock); qlock(&q->wlock); rwakeup(&q->wr); qunlock(&q->wlock); } /* * Mark a queue as closed. Wakeup any readers. Don't remove queued * blocks. */ void qhangup(Queue *q, char *msg) { /* mark it */ qlock(q); q->state |= Qclosed; if(msg == nil || *msg == '\0') msg = Ehungup; strncpy(q->err, msg, ERRMAX); qunlock(q); /* wake up readers/writers */ qlock(&q->rlock); rwakeup(&q->rr); qunlock(&q->rlock); qlock(&q->wlock); rwakeup(&q->wr); qunlock(&q->wlock); } /* * return non-zero if the q is hungup */ int qisclosed(Queue *q) { return q->state & Qclosed; } /* * mark a queue as no longer hung up */ void qreopen(Queue *q) { qlock(q); q->state &= ~Qclosed; q->state |= Qstarve; q->eof = 0; q->limit = q->inilim; qunlock(q); } /* * return bytes queued */ int qlen(Queue *q) { return q->dlen; } /* * return space remaining before flow control */ int qwindow(Queue *q) { int l; l = q->limit - q->len; if(l < 0) l = 0; return l; } /* * return true if we can read without blocking */ int qcanread(Queue *q) { return q->bfirst != nil; } /* * change queue limit */ void qsetlimit(Queue *q, int limit) { q->limit = limit; } /* * set blocking/nonblocking */ void qnoblock(Queue *q, int onoff) { q->noblock = onoff; } /* * flush the output queue */ void qflush(Queue *q) { Block *bfirst; /* mark it */ qlock(q); bfirst = q->bfirst; q->bfirst = nil; q->len = 0; q->dlen = 0; qunlock(q); /* free queued blocks */ freeblist(bfirst); /* wake up readers/writers */ qlock(&q->wlock); rwakeup(&q->wr); qunlock(&q->wlock); } int qfull(Queue *q) { return q->state & Qflow; } void qinterrupt(Queue *q) { /* wake up readers/writers */ qlock(&q->rlock); rwakeupall(&q->rr); qunlock(&q->rlock); qlock(&q->wlock); rwakeup(&q->wr); qunlock(&q->wlock); }