#include #include #include #include #include #include "dat.h" #include "fns.h" static int sameip(uchar *a, uchar *b) { return memcmp(a, b, IPaddrlen) == 0; } static int splitaddr(char *addr, char *proto, uchar *ipaddr, ushort *port) { char *p, *ap, *pp; ushort po; p = addr; ap = strchr(p, '!'); if(ap == nil) return -1; pp = strchr(ap+1, '!'); if(pp == nil) return -1; *ap++ = '\0'; *pp++ = '\0'; if(parseip(ipaddr, ap) < 0) return -1; if(strlen(p) > 4) return -1; strncpy(proto, p, 5); po = (short) strtoul(pp, nil, 10); if(po == 0) return -1; *port = po; return 0; } static void sendto(int udpfd, uchar *to, ushort toport, uchar *buf, ulong len) { IOchunk chk[2]; Udphdr hdr; memcpy(hdr.raddr, to, sizeof(hdr.raddr)); hnputs(hdr.rport, toport); chk[0].addr = &hdr; chk[0].len = Udphdrsize; chk[1].addr = buf; chk[1].len = len; if(writev(udpfd, chk, 2) <= 0) sysfatal("writev: %r"); } static void sendtocbor(Client *c, uchar *to, ushort toport, cbor *cbo) { uchar *buf; ulong sz, ne; sz = cbor_encode_size(cbo); buf = emalloc(sz); ne = cbor_encode(cbo, buf, sz); assert(ne == sz); sendto(c->udpdata, to, toport, buf, ne); c->bytesout += ne; } void clientping(Client *c, uchar *to, ushort toport) { vlong now; cbor *pkt; now = nsec(); c->pingsent = now; pkt = cbor_pack(&cbor_default_allocator, "[uu]", PKT_PING, 0ULL); sendtocbor(c, to, toport, pkt); cbor_free(&cbor_default_allocator, pkt); } void clientpong(Client *c, uchar *to, short toport, u64int token) { cbor *pkt; pkt = cbor_pack(&cbor_default_allocator, "[uu]", PKT_PONG, token); sendtocbor(c, to, toport, pkt); cbor_free(&cbor_default_allocator, pkt); } void clientaudio(Client *c, u64int seqno, Block *b) { cbor *pkt; pkt = cbor_pack(&cbor_default_allocator, "[uub]", PKT_AUDIO, seqno, (int)b->length, b->data); sendtocbor(c, c->them, c->themport, pkt); cbor_free(&cbor_default_allocator, pkt); } static void clientreadproc(void *v) { long n; uchar buf[2048]; Block *b; Client *c; c = v; threadsetname("client read"); while((n = read(c->udpdata, buf, sizeof(buf))) > 0){ b = newblock(n, buf); sendp(c->pkt, b); } sysfatal("%s: %ld: %r", threadgetname(), n); } void clientpkt(Client *c, Audio *a, Block *b) { int rv, alen; u64int cmd, token; ulong n; vlong ts; uchar *p, *from, *abuf; ushort fromport; Udphdr *hdr; cbor *cbo, *arg; Block *ablock; n = b->length; p = b->data; hdr = (Udphdr*)p; from = hdr->raddr; fromport = nhgets(hdr->rport); p += Udphdrsize; n -= Udphdrsize; /* accounting */ c->bytesin += n; if(0){ fprint(2, "laddr %I:%hud\n", hdr->laddr, nhgets(hdr->lport)); fprint(2, "ifcaddr %I\n", hdr->ifcaddr); fprint(2, "pkt %lud from %I:%hud\n", n, hdr->raddr, nhgets(hdr->rport)); } if(!sameip(c->them, IPnoaddr) && !sameip(from, c->them)){ fprint(2, "rejecting packet from foreigner %I rejected\n", from); return; } /* // broadcasts if(equivip6(hdr->laddr, IPv4bcast)){ if(memcmp(hdr->raddr, c->me, sizeof(hdr->raddr)) == 0) return; print("ping from %I:%hud\n", hdr->raddr, nhgets(hdr->rport)); return; } */ cbo = cbor_decode(&cbor_default_allocator, p, n); if(cbo == nil){ fprint(2, "cbor_decode failed: %r\n"); return; } rv = cbor_unpack(&cbor_default_allocator, cbo, "[uc]", &cmd, &arg); if(rv != 0){ cbor_free(&cbor_default_allocator, cbo); fprint(2, "invalid cbor packet\n"); return; } switch(cmd){ default: fprint(2, "invalid command %llud\n", cmd); break; case PKT_PING: rv = cbor_unpack(&cbor_default_allocator, arg, "u", &token); if(rv != 0){ fprint(2, "ping contained no token\n"); break; } clientpong(c, from, fromport, token); /* set our target */ memcpy(c->them, from, IPaddrlen); c->themport = fromport; break; case PKT_PONG: rv = cbor_unpack(&cbor_default_allocator, arg, "u", &token); if(rv != 0){ fprint(2, "pong contained no token\n"); break; } ts = nsec(); c->latency = ts - c->pingsent; c->lastreply = ts; break; case PKT_AUDIO: rv = cbor_unpack(&cbor_default_allocator, cbo, "[uub]", &cmd, &token, &alen, &abuf); if(rv != 0){ fprint(2, "invalid audio packet\n"); break; } c->recvpkts += 1; if(token > c->seqin+1) c->lostpkts += token - c->seqin + 1; c->seqin = token; ablock = newblock(alen, abuf); free(abuf); sendp(a->out, ablock); break; } cbor_free(&cbor_default_allocator, cbo); } Client* clientinit(char *me, char *them) { int udpctl, udpdata; char *p, buf[128], devdir[64], proto[5]; Client *c; if(me != nil) p = netmkaddr(me, "udp", "56789"); else p = netmkaddr("*", "udp", "56789"); print("we are %s\n", p); if(strncmp(p, "udp", 3) != 0) sysfatal("not a udp address"); snprint(buf, sizeof(buf), "%s", p); udpctl = announce(buf, devdir); if(udpctl < 0) sysfatal("can't announce %s: %r", buf); if(fprint(udpctl, "headers") < 0) sysfatal("can't set header mode: %r"); snprint(buf, sizeof(buf), "%s/data", devdir); udpdata = open(buf, ORDWR); if(udpdata < 0) sysfatal("open %s: %r", buf); c = emalloc(sizeof(*c)); //memcpy(c->me, me, sizeof(c->me)); if(them != nil) c->state = CLIENT_CONNECTING; else c->state = CLIENT_WAITING; if(them != nil){ if(splitaddr(them, proto, c->them, &c->themport) < 0) sysfatal("invalid remote address"); if(strcmp(proto, "udp") != 0) sysfatal("not a udp address"); } c->udpctl = udpctl; c->udpdata = udpdata; c->pkt = chancreate(sizeof(Block*), 100); if(proccreate(clientreadproc, c, 8192) < 0) sysfatal("proccreate: %r"); return c; } void clientrun(Client *c, Audio *a) { int tick, try, nprint; vlong ago; double dt; Channel *timer; Block *b; enum { TRYTIME = 60, PINGMS = 500, NTRY = TRYTIME*1000/PINGMS, PRINTSEC = 10, }; timer = timerchan(PINGMS, "client"); if(c->state == CLIENT_CONNECTING) print("connecting to %I:%hud...", c->them, c->themport); else print("waiting for connection..."); tick = try = nprint = 0; enum { TIMER, PKT, AUDIO, END }; Alt alts[] = { [TIMER] { timer, &dt, CHANRCV }, [PKT] { c->pkt, &b, CHANRCV }, [AUDIO] { a->in, &b, CHANRCV }, [END] { nil, nil, CHANEND }, }; for(;;){ switch(alt(alts)){ case PKT: clientpkt(c, a, b); free(b); break; case TIMER: tick++; switch(c->state){ case CLIENT_CONNECTING: if(c->lastreply != 0){ print(" connected.\n"); c->state = CLIENT_CONNECTED; break; } print("."); if(try++ > NTRY){ print(" timed out\n"); goto disconnected; } clientping(c, c->them, c->themport); break; case CLIENT_WAITING: if(c->themport != 0){ print(" connected.\n"); c->state = CLIENT_CONNECTED; c->lastreply = nsec(); break; } print("."); break; case CLIENT_CONNECTED: if((tick % (PRINTSEC*1000/PINGMS)) == 0){ while(nprint--) print("\b"); nprint = print("%lldms latency %llud recv p/s %llud in/%llud out kbit/s\n", c->latency/1000000LL, c->recvpkts/PRINTSEC, c->bytesin*8/PRINTSEC/1024, c->bytesout*8/PRINTSEC/1024); c->recvpkts = 0; c->bytesin = 0; c->bytesout = 0; } ago = (nsec() - c->lastreply)/1000000LL; //if(ago > 1000) // print("last pong @ %lld ms ago\n", ago); if(ago > 5000){ print("haven't heard from %I in %lldms\n", c->them, ago); goto disconnected; } clientping(c, c->them, c->themport); break; } break; case AUDIO: /* send audio */ if(c->state == CLIENT_CONNECTED) clientaudio(c, c->seqout++, b); free(b); break; } } disconnected: print("disconnected\n"); }