plan 9 kernel history: overview | file list | diff list

1990/0227/port/stream.c (diff list | history)

port/stream.c on 1990/0227
1990/0227    
#include	"u.h" 
#include	"lib.h" 
#include	"mem.h" 
#include	"dat.h" 
#include	"fns.h" 
#include	"io.h" 
#include	"errno.h" 
#include	"devtab.h" 
 
static void stputq(Queue*, Block*); 
Qinfo procinfo = { stputq, nullput, 0, 0, "process" } ; 
extern Qinfo noetherinfo; 
 
static Qinfo *lds[] = { 
	&noetherinfo, 
	0 
}; 
 
enum { 
	Nclass=4, 
}; 
 
/* 
 *  All stream structures are ialloc'd at boot time 
 */ 
Stream *slist; 
Queue *qlist; 
Block *blist; 
static Lock garbagelock; 
 
/* 
 *  The block classes.  There are Nclass block sizes, each with its own free list. 
 *  All are ialloced at qinit() time. 
 */ 
typedef struct { 
	int	size; 
	Queue; 
} Bclass; 
Bclass bclass[Nclass]={ 
	{ 0 }, 
	{ 64 }, 
	{ 512 }, 
	{ 4096 }, 
}; 
 
/* 
 *  Allocate streams, queues, and blocks.  Allocate n block classes with 
 *	1/2(m+1) to class m < n-1 
 *	1/2(n-1) to class n-1 
 */ 
void 
streaminit(void) 
{ 
	int class, i, n; 
	Block *bp; 
	Bclass *bcp; 
 
	slist = (Stream *)ialloc(conf.nstream * sizeof(Stream), 0); 
	qlist = (Queue *)ialloc(conf.nqueue * sizeof(Queue), 0); 
	blist = (Block *)ialloc(conf.nblock * sizeof(Block), 0); 
	bp = blist; 
	n = conf.nblock; 
	for(class = 0; class < Nclass; class++){ 
		if(class < Nclass-1) 
			n = n/2; 
		bcp = &bclass[class]; 
		for(i = 0; i < n; i++) { 
			if(bcp->size) 
				bp->base = (uchar *)ialloc(bcp->size, 0); 
			bp->lim = bp->base + bcp->size; 
			bp->flags = class; 
			freeb(bp); 
			bp++; 
		} 
	} 
} 
 
 
/* 
 *  allocate a block 
 */ 
static int 
isblock(void *arg) 
{ 
	Bclass *bcp; 
 
	bcp = (Bclass *)arg; 
	return bcp->first!=0; 
} 
Block * 
allocb(ulong size) 
{ 
	Block *bp; 
	Bclass *bcp; 
	int i; 
 
 
	/* 
	 *  map size to class 
	 */ 
	for(bcp=bclass; bcp->size<size && bcp<&bclass[Nclass-1]; bcp++) 
		; 
 
	/* 
	 *  look for a free block, garbage collect if there are none 
	 */ 
	lock(bcp); 
	while(bcp->first == 0){ 
		unlock(bcp); 
		print("waiting for blocks\n"); 
		sleep(&bcp->r, isblock, (void *)bcp); 
		lock(bcp); 
	} 
	bp = bcp->first; 
	bcp->first = bp->next; 
	if(bcp->first == 0) 
		bcp->last = 0; 
	unlock(bcp); 
 
	/* 
	 *  return an empty block 
	 */ 
	bp->rptr = bp->wptr = bp->base; 
	bp->next = 0; 
	bp->type = M_DATA; 
	bp->flags &= S_CLASS; 
	return bp; 
} 
 
/* 
 *  Free a block.  Poison its pointers so that someone trying to access 
 *  it after freeing will cause a dump. 
 */ 
void 
freeb(Block *bp) 
{ 
	Bclass *bcp; 
 
	bcp = &bclass[bp->flags & S_CLASS]; 
	bp->rptr = bp->wptr = 0; 
	lock(bcp); 
	if(bcp->first) 
		bcp->last->next = bp; 
	else 
		bcp->first = bp; 
	bcp->last = bp; 
	bp->next = 0; 
	wakeup(&bcp->r); 
	unlock(bcp); 
} 
 
/* 
 *  allocate a pair of queues.  flavor them with the requested put routines. 
 *  the `QINUSE' flag on the read side is the only one used. 
 */ 
static Queue * 
allocq(Qinfo *qi) 
{ 
	Queue *q, *wq; 
 
	for(q=qlist; q<&qlist[conf.nqueue]; q++, q++) { 
		if(q->flag == 0){ 
			if(canlock(q)){ 
				if(q->flag == 0) 
					break; 
				unlock(q); 
			} 
		} 
	} 
 
	if(q == &qlist[conf.nqueue]){ 
		print("no more queues\n"); 
		error(0, Enoqueue); 
	} 
 
	q->flag = QINUSE; 
	q->r.p = 0; 
	q->info = qi; 
	q->put = qi->iput; 
	wq = q->other = q + 1; 
 
	wq->r.p = 0; 
	wq->info = qi; 
	wq->put = qi->oput; 
	wq->other = q; 
 
	unlock(q); 
 
	return q; 
} 
 
/* 
 *  free a queue 
 */ 
static void 
freeq(Queue *q) 
{ 
	Block *bp; 
 
	q = RD(q); 
	while(bp = getq(q)) 
		freeb(bp); 
	q = WR(q); 
	while(bp = getq(q)) 
		freeb(bp); 
	RD(q)->flag = 0; 
} 
 
/* 
 *  push a queue onto a stream referenced by the proc side write q 
 */ 
Queue * 
pushq(Stream* s, Qinfo *qi) 
{ 
	Queue *q; 
	Queue *nq; 
 
	q = RD(s->procq); 
 
	/* 
	 *  make the new queue 
	 */ 
	nq = allocq(qi); 
 
	/* 
	 *  push 
	 */ 
	RD(nq)->next = q; 
	RD(WR(q)->next)->next = RD(nq); 
	WR(nq)->next = WR(q)->next; 
	WR(q)->next = WR(nq); 
 
	if(qi->open) 
		(*qi->open)(RD(nq), s); 
 
	return WR(nq)->next; 
} 
 
/* 
 *  pop off the top line discipline 
 */ 
static void 
popq(Stream *s) 
{ 
	Queue *q; 
 
	if(s->procq->next == WR(s->devq)) 
		error(0, Ebadld); 
	q = s->procq->next; 
	if(q->info->close) 
		(*q->info->close)(RD(q)); 
	s->procq->next = q->next; 
	RD(q->next)->next = RD(s->procq); 
	freeq(q); 
} 
 
/* 
 *  add a block (or list of blocks) to the end of a queue.  return true 
 *  if one of the blocks contained a delimiter.  
 */ 
int 
putq(Queue *q, Block *bp) 
{ 
	int delim; 
 
	delim = 0; 
	lock(q); 
	if(q->first) 
		q->last->next = bp; 
	else 
		q->first = bp; 
	q->len += bp->wptr - bp->rptr; 
	delim = bp->flags & S_DELIM; 
	while(bp->next) { 
		bp = bp->next; 
		q->len += bp->wptr - bp->rptr; 
		delim |= bp->flags & S_DELIM; 
	} 
	q->last = bp; 
	if(q->len >= Streamhi) 
		q->flag |= QHIWAT; 
	unlock(q); 
	return delim; 
} 
int 
putb(Blist *q, Block *bp) 
{ 
	int delim; 
 
	delim = 0; 
	if(q->first) 
		q->last->next = bp; 
	else 
		q->first = bp; 
	q->len += bp->wptr - bp->rptr; 
	delim = bp->flags & S_DELIM; 
	while(bp->next) { 
		bp = bp->next; 
		q->len += bp->wptr - bp->rptr; 
		delim |= bp->flags & S_DELIM; 
	} 
	q->last = bp; 
	bp->next = 0; 
	return delim; 
} 
 
/* 
 *  add a block to the start of a queue  
 */ 
static void 
putbq(Blist *q, Block *bp) 
{ 
	lock(q); 
	if(q->first) 
		bp->next = q->first; 
	else 
		q->last = bp; 
	q->first = bp; 
	q->len += bp->wptr - bp->rptr; 
	unlock(q); 
} 
 
/* 
 *  remove the first block from a queue  
 */ 
Block * 
getq(Queue *q) 
{ 
	Block *bp; 
 
	lock(q); 
	bp = q->first; 
	if(bp) { 
		q->first = bp->next; 
		if(q->first == 0) 
			q->last = 0; 
		q->len -= bp->wptr - bp->rptr; 
		if((q->flag&QHIWAT) && q->len < Streamhi/2){ 
			wakeup(&q->other->next->other->r); 
			q->flag &= ~QHIWAT; 
		} 
		bp->next = 0; 
	} 
	unlock(q); 
	return bp; 
} 
Block * 
getb(Blist *q) 
{ 
	Block *bp; 
 
	bp = q->first; 
	if(bp) { 
		q->first = bp->next; 
		if(q->first == 0) 
			q->last = 0; 
		q->len -= bp->wptr - bp->rptr; 
		bp->next = 0; 
	} 
	return bp; 
} 
 
/* 
 *  put a block into the bit bucket 
 */ 
void 
nullput(Queue *q, Block *bp) 
{ 
	freeb(bp); 
	error(0, Ehungup); 
} 
 
/* 
 *  find the info structure for line discipline 'name' 
 */ 
static Qinfo * 
qinfofind(char *name) 
{ 
	Qinfo **qip; 
 
	if(name == 0) 
		error(0, Ebadld); 
	for(qip = lds; *qip; qip++) 
		if(strcmp((*qip)->name, name)==0) 
			return *qip; 
	error(0, Ebadld); 
} 
 
/* 
 *  send a hangup up a stream 
 */ 
static void 
hangup(Stream *s) 
{ 
	Block *bp; 
 
	bp = allocb(0); 
	bp->type = M_HANGUP; 
	(*s->devq->put)(s->devq, bp); 
} 
 
/* 
 *  parse a string and return a pointer to the second element if the  
 *  first matches name.  bp->rptr will be updated to point to the 
 *  second element. 
 * 
 *  return 0 if no match. 
 * 
 *  it is assumed that the block data is null terminated.  streamwrite 
 *  guarantees this. 
 */ 
int 
streamparse(char *name, Block *bp) 
{ 
	int len; 
 
	len = strlen(name); 
	if(bp->wptr - bp->rptr < len) 
		return 0; 
	if(strncmp(name, (char *)bp->rptr, len)==0){ 
		if(bp->rptr[len] == ' ') 
			bp->rptr += len+1; 
		else if(bp->rptr[len]) 
			return 0; 
		else 
			bp->rptr += len; 
		return 1; 
	} 
	return 0; 
} 
 
/* 
 *  the per stream directory structure 
 */ 
Dirtab streamdir[]={ 
	"data",		Sdataqid,	0,			0600, 
	"ctl",		Sctlqid,	0,			0600, 
}; 
 
/* 
 *  A stream device consists of the contents of streamdir plus 
 *  any directory supplied by the actual device. 
 * 
 *  values of s: 
 * 	0 to ntab-1 apply to the auxiliary directory. 
 *	ntab to ntab+Shighqid-Slowqid+1 apply to streamdir. 
 */ 
int 
streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp) 
{ 
	Proc *p; 
	char buf[NAMELEN]; 
 
	if(s < ntab) 
		tab = &tab[s]; 
	else if(s < ntab + Shighqid - Slowqid + 1) 
		tab = &streamdir[s - ntab]; 
	else 
		return -1; 
 
	devdir(c, STREAMQID(STREAMID(c->qid),tab->qid), tab->name, tab->length, 
		tab->perm, dp); 
	return 1; 
} 
 
/* 
 *  create a new stream 
 */ 
Stream * 
streamnew(Chan *c, Qinfo *qi) 
{ 
	Stream *s; 
	Queue *q; 
 
	/* 
	 *  find a free stream struct 
	 */ 
	for(s = slist; s < &slist[conf.nstream]; s++) { 
		if(s->inuse == 0){ 
			if(canlock(s)){ 
				if(s->inuse == 0) 
					break; 
				unlock(s); 
			} 
		} 
	} 
	if(s == &slist[conf.nstream]){ 
		print("no more streams\n"); 
		error(0, Enostream); 
	} 
	if(waserror()){ 
		unlock(s); 
		streamclose(c); 
		nexterror(); 
	} 
 
	/* 
	 *  marry a stream and a channel 
	 */ 
	if(c){ 
		c->stream = s; 
		s->type = c->type; 
		s->dev = c->dev; 
		s->id = STREAMID(c->qid); 
	} else 
		s->type = -1; 
 
	/* 
 	 *  hang a device and process q off the stream 
	 */ 
	s->inuse = 1; 
	s->tag[0] = 0; 
	q = allocq(&procinfo); 
	s->procq = WR(q); 
	q = allocq(qi); 
	s->devq = RD(q); 
	WR(s->procq)->next = WR(s->devq); 
	RD(s->procq)->next = 0; 
	RD(s->devq)->next = RD(s->procq); 
	WR(s->devq)->next = 0; 
 
	if(qi->open) 
		(*qi->open)(RD(s->devq), s); 
 
	c->flag |= COPEN; 
	unlock(s); 
	poperror(); 
	return s; 
} 
 
/* 
 *  (Re)open a stream.  If this is the first open, create a stream. 
 */ 
void 
streamopen(Chan *c, Qinfo *qi) 
{ 
	Stream *s; 
	Queue *q; 
 
	/* 
	 *  if the stream already exists, just up the reference count. 
	 */ 
	for(s = slist; s < &slist[conf.nstream]; s++) { 
		if(s->inuse && s->type == c->type && s->dev == c->dev 
		   && s->id == STREAMID(c->qid)){ 
			lock(s); 
			if(s->inuse && s->type == c->type 
			&& s->dev == c->dev 
		 	&& s->id == STREAMID(c->qid)){ 
				s->inuse++; 
				c->stream = s; 
				unlock(s); 
				return; 
			} 
			unlock(s); 
		} 
	} 
 
	/* 
	 *  create a new stream 
	 */ 
	streamnew(c, qi); 
} 
 
/* 
 *  On the last close of a stream, for each queue on the 
 *  stream release its blocks and call its close routine. 
 */ 
void 
streamclose(Chan *c) 
{ 
	Queue *q, *nq; 
	Block *bp; 
 
	/* 
	 *  if not open, ignore it 
	 */ 
	if(!(c->flag & COPEN)) 
		return; 
 
	/* 
	 *  decrement the reference cound 
	 */ 
	lock(c->stream); 
	if(--(c->stream->inuse) != 0){ 
		unlock(c->stream); 
		return; 
	} 
 
	/* 
	 *  descend the stream closing the queues 
	 */ 
	for(q = c->stream->procq; q; q = q->next){ 
		if(q->info->close) 
			(*q->info->close)(q->other); 
		if(q == c->stream->devq->other) 
			break; 
	} 
	/* 
	 *  ascend the stream freeing the queues 
	 */ 
	for(q = c->stream->devq; q; q = nq){ 
		nq = q->next; 
		freeq(q); 
	} 
	c->stream->id = c->stream->dev = c->stream->type = 0; 
	unlock(c->stream); 
} 
 
/* 
 *  put a block to be read into the queue.  wakeup any waiting reader 
 */ 
void 
stputq(Queue *q, Block *bp) 
{ 
	int i; 
 
	if(bp->type == M_HANGUP){ 
		freeb(bp); 
		q->flag |= QHUNGUP; 
		q->other->flag |= QHUNGUP; 
	} else { 
		lock(q); 
		if(q->first) 
			q->last->next = bp; 
		else 
			q->first = bp; 
		q->last = bp; 
		q->len += bp->wptr - bp->rptr; 
		if(q->len >= Streamhi) 
			q->flag |= QHIWAT; 
		unlock(q); 
	} 
	wakeup(&q->r); 
} 
 
/* 
 *  read a string.  update the offset accordingly. 
 */ 
long 
stringread(Chan *c, uchar *buf, long n, char *str) 
{ 
	long i; 
 
	i = strlen(str); 
	i -= c->offset; 
	if(i<n) 
		n = i; 
	if(n<0) 
		return 0; 
	memcpy(buf + c->offset, str, n); 
	c->offset += n; 
	return n; 
} 
 
/* 
 *  return true if there is an output buffer available 
 */ 
static int 
isinput(void *x) 
{ 
	return ((Queue *)x)->first != 0; 
} 
 
/* 
 *  read until we fill the buffer or until a DELIM is encountered 
 */ 
long 
streamread(Chan *c, void *vbuf, long n) 
{ 
	Block *bp; 
	Stream *s; 
	Queue *q; 
	long rv = 0; 
	int left, i, x; 
	uchar *buf = vbuf; 
	char num[32]; 
 
	s = c->stream; 
	switch(STREAMTYPE(c->qid)){ 
	case Sdataqid: 
		break; 
	case Sctlqid: 
		sprint(num, "%d", s->id); 
		return stringread(c, buf, n, num); 
	default: 
		if(CHDIR & c->qid) 
			return devdirread(c, vbuf, n, 0, 0, streamgen); 
		else 
			panic("streamread"); 
	} 
 
	/* 
	 *  one reader at a time 
	 */ 
	qlock(&s->rdlock); 
	if(waserror()){ 
		qunlock(&s->rdlock); 
		nexterror(); 
	} 
 
	/* 
	 *  sleep till data is available 
	 */ 
	q = RD(s->procq); 
	left = n; 
	while(left){ 
		bp = getq(q); 
		if(bp == 0){ 
			if(q->flag & QHUNGUP) 
				break; 
			sleep(&q->r, &isinput, (void *)q); 
			continue; 
		} 
 
		i = bp->wptr - bp->rptr; 
		if(i <= left){ 
			memcpy(buf, bp->rptr, i); 
			left -= i; 
			buf += i; 
			if(bp->flags & S_DELIM){ 
				freeb(bp); 
				break; 
			} else 
				freeb(bp); 
		} else { 
			memcpy(buf, bp->rptr, left); 
			bp->rptr += left; 
			putbq(q, bp); 
			left = 0; 
		} 
	}; 
 
	qunlock(&s->rdlock); 
	poperror(); 
	return n - left;	 
} 
 
/* 
 *  Handle a ctl request.  Streamwide requests are: 
 * 
 *	hangup			-- send an M_HANGUP up the stream 
 *	push ldname		-- push the line discipline named ldname 
 *	pop			-- pop a line discipline 
 * 
 *  This routing is entrered with s->wrlock'ed and must unlock. 
 */ 
static long 
streamctlwrite(Stream *s, void *a, long n) 
{ 
	Qinfo *qi; 
	Block *bp; 
 
	/* 
	 *  package 
	 */ 
	bp = allocb(n+1); 
	memcpy(bp->wptr, a, n); 
	bp->wptr[n] = 0; 
	bp->wptr += n + 1; 
 
	/* 
	 *  check for standard requests 
	 */ 
	if(streamparse("hangup", bp)){ 
		hangup(s); 
		freeb(bp); 
	} else if(streamparse("push", bp)){ 
		qi = qinfofind((char *)bp->rptr); 
		pushq(s, qi); 
		freeb(bp); 
	} else if(streamparse("pop", bp)){ 
		popq(s); 
		freeb(bp); 
	} else { 
		bp->type = M_CTL; 
		bp->flags |= S_DELIM; 
		PUTNEXT(s->procq, bp); 
	} 
 
	return n; 
} 
 
/* 
 *  wait till there's room in the next stream 
 */ 
static int 
notfull(void *arg) 
{ 
	Queue *q; 
 
	q = (Queue *)arg; 
	return q->len < Streamhi; 
} 
void 
flowctl(Queue *q) 
{ 
	if(q->next->len >= Streamhi) 
		sleep(&q->r, notfull, q->next); 
} 
 
/* 
 *  send the request as a single delimited block 
 */ 
long 
streamwrite(Chan *c, void *a, long n) 
{ 
	Stream *s; 
	Block *bp; 
	Queue *q; 
	long rem; 
	int i; 
 
	/* 
	 *  one writer at a time 
	 */ 
	s = c->stream; 
	qlock(&s->wrlock); 
	if(waserror()){ 
		qunlock(&s->wrlock); 
		nexterror(); 
	} 
 
	/* 
	 *  decode the qid 
	 */ 
	switch(STREAMTYPE(c->qid)){ 
	case Sdataqid: 
		break; 
	case Sctlqid: 
		n = streamctlwrite(s, a, n); 
		qunlock(&s->wrlock); 
		poperror(); 
		return n; 
	default: 
		panic("bad stream qid\n"); 
	} 
 
	/* 
	 *  No writes allowed on hungup channels 
	 */ 
	q = s->procq; 
	if(q->other->flag & QHUNGUP) 
		error(0, Ehungup); 
 
	if(GLOBAL(a) || n==0){ 
		/* 
		 *  `a' is global to the whole system, just create a 
		 *  pointer to it and pass it on. 
		 */ 
		flowctl(q); 
		bp = allocb(0); 
		bp->rptr = bp->base = (uchar *)a; 
		bp->wptr = bp->lim = (uchar *)a+n; 
		bp->flags |= S_DELIM; 
		bp->type = M_DATA; 
		PUTNEXT(q, bp); 
	} else { 
		/* 
		 *  `a' is in the user's address space, copy it into 
		 *  system buffers and pass the buffers on. 
		 */ 
		for(rem = n; ; rem -= i) { 
			flowctl(q); 
			bp = allocb(rem); 
			i = bp->lim - bp->wptr; 
			if(i >= rem){ 
				memcpy(bp->wptr, a, rem); 
				bp->flags |= S_DELIM; 
				bp->wptr += rem; 
				bp->type = M_DATA; 
				PUTNEXT(q, bp); 
				break; 
			} else { 
				memcpy(bp->wptr, a, i); 
				bp->wptr += i; 
				bp->type = M_DATA; 
				PUTNEXT(q, bp); 
				a = ((char*)a) + i; 
			} 
		} 
	} 
	qunlock(&s->wrlock); 
	poperror(); 
	return n; 
} 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)