/* * this is the 'watcher' daemon - the parent and Borg collective. As this * is the hive mind, it controls all other processes - kill this, and they * should all die. Kill one, and this should restart it. */ CCEd : { main: int smd_pid, txnqd_pid, ed_pid; handle signal HUP = HUP all children handle signal TERM = TERM all children atexit() kill all children smd_pid = fork, exec(session manager daemon) txnqd_pid = fork, exec(txn queue daemon) ed_pid = fork, exec(event dispatcher) while (1) { pid = wait for child to die() if (pid == smd_pid) { restart smd } else if (pid == txnqd_pid) { restart txnqd } else if (pid == ed_pid) { restart ed } } } /* * this is the session manager * It is responsible for accepting a client connection, forking a new copy * of itself, and waiting for more connections. * It processes incoming CSCP and handles reads with libODB, and compiles * and writes into a transaction-object. When COMMITed, the txn object is * sent to the TxnQ Daemon */ SMd : { main: handle signals set up to reap children when we exit set up to reap children when they exit set up UNIX domain socket /* fork and call subroutine */ while (1) { sock = accept() fork() if child { manage_session(sock) } else { close sock } } manage_session: Initialization: open socket to TxnQ odb = new ODB interface object (implicitly creates a new txn) Identification-state: write id string to client Authentication-state: get auth tokens if (odb.auth(tokens) fails) { exit } Read-only-state: while (1) { /* we can block here */ wait for data on socket { read data switch (data) { case begin_cmd: odb.flush_txn switch to transaction state case check_cmd: status_mode() break; case read_cmd: read from odb write results to client break case commit_cmd: case write_cmd: error break } } } Transaction-state: while (1) { /* we can block here */ wait for data on socket { read data switch (data) { case commit_cmd: write txn object to TxnQ socket case check_cmd: status_mode() break; case read_cmd: read from odb
write results to client break case write_cmd: add to txn object break } } } status_mode: write status_cmd to TxnQ socket /* can block here */ while (1) { if results on TxnQ socket { read msg ack msg send msg to client if msg is txn_done { return } } if command on incoming socket { read command switch (command) { case bg_cmd: write bg_cmd to TxnQ socket break; case cancel_cmd: write cancel_cmd to TxnQ socket break; default: error } } } } /* * This is the transaction queue daemon * Its responsibilities are: * read txn-logs from ED * write txn-results to SM clients * act as the log historian for txn-results * read incoming commands, and deal with them appropriately * queue transactions locally * dispatch transactions to the ED as needed */ TxnQd: { main: set up a UNIX domain socket, listen() /* FIXME: how are we doing this? */ connect to ED localQ = empty; /* big fat poll loop */ while (1) { /* 1. if ED is idle, and we have a TXN, feed ED a new TXN */ if (state == normal) { if (txn in local queue) { write txn to ED, state = in_txn } } /* this will block */ poll(UDsocket, all connected sockets, ABL, ED); /* 2. Read data from SMs */ if (data from a connected socket) { if (data == a txn) { write txn to the ABL /* to be read in step 3 */ } if (data is a cmd) { /* SM only passes on status, bg, cancel commands */ if (cmd == cancel) { if (state == in_txn && canceled-txn is alive) { write cancel to ED } else { error } } else if (cmd == status) { write status start msg for this SM to client wait for ack if (!
timeout) { if (txn not done) { add sm to txn-msg-list } read history logs for request write to client read ack from client if (timeout) { clean up resultq remove sm from notify list } } } else if (cmd == bg) { remove sm from txn-msg-list clean up resultq for this sm } else { error } } } else if (data from ABL) { read from ABL if (data is a txn) { add txn to local queue init history for txn } else if (data is a cancel-cmd) { write to ED } else { error } } else if (data from ED) { read data if (data == valid message) { write to history log if any requestors for this txn { write to resultq for each wait for ack in resultq if (timeout && !acked) { remove sm from notify-list } } } else { error } if (data == done-msg) { close history for txn clear notify list for txn remove finished txn from local queue state = normal } } } } /* * this is the event dispatcher - perhaps it is better called the * "actuation manager" */ ED: { main: state = normal while (1) { /* * we have one or two inputs - the TxnQ daemon, and child procs * we select between them as appropriate - this makes two nice * select() algos */ wait for data from TxnQ daemon read data if data is a txn object { /* let's do a txn!
*/ do_txn() } else { error - no commands now } } do_txn: /* initialize list of handlers: */ extract list of events from txn object hlist = new empty list foreach event (list of events from txn object) { push (hlist, handlers associated with event) } uniquify(hlist) sort(hlist) /* useful, even with the DEFER stuff */ foreach handler in hlist { fork() connect child stdin to an fd connect child stdout to an fd connect child stderr to an fd child: exec handler while (child !exited) { /* select */ wait for data from TxnQ daemon or child if data from TxnQ daemon { read data /* should only accept magic 'cancel' requests */ if data == cancel cmd { cancel child reap child write result to TxnQd set rollback flag } else { error - only commands now } } else { /* results from handler */ read data write results to TxnQd if (data == handler done) { reap child write status to TxnQ daemon } if (data == subtransaction) { process subtransactions } process data } } if (handler exited && !rollback flag) { write result to TxnQd if (handler defers) { move to end of handler queue } else if (handler fails) { set rollback flag } } if (rollback flag) { /* FIXME: expand rollback algo */ do a rollback break; } } write result to TxnQd return }