implement Dbfs;

#
# Copyright © 1999, 2002 Vita Nuova Limited.  All rights reserved.
#

# Enhanced to include record locking, index field generation and update notification

# TO DO:
#	make writing & reading more like real files; don't ignore offsets.
#	open with OTRUNC should work.
#	provide some way of compacting a dbfs file.

include "sys.m";
	sys: Sys;
	Qid: import Sys;

include "draw.m";

include "arg.m";

include "styx.m";
	styx: Styx;
	Rmsg, Tmsg: import styx;

include "styxservers.m";
	styxservers: Styxservers;
	Styxserver, Fid, Navigator, Navop: import styxservers;
	Enotfound, Eperm, Ebadfid, Ebadarg: import styxservers;

include "string.m";
	str: String;

include "bufio.m";
	bufio: Bufio;
	Iobuf: import bufio;

include "sh.m";
	sh: Sh;

Record: adt {
	id:		int;			# file number in directory (if block is allocated)
	offset:	int;			# start of data
	count:	int;			# length of block (excluding header)
	datalen:	int;			# length of data (-1 if block is free)
	vers:		int;			# version

	new:		fn(offset: int, length: int): ref Record;
	qid:		fn(r: self ref Record): Sys->Qid;
};

# Record lock
Lock: adt {
	qpath: big;
	fid:	int;
};

HEADLEN: con 10;
MINSIZE: con 20;

Database: adt {
	file:		ref Iobuf;
	records:	array of ref Record;
	maxid:	int;
	locking:	int;
	locklist:	list of Lock;
	indexing:	int;
	stats:	int;
	index:	int;
	s_reads:	int;
	s_writes:	int;
	s_creates:	int;
	s_removes:	int;
	updcmd:	string;
	vers:		int;

	build:	fn(f: ref Iobuf, locking, indexing: int, stats: int, updcmd: string): (ref Database, string);
	write:	fn(db: self ref Database, n: int, data: array of byte): int;
	read:		fn(db: self ref Database, n: int): array of byte;
	remove:	fn(db: self ref Database, n: int);
	create:	fn(db: self ref Database, data: array of byte): ref Record;
	updated:	fn(db: self ref Database);
	lock:		fn(db: self ref Database, c: ref Styxservers->Fid): int;
	unlock:	fn(db: self ref Database, c: ref Styxservers->Fid);
	ownlock:	fn(db: self ref Database, c: ref Styxservers->Fid): int;
};

Dbfs: module
{
	init:	fn(ctxt: ref Draw->Context, nil: list of string);
};

Qdir, Qnew, Qdata, Qindex, Qstats: con iota;

stderr: ref Sys->FD;
database: ref Database;
context: ref Draw->Context;
user: string;
Eremoved: con "file removed";
Egreg: con "thermal problems";
Elocked: con "open/create -- file is locked";

usage()
{
	sys->fprint(stderr, "Usage: dbfs [-abcelrxD][-u cmd] file mountpoint\n");
	raise "fail:usage";
}

nomod(s: string)
{
	sys->fprint(stderr, "dbfs: can't load %s: %r\n", s);
	raise "fail:load";
}

init(ctxt: ref Draw->Context, args: list of string)
{
	sys = load Sys Sys->PATH;
	stderr = sys->fildes(2);
	context = ctxt;
	sys->pctl(Sys->FORKFD|Sys->NEWPGRP, nil);
	styx = load Styx Styx->PATH;
	if(styx == nil)
		nomod(Styx->PATH);
	styx->init();
	styxservers = load Styxservers Styxservers->PATH;
	if(styxservers == nil)
		nomod(Styxservers->PATH);
	styxservers->init(styx);
	str = load String String->PATH;
	if(str == nil)
		nomod(String->PATH);
	bufio = load Bufio Bufio->PATH;
	if(bufio == nil)
		nomod(Bufio->PATH);

	arg := load Arg Arg->PATH;
	if(arg == nil)
		nomod(Arg->PATH);
	arg->init(args);
	flags := Sys->MREPL;
	copt := 0;
	empty := 0;
	locking := 0;
	stats := 0;
	indexing := 0;
	updcmd := "";
	while((o := arg->opt()) != 0)
		case o {
		'a' =>	flags = Sys->MAFTER;
		'b' =>	flags = Sys->MBEFORE;
		'r' =>		flags = Sys->MREPL;
		'c' =>	copt = 1;
		'e' =>	empty = 1;
		'l' =>		locking = 1;
		'u' =>	updcmd = arg->arg();
				if(updcmd == nil)
					usage();
		'x' =>	indexing = 1;
				stats = 1;
		'D' =>	styxservers->traceset(1);
		* =>		usage();
		}
	args = arg->argv();
	arg = nil;

	if(len args != 2)
		usage();
	if(copt)
		flags |= Sys->MCREATE;
	file := hd args;
	args = tl args;
	mountpt := hd args;

	if(updcmd != nil){
		sh = load Sh Sh->PATH;
		if(sh == nil)
			nomod(Sh->PATH);
	}

	df := bufio->open(file, Sys->ORDWR);
	if(df == nil && empty){
		(rc, nil) := sys->stat(file);
		if(rc < 0)
			df = bufio->create(file, Sys->ORDWR, 8r600);
	}
	if(df == nil){
		sys->fprint(stderr, "dbfs: can't open %s: %r\n", file);
		raise "fail:cannot open file";
	}
	(db, err) := Database.build(df, locking, indexing, stats, updcmd);
	if(db == nil){
		sys->fprint(stderr, "dbfs: can't read %s: %s\n", file, err);
		raise "fail:cannot read db";
	}
	database = db;

	sys->pctl(Sys->FORKFD, nil);

	user = rf("/dev/user");
	if(user == nil)
		user = "inferno";

	fds := array[2] of ref Sys->FD;
	if(sys->pipe(fds) < 0){
		sys->fprint(stderr, "dbfs: can't create pipe: %r\n");
		raise "fail:pipe";
	}

	navops := chan of ref Navop;
	spawn navigator(navops);

	(tchan, srv) := Styxserver.new(fds[0], Navigator.new(navops), big Qdir);
	fds[0] = nil;

	pidc := chan of int;
	spawn serveloop(tchan, srv, pidc, navops);
	<-pidc;

	if(sys->mount(fds[1], nil, mountpt, flags, nil) < 0) {
		sys->fprint(stderr, "dbfs: mount failed: %r\n");
		raise "fail:bad mount";
	}
}

rf(f: string): string
{
	fd := sys->open(f, Sys->OREAD);
	if(fd == nil)
		return nil;
	b := array[Sys->NAMEMAX] of byte;
	n := sys->read(fd, b, len b);
	if(n < 0)
		return nil;
	return string b[0:n];
}

serveloop(tchan: chan of ref Tmsg, srv: ref Styxserver, pidc: chan of int, navops: chan of ref Navop)
{
	pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWFD, stderr.fd::1::2::database.file.fd.fd::srv.fd.fd::nil);
#	stderr = sys->fildes(stderr.fd);
	database.file.fd = sys->fildes(database.file.fd.fd);
Serve:
	while((gm := <-tchan) != nil){
		pick m := gm {
		Readerror =>
			sys->fprint(stderr, "dbfs: fatal read error: %s\n", m.error);
			break Serve;
		Open =>
			open(srv, m);
		Read =>
			(c, err) := srv.canread(m);
			if(c == nil) {
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			if(c.qtype & Sys->QTDIR){
				srv.read(m);
				break;
			}
			case TYPE(c.path) {
			Qindex =>
				if(database.index < 0) {
					srv.reply(ref Rmsg.Error(m.tag, Eperm));
					break;
				}
				if (m.offset > big 0) {
					srv.reply(ref Rmsg.Read(m.tag, nil));
					break;
				}
				reply := array of byte string ++database.index;
				if(m.count < len reply)
					reply = reply[:m.count];
				srv.reply(ref Rmsg.Read(m.tag, reply));
			Qstats =>
				if (m.offset > big 0) {
					srv.reply(ref Rmsg.Read(m.tag, nil));
					break;
				}
				reply := array of byte sys->sprint("%d %d %d %d", database.s_reads, database.s_writes,
												database.s_creates, database.s_removes);
				if(m.count < len reply) reply = reply[:m.count];
				srv.reply(ref Rmsg.Read(m.tag, reply));
			Qdata =>
				recno := id2recno(FILENO(c.path));
				if(recno == -1)
					srv.reply(ref Rmsg.Error(m.tag, Eremoved));
				else
					srv.reply(styxservers->readbytes(m, database.read(recno)));
			* =>
				srv.reply(ref Rmsg.Error(m.tag, Egreg));
			}
		Write =>
			(c, err) := srv.canwrite(m);
			if(c == nil){
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			if(!database.ownlock(c)) {
				# shouldn't happen: open checks
				srv.reply(ref Rmsg.Error(m.tag, Elocked));
				break;
			}
			case TYPE(c.path) {
			Qindex =>
				if(database.index >= 0) {
					srv.reply(ref Rmsg.Error(m.tag, Eperm));
					break;
				}
				database.index = int string m.data;
				srv.reply(ref Rmsg.Write(m.tag, len m.data));
			Qdata =>
				recno := id2recno(FILENO(c.path));
				if(recno == -1)
					srv.reply(ref Rmsg.Error(m.tag, "phase error"));
				else {
					changed := 1;
					if(database.updcmd != nil){
						oldrec := database.read(recno);
						changed = !eqbytes(m.data, oldrec);
					}
					if(changed && database.write(recno, m.data) == -1){
						srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
						break;
					}
					if(changed)
						database.updated();	# run the command before reply
					srv.reply(ref Rmsg.Write(m.tag, len m.data));
				}
			* =>
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
			}
		Clunk =>
			c := srv.getfid(m.fid);
			if(c != nil)
				database.unlock(c);
			srv.clunk(m);
		Remove =>
			c := srv.getfid(m.fid);
			database.unlock(c);
			if(c == nil || c.qtype & Sys->QTDIR || TYPE(c.path) != Qdata){
				# let it diagnose all the errors
				srv.remove(m);
				break;
			}
			recno := id2recno(FILENO(c.path));
			if(recno == -1)
				srv.reply(ref Rmsg.Error(m.tag, "phase error"));
			else {
				database.remove(recno);
				database.updated();
				srv.reply(ref Rmsg.Remove(m.tag));
			}
			srv.delfid(c);
		* =>
			srv.default(gm);
		}
	}
	navops <-= nil;		# shut down navigator
}

eqbytes(a, b: array of byte): int
{
	if(len a != len b)
		return 0;
	for(i := 0; i < len a; i++)
		if(a[i] != b[i])
			return 0;
	return 1;
}

id2recno(id: int): int
{
	recs := database.records;
	for(i := 0; i < len recs; i++)
		if(recs[i].datalen >= 0 && recs[i].id == id)
			return i;
	return -1;
}
	
open(srv: ref Styxserver, m: ref Tmsg.Open): ref Fid
{
	(c, mode, d, err) := srv.canopen(m);
	if(c == nil){
		srv.reply(ref Rmsg.Error(m.tag, err));
		return nil;
	}
	if(TYPE(c.path) == Qnew){
		# generate new file
		if(c.uname != user){
			srv.reply(ref Rmsg.Error(m.tag, Eperm));
			return nil;
		}
		r := database.create(array[0] of byte);
		if(r == nil) {
			srv.reply(ref Rmsg.Error(m.tag, "create -- i/o error"));
			return nil;
		}
		(d, nil) = dirgen(QPATH(r.id, Qdata));
	}
	if(m.mode & Sys->OTRUNC) {
		# TO DO
	}
	c.open(mode, d.qid);
	if(database.locking && TYPE(c.path) == Qdata && (m.mode & (Sys->OWRITE|Sys->ORDWR))) {
		if(!database.lock(c)) {
			srv.reply(ref Rmsg.Error(m.tag, Elocked));
			return nil;
		}
	}
	srv.reply(ref Rmsg.Open(m.tag, d.qid, srv.iounit()));
	return c;
}

dirslot(n: int): int
{
	for(i := 0; i < len database.records; i++){
		r := database.records[i];
		if(r != nil && r.datalen >= 0){
			if(n == 0)
				return i;
			n--;
		}
	}
	return -1;
}

dir(qid: Sys->Qid, name: string, length: big, uid: string, perm: int): ref Sys->Dir
{
	d := ref sys->zerodir;
	d.qid = qid;
	if(qid.qtype & Sys->QTDIR)
		perm |= Sys->DMDIR;
	d.mode = perm;
	d.name = name;
	d.uid = uid;
	d.gid = uid;
	d.length = length;
	return d;
}

dirgen(p: big): (ref Sys->Dir, string)
{
	case TYPE(p) {
	Qdir =>
		return (dir(Qid(QPATH(0, Qdir),database.vers,Sys->QTDIR), "/", big 0, user, 8r700), nil);
	Qnew =>
		return (dir(Qid(QPATH(0, Qnew),0,Sys->QTFILE), "new", big 0, user, 8r600), nil);
	Qindex =>
		return (dir(Qid(QPATH(0, Qindex),0,Sys->QTFILE), "index", big 0, user, 8r600), nil);
	Qstats =>
		return (dir(Qid(QPATH(0, Qstats),0,Sys->QTFILE), "stats", big 0, user, 8r400), nil);
	* =>
		n := id2recno(FILENO(p));
		if(n < 0 || n >= len database.records)
			return (nil, nil);
		r := database.records[n];
		if(r == nil || r.datalen < 0)
			return (nil, Enotfound);
		l := r.datalen;
		if(l < 0)
			l = 0;
		return (dir(r.qid(), sys->sprint("%d", r.id), big l, user, 8r600), nil);
	}
}

navigator(navops: chan of ref Navop)
{
	while((m := <-navops) != nil){
		pick n := m {
		Stat =>
			n.reply <-= dirgen(n.path);
		Walk =>
			if(int n.path != Qdir){
				n.reply <-= (nil, "not a directory");
				break;
			}
			case n.name {
			".." =>
				;	# nop
			"new" =>
				n.path = QPATH(0, Qnew);
			"stats" =>
				if(!database.indexing){
					n.reply <-= (nil, Enotfound);
					continue;
				}
				n.path = QPATH(0, Qstats);
			"index" =>
				if(!database.indexing){
					n.reply <-= (nil, Enotfound);
					continue;
				}
				n.path = QPATH(0, Qindex);
			* =>
				if(len n.name < 1 || !(n.name[0]>='0' && n.name[0]<='9')){	# weak test for now
					n.reply <-= (nil, Enotfound);
					continue;
				}
				n.path = QPATH(int n.name, Qdata);
			}
			n.reply <-= dirgen(n.path);
		Readdir =>
			if(int m.path != Qdir){
				n.reply <-= (nil, "not a directory");
				break;
			}
			o := 1;	# Qnew;
			stats := -1;
			indexing := -1;
			if(database.indexing)
				indexing = o++;
			if(database.stats)
				stats = o++;
		    Dread:
			for(i := n.offset; --n.count >= 0; i++){
				case i {
				0 =>
					n.reply <-= dirgen(QPATH(0,Qnew));
				* =>
					if(i == indexing)
						n.reply <-= dirgen(QPATH(0, Qindex));
					if(i == stats)
						n.reply <-= dirgen(QPATH(0, Qstats));
					j := dirslot(i-o);	# n² but fine if the file will be small
					if(j < 0)
						break Dread;
					r := database.records[j];
					n.reply <-= dirgen(QPATH(r.id,Qdata));
				}
			}
			n.reply <-= (nil, nil);
		}
	}
}

QPATH(w, q: int): big
{
	return big ((w<<8)|q);
}

TYPE(path: big): int
{
	return int path & 16rFF;
}

FILENO(path: big): int
{
	return (int path >> 8) & 16rFFFFFF;
}

Database.build(f: ref Iobuf, locking, indexing, stats: int, updcmd: string): (ref Database, string)
{
	rl: list of ref Record;
	offset := 0;
	maxid := 0;
	for(;;) {
		d := array[HEADLEN] of byte;
		n := f.read(d, HEADLEN);
		if(n < HEADLEN)
			break;
		orig := s := string d;
		if(len s != HEADLEN)
			return (nil, "found bad header");
		r := ref Record;
		r.vers = 0;
		(r.count, s) = str->toint(s, 10);
		(r.datalen, s) = str->toint(s, 10);
		if(s != "\n")
			return (nil, sys->sprint("found bad header '%s'\n", orig));
		r.offset = offset + HEADLEN;
		offset += r.count + HEADLEN;
		f.seek(big offset, Bufio->SEEKSTART);
		r.id = maxid++;
		rl = r :: rl;
	}
	db := ref Database(f, array[maxid] of ref Record, maxid, locking, nil, indexing, stats, -1, 0, 0, 0, 0, updcmd, 0);
	for(i := len db.records - 1; i >= 0; i--) {
		db.records[i] = hd rl;
		rl = tl rl;
	}
	return (db, nil);
}

Database.write(db: self ref Database, recno: int, data: array of byte): int
{
	db.s_writes++;
	r := db.records[recno];
	r.vers++;
	if(len data <= r.count) {
		if(r.count - len data >= HEADLEN + MINSIZE)
			splitrec(db, recno, len data);
		writerec(db, recno, data);
		db.file.flush();
	} else {
		freerec(db, recno);
		n := allocrec(db, len data);
		if(n == -1)
			return -1;		# BUG: we lose the original data in this case.
		db.records[n].id = r.id;
		db.write(n, data);
	}
	return 0;
}

Database.create(db: self ref Database, data: array of byte): ref Record
{
	db.s_creates++;
	db.vers++;
	n := allocrec(db, len data);
	if(n < 0)
		return nil;
	if(db.write(n, data) < 0){
		freerec(db, n);
		return nil;
	}
	r := db.records[n];
	r.id = db.maxid++;
	return r;
}

Database.read(db: self ref Database, recno: int): array of byte
{
	db.s_reads++;
	r := db.records[recno];
	if(r.datalen <= 0)
		return nil;
	db.file.seek(big r.offset, Bufio->SEEKSTART);
	d := array[r.datalen] of byte;
	n := db.file.read(d, r.datalen);
	if(n != r.datalen) {
		sys->fprint(stderr, "dbfs: only read %d bytes (expected %d)\n", n, r.datalen);
		return nil;
	}
	return d;
}

Database.remove(db: self ref Database, recno: int)
{
	db.s_removes++;
	db.vers++;
	freerec(db, recno);
	db.file.flush();
}

Database.updated(db: self ref Database)
{
	if(db.updcmd != nil)
		sh->system(context, db.updcmd);
}

# Locking - try to lock a record

Database.lock(db: self ref Database, c: ref Styxservers->Fid): int
{
	if(TYPE(c.path) != Qdata || !db.locking)
		return 1;
	for(ll := db.locklist; ll != nil; ll = tl ll) {
		lock := hd ll;
		if(lock.qpath == c.path)
			return lock.fid == c.fid;
	}
	db.locklist = (c.path, c.fid) :: db.locklist;
	return 1;
}


# Locking - unlock a record

Database.unlock(db: self ref Database, c: ref Styxservers->Fid)
{
	if(TYPE(c.path) != Qdata || !db.locking)
		return;
	ll := db.locklist;
	db.locklist = nil;
	for(; ll != nil; ll = tl ll){
		lock := hd ll;
		if(lock.qpath == c.path && lock.fid == c.fid){
			# not replaced on list
		}else
			db.locklist = hd ll :: db.locklist;
	}
}


# Locking - check if Fid c has the lock on its record

Database.ownlock(db: self ref Database, c: ref Styxservers->Fid): int
{
	if(TYPE(c.path) != Qdata || !db.locking)
		return 1;
	for(ll := db.locklist; ll != nil; ll = tl ll) {
		lock := hd ll;
		if(lock.qpath == c.path)
			return lock.fid == c.fid;
	}
	return 0;
}

Record.new(offset: int, length: int): ref Record
{
	return ref Record(-1, offset, length, -1, 0);
}

Record.qid(r: self ref Record): Qid
{
	return Qid(QPATH(r.id,Qdata), r.vers, Sys->QTFILE);
}

freerec(db: ref Database, recno: int)
{
	nr := len db.records;
	db.records[recno].datalen = -1;
	for(i := recno; i >= 0; i--)
		if(db.records[i].datalen != -1)
			break;
	f := i + 1;
	nb := 0;
	for(i = f; i < nr; i++) {
		if(db.records[i].datalen != -1)
			break;
		nb += db.records[i].count + HEADLEN;
	}
	db.records[f].count = nb - HEADLEN;
	writeheader(db.file, db.records[f]);
	# could blank out freed entries here if we cared.
	if(i < nr && f < i)
		db.records[f+1:] = db.records[i:];
	db.records = db.records[0:nr - (i - f - 1)];
}

splitrec(db: ref Database, recno: int, pos: int)
{
	a := array[len db.records + 1] of ref Record;
	a[0:] = db.records[0:recno+1];
	if(recno < len db.records - 1)
		a[recno+2:] = db.records[recno+1:];
	db.records = a;
	r := a[recno];
	a[recno+1] = Record.new(r.offset + pos + HEADLEN, r.count - HEADLEN - pos);
	r.count = pos;
	writeheader(db.file, a[recno+1]);
}

writerec(db: ref Database, recno: int, data: array of byte): int
{
	db.records[recno].datalen = len data;
	if(writeheader(db.file, db.records[recno]) == -1)
		return -1;
	if(db.file.write(data, len data) == Bufio->ERROR)
		return -1;
	return 0;
}

writeheader(f: ref Iobuf, r: ref Record): int
{
	f.seek(big r.offset - big HEADLEN, Bufio->SEEKSTART);
	if(f.puts(sys->sprint("%4d %4d\n", r.count, r.datalen)) == Bufio->ERROR) {
		sys->fprint(stderr, "dbfs: error writing header (id %d, offset %d, count %d, datalen %d): %r\n",
					r.id, r.offset, r.count, r.datalen);
		return -1;
	}
	return 0;
}

# finds or creates a record of the requisite size; does not mark it as allocated.
allocrec(db: ref Database, nb: int): int
{
	if(nb < MINSIZE)
		nb = MINSIZE;
	best := -1;
	n := -1;
	for(i := 0; i < len db.records; i++) {
		r := db.records[i];
		if(r.datalen == -1) {
			avail := r.count - nb;
			if(avail >= 0 && (n == -1 || avail < best)) {
				best = avail;
				n = i;
			}
		}
	}
	if(n != -1)
		return n;
	nr := len db.records;
	a := array[nr + 1] of ref Record;
	a[0:] = db.records[0:];
	offset := 0;
	if(nr > 0)
		offset = a[nr-1].offset + a[nr-1].count;
	db.file.seek(big offset, Bufio->SEEKSTART);
	if(db.file.write(array[nb + HEADLEN] of {* => byte(0)}, nb + HEADLEN) == Bufio->ERROR
			|| db.file.flush() == Bufio->ERROR) {
		sys->fprint(stderr, "dbfs: write of new entry failed: %r\n");
		return -1;
	}
	a[nr] = Record.new(offset + HEADLEN, nb);
	db.records = a;
	return nr;
}

now(fd: ref Sys->FD): int
{
	if(fd == nil)
		return 0;
	buf := array[128] of byte;
	sys->seek(fd, big 0, 0);
	n := sys->read(fd, buf, len buf);
	if(n < 0)
		return 0;
	t := (big string buf[0:n]) / big 1000000;
	return int t;
}