/*
 * ovdb_server.c
 * ovdb read server
 */

#include "config.h"
#include "clibrary.h"
#include "portable/mmap.h"
#include "portable/time.h"
#include "portable/setproctitle.h"
#include "portable/socket.h"
#include "portable/wait.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
#endif
#include <syslog.h>

#ifdef HAVE_UNIX_DOMAIN_SOCKETS
# include <sys/un.h>
#endif

#include "inn/innconf.h"
#include "inn/messages.h"
#include "libinn.h"
#include "paths.h"
#include "storage.h"
#include "ov.h"

#include "../storage/ovdb/ovdb.h"
#include "../storage/ovdb/ovdb-private.h"

#ifndef USE_BERKELEY_DB

int
main(int argc UNUSED, char **argv UNUSED)
{
    die("BerkeleyDB support not compiled");
}

#else /* USE_BERKELEY_DB */


#define SELECT_TIMEOUT 15


/* This will work unless user sets a larger clienttimeout
   in readers.conf */
#define CLIENT_TIMEOUT (innconf->clienttimeout + 60)
/*#define CLIENT_TIMEOUT 3600*/


static int listensock;

#define MODE_READ   0
#define MODE_WRITE  1
#define MODE_CLOSED 2
#define STATE_READCMD 0
#define STATE_READGROUP 1
struct reader {
    int fd;
    int mode;
    int state;
    int buflen;
    int bufpos;
    void *buf;
    time_t lastactive;
    void *currentsearch;
};

static struct reader *readertab;
static int readertablen;
static int numreaders;
static time_t now;
static pid_t parent;

struct child {
    pid_t pid;
    int num;
    time_t started;
};
static struct child *children;
#define wholistens (children[ovdb_conf.numrsprocs].num)

static int signalled = 0;
static void
sigfunc(int sig UNUSED)
{
    signalled = 1;
}

static int updated = 0;
static void
childsig(int sig UNUSED)
{
    updated = 1;
}

static void
parentsig(int sig UNUSED)
{
    int i, which, smallest;
    if(wholistens < 0) {
	which = smallest = -1;
	for(i = 0; i < ovdb_conf.numrsprocs; i++) {
	    if(children[i].pid == -1)
		continue;
	    if(!ovdb_conf.maxrsconn || children[i].num <= ovdb_conf.maxrsconn) {
		if(smallest == -1 || children[i].num < smallest) {
		    smallest = children[i].num;
		    which = i;
		}
	    }
	}
	if(which != -1) {
	    wholistens = which;
	    kill(children[which].pid, SIGUSR1);
	} else {
	    wholistens = -2;
	}
	updated = 1;
    }
}

static int putpid(const char *path)
{
    char buf[30];
    int fd = open(path, O_WRONLY|O_TRUNC|O_CREAT, 0664);
    if(fd == -1) {
        syswarn("cannot open %s", path);
        return -1;
    }
    snprintf(buf, sizeof(buf), "%d\n", getpid());
    if(write(fd, buf, strlen(buf)) < 0) {
        syswarn("cannot write to %s", path);
        close(fd);
        return -1;
    }
    close(fd);
    return 0;
}

static void
do_groupstats(struct reader *r)
{
    struct rs_groupstats *reply;
    char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
    reply = xmalloc(sizeof(struct rs_groupstats));

    /*syslog(LOG_DEBUG, "OVDB: rs: do_groupstats '%s'", group);*/
    if(ovdb_groupstats(group, &reply->lo, &reply->hi, &reply->count, &reply->flag)) {
	reply->status = CMD_GROUPSTATS;
    	reply->aliaslen = 0;
    } else {
	reply->status = CMD_GROUPSTATS | RPLY_ERROR;
    }
    free(r->buf);
    r->buf = reply;
    r->buflen = sizeof(struct rs_groupstats);
    r->bufpos = 0;
    r->mode = MODE_WRITE;
}

static void
do_opensrch(struct reader *r)
{
    struct rs_cmd *cmd = r->buf;
    struct rs_opensrch *reply;
    char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
    reply = xmalloc(sizeof(struct rs_opensrch));

    /*syslog(LOG_DEBUG, "OVDB: rs: do_opensrch '%s' %d %d", group, cmd->artlo, cmd->arthi);*/

    if(r->currentsearch != NULL) {
	/* can only open one search at a time */
	reply->status = CMD_OPENSRCH | RPLY_ERROR;
    } else {
	reply->handle = ovdb_opensearch(group, cmd->artlo, cmd->arthi);
	if(reply->handle == NULL) {
	    reply->status = CMD_OPENSRCH | RPLY_ERROR;
	} else {
	    reply->status = CMD_OPENSRCH;
	}
	r->currentsearch = reply->handle;
    }
    free(r->buf);
    r->buf = reply;
    r->buflen = sizeof(struct rs_opensrch);
    r->bufpos = 0;
    r->mode = MODE_WRITE;
}

static void
do_srch(struct reader *r)
{
    struct rs_cmd *cmd = r->buf;
    struct rs_srch *reply;
    ARTNUM artnum;
    TOKEN token;
    time_t arrived;
    int len;
    char *data;

    if(ovdb_search(cmd->handle, &artnum, &data, &len, &token, &arrived)) {
    	reply = xmalloc(sizeof(struct rs_srch) + len);
	reply->status = CMD_SRCH;
	reply->artnum = artnum;
	reply->token = token;
	reply->arrived = arrived;
	reply->len = len;
	memcpy((char *)reply + sizeof(struct rs_srch), data, len);
	r->buflen = sizeof(struct rs_srch) + len;
    } else {
    	reply = xmalloc(sizeof(struct rs_srch));
	reply->status = CMD_SRCH | RPLY_ERROR;
	r->buflen = sizeof(struct rs_srch);
    }
    free(r->buf);
    r->buf = reply;
    r->bufpos = 0;
    r->mode = MODE_WRITE;
}

static void
do_closesrch(struct reader *r)
{
    struct rs_cmd *cmd = r->buf;

    ovdb_closesearch(cmd->handle);
    free(r->buf);
    r->buf = NULL;
    r->bufpos = r->buflen = 0;
    r->mode = MODE_READ;
    r->currentsearch = NULL;
}

static void
do_artinfo(struct reader *r)
{
    struct rs_cmd *cmd = r->buf;
    struct rs_artinfo *reply;
    char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
    TOKEN token;

    /*syslog(LOG_DEBUG, "OVDB: rs: do_artinfo: '%s' %d", group, cmd->artlo);*/
    if(ovdb_getartinfo(group, cmd->artlo, &token)) {
    	reply = xmalloc(sizeof(struct rs_artinfo));
	reply->status = CMD_ARTINFO;
	reply->token = token;
	r->buflen = sizeof(struct rs_artinfo);
    } else {
    	reply = xmalloc(sizeof(struct rs_artinfo));
	reply->status = CMD_ARTINFO | RPLY_ERROR;
	r->buflen = sizeof(struct rs_artinfo);
    }
    free(r->buf);
    r->buf = reply;
    r->bufpos = 0;
    r->mode = MODE_WRITE;
}


static int
process_cmd(struct reader *r)
{
    struct rs_cmd *cmd = r->buf;

    if(r->state == STATE_READCMD) {
	switch(cmd->what) {
	case CMD_GROUPSTATS:
	case CMD_OPENSRCH:
	case CMD_ARTINFO:
	    r->state = STATE_READGROUP;
	    if(cmd->grouplen == 0) {
		/* shoudn't happen... */
		r->mode = MODE_CLOSED;
		close(r->fd);
		free(r->buf);
		r->buf = NULL;
		return 0;
	    }
	    r->buflen += cmd->grouplen;
            r->buf = xrealloc(r->buf, r->buflen);
	    return 1;
	}
    }

    switch(cmd->what) {
    case CMD_GROUPSTATS:
	((char *)r->buf)[r->buflen - 1] = 0;	/* make sure group is null-terminated */
	do_groupstats(r);
	break;
    case CMD_OPENSRCH:
	((char *)r->buf)[r->buflen - 1] = 0;
	do_opensrch(r);
	break;
    case CMD_SRCH:
	do_srch(r);
	break;
    case CMD_CLOSESRCH:
	do_closesrch(r);
	break;
    case CMD_ARTINFO:
	((char *)r->buf)[r->buflen - 1] = 0;
	do_artinfo(r);
	break;
    default:
	r->mode = MODE_CLOSED;
	close(r->fd);
	free(r->buf);
	r->buf = NULL;
	break;
    }

    return 0;
}

static void
handle_read(struct reader *r)
{
    int n;
    r->lastactive = now;

    if(r->buf == NULL) {
    	r->state = STATE_READCMD;
    	r->buf = xmalloc(sizeof(struct rs_cmd));
	r->buflen = sizeof(struct rs_cmd);
	r->bufpos = 0;
    }
again:
    n = read(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
    if(n <= 0) {
	if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
	    return;
    	r->mode = MODE_CLOSED;
	close(r->fd);
	free(r->buf);
	r->buf = NULL;
    }
    r->bufpos += n;

    if(r->bufpos >= r->buflen)
	if(process_cmd(r))
	    goto again;
}

static void
handle_write(struct reader *r)
{
    int n;
    r->lastactive = now;

    if(r->buf == NULL)	/* shouldn't happen */
	return;

    n = write(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
    if(n <= 0) {
	if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
	    return;
    	r->mode = MODE_CLOSED;
	close(r->fd);
	free(r->buf);
	r->buf = NULL;
    }
    r->bufpos += n;

    if(r->bufpos >= r->buflen) {
	free(r->buf);
    	r->buf = NULL;
	r->bufpos = r->buflen = 0;
	r->mode = MODE_READ;
    }
}

static void
newclient(int fd)
{
    struct reader *r;
    int i;

    nonblocking(fd, 1);

    if(numreaders >= readertablen) {
    	readertablen += 50;
        readertab = xrealloc(readertab, readertablen * sizeof(struct reader));
        for(i = numreaders; i < readertablen; i++) {
	    readertab[i].mode = MODE_CLOSED;
	    readertab[i].buf = NULL;
	}
    }

    r = &(readertab[numreaders]);
    numreaders++;

    r->fd = fd;
    r->mode = MODE_WRITE;
    r->buflen = sizeof(OVDB_SERVER_BANNER);
    r->bufpos = 0;
    r->buf = xstrdup(OVDB_SERVER_BANNER);
    r->lastactive = now;
    r->currentsearch = NULL;

    handle_write(r);
}

static void
delclient(int which)
{
    int i;
    struct reader *r = &(readertab[which]);

    if(r->mode != MODE_CLOSED)
    	close(r->fd);

    if(r->buf != NULL) {
    	free(r->buf);
    }
    if(r->currentsearch != NULL) {
	ovdb_closesearch(r->currentsearch);
	r->currentsearch = NULL;
    }

    /* numreaders will get decremented by the calling function */
    for(i = which; i < numreaders-1; i++)
    	readertab[i] = readertab[i+1];

    readertab[i].mode = MODE_CLOSED;
    readertab[i].buf = NULL;
}

static pid_t
serverproc(int me)
{
    fd_set rdset, wrset;
    int i, ret, count, lastfd, lastnumreaders;
    socklen_t salen;
    struct sockaddr_in sa;
    struct timeval tv;
    pid_t pid;

    pid = fork();
    if (pid != 0)
	return pid;

    if (!ovdb_open(OV_READ|OVDB_SERVER))
        die("cannot open overview");
    xsignal_norestart(SIGINT, sigfunc);
    xsignal_norestart(SIGTERM, sigfunc);
    xsignal_norestart(SIGHUP, sigfunc);
    xsignal_norestart(SIGUSR1, childsig);
    xsignal(SIGPIPE, SIG_IGN);

    numreaders = lastnumreaders = 0;
    if(ovdb_conf.maxrsconn) {
	readertablen = ovdb_conf.maxrsconn;
    } else {
    	readertablen = 50;
    }
    readertab = xmalloc(readertablen * sizeof(struct reader));
    for(i = 0; i < readertablen; i++) {
	readertab[i].mode = MODE_CLOSED;
	readertab[i].buf = NULL;
    }

    setproctitle("0 clients");

    /* main loop */
    while(!signalled) {
	FD_ZERO(&rdset);
	FD_ZERO(&wrset);
	lastfd = 0;
	if(wholistens == me) {
	    if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
		FD_SET(listensock, &rdset);
		lastfd = listensock;
                setproctitle("%d client%s *", numreaders,
                             numreaders == 1 ? "" : "s");
	    } else {
		wholistens = -1;
		kill(parent, SIGUSR1);
	    }
        }

	for(i = 0; i < numreaders; i++) {
	    switch(readertab[i].mode) {
	    case MODE_READ:
	    	FD_SET(readertab[i].fd, &rdset);
		break;
	    case MODE_WRITE:
	    	FD_SET(readertab[i].fd, &wrset);
		break;
	    default:
	    	continue;
	    }
	    if(readertab[i].fd > lastfd)
	    	lastfd = readertab[i].fd;
	}
	tv.tv_usec = 0;
	tv.tv_sec = SELECT_TIMEOUT;
	count = select(lastfd + 1, &rdset, &wrset, NULL, &tv);

	if(signalled)
	    break;
	if(count <= 0)
	    continue;

	now = time(NULL);

	if(FD_ISSET(listensock, &rdset)) {
	    if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
		salen = sizeof(sa);
	    	ret = accept(listensock, (struct sockaddr *)&sa, &salen);
		if(ret >= 0) {
		    newclient(ret);
		    wholistens = -1;
		    children[me].num = numreaders;
		    kill(parent, SIGUSR1);
		}
	    }
	}

	for(i = 0; i < numreaders; i++) {
	    switch(readertab[i].mode) {
	    case MODE_READ:
	    	if(FD_ISSET(readertab[i].fd, &rdset))
		    handle_read(&(readertab[i]));
	        break;
	    case MODE_WRITE:
	    	if(FD_ISSET(readertab[i].fd, &wrset))
		    handle_write(&(readertab[i]));
		break;
	    }
	}

	for(i = 0; i < numreaders; i++) {
	    if(readertab[i].mode == MODE_CLOSED
		  || readertab[i].lastactive + CLIENT_TIMEOUT < now) {
	    	delclient(i);
		numreaders--;
		i--;
	    }
	}
	if(children[me].num != numreaders) {
	    children[me].num = numreaders;
	    kill(parent, SIGUSR1);
        }
	if(numreaders != lastnumreaders) {
	    lastnumreaders = numreaders;
            setproctitle("%d client%s", numreaders,
                         numreaders == 1 ? "" : "s");
	}
    }

    ovdb_close();
    exit(0);
}

static int
reap(void)
{
    int i, cs;
    pid_t c;

    while((c = waitpid(-1, &cs, WNOHANG)) > 0) {
	for(i = 0; i < ovdb_conf.numrsprocs; i++) {
	    if(c == children[i].pid) {
		if(children[i].started + 30 > time(NULL))
		    return 1;

		children[i].num = 0;

		if(wholistens == i)
		    wholistens = -1;

		if((children[i].pid = serverproc(i)) == -1)
		    return 1;

		children[i].started = time(NULL);
		break;
	    }
	}
    }
    if(wholistens == -1)
	parentsig(SIGUSR1);
    return 0;
}

#ifndef MAP_ANON
#ifdef MAP_ANONYMOUS
#define MAP_ANON MAP_ANONYMOUS
#endif
#endif

static void *
sharemem(size_t len)
{
#ifdef MAP_ANON
    return mmap(0, len, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
#else
    int fd = open("/dev/zero", O_RDWR, 0);
    char *ptr = mmap(0, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);
    return ptr;
#endif
}

int
main(int argc, char *argv[])
{
    int i, ret;
    socklen_t salen;
    char *path, *pidfile;
#ifdef HAVE_UNIX_DOMAIN_SOCKETS
    struct sockaddr_un sa;
#else
    struct sockaddr_in sa;
#endif
    struct timeval tv;
    fd_set rdset;

    setproctitle_init(argc, argv);

    openlog("ovdb_server", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
    message_program_name = "ovdb_server";

    if(argc != 2 || strcmp(argv[1], SPACES))
        die("should be started by ovdb_init");
    message_handlers_warn(1, message_log_syslog_err);
    message_handlers_die(1, message_log_syslog_err);

    if (!innconf_read(NULL))
	exit(1);

    if(strcmp(innconf->ovmethod, "ovdb"))
        die("ovmethod not set to ovdb in inn.conf");

    read_ovdb_conf();

#ifdef HAVE_UNIX_DOMAIN_SOCKETS
    listensock = socket(AF_UNIX, SOCK_STREAM, 0);
#else
    listensock = socket(AF_INET, SOCK_STREAM, 0);
#endif
    if(listensock < 0)
        sysdie("cannot create socket");

    nonblocking(listensock, 1);

#ifdef HAVE_UNIX_DOMAIN_SOCKETS
    sa.sun_family = AF_UNIX;
    path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
    strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
    unlink(sa.sun_path);
    free(path);
    ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
#else
    sa.sin_family = AF_INET;
    sa.sin_port = htons(OVDB_SERVER_PORT);
    sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    
    ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);

    if(ret != 0 && errno == EADDRNOTAVAIL) {
	sa.sin_family = AF_INET;
	sa.sin_port = htons(OVDB_SERVER_PORT);
	sa.sin_addr.s_addr = INADDR_ANY;
	ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
    }
#endif

    if(ret != 0)
        sysdie("cannot bind socket");
    if(listen(listensock, MAXLISTEN) < 0)
        sysdie("cannot listen on socket");

    pidfile = concatpath(innconf->pathrun, OVDB_SERVER_PIDFILE);
    if(putpid(pidfile))
        exit(1);

    xsignal_norestart(SIGINT, sigfunc);
    xsignal_norestart(SIGTERM, sigfunc);
    xsignal_norestart(SIGHUP, sigfunc);

    xsignal_norestart(SIGUSR1, parentsig);
    xsignal_norestart(SIGCHLD, childsig);
    parent = getpid();

    children = sharemem(sizeof(struct child) * (ovdb_conf.numrsprocs+1));

    if(children == NULL)
        sysdie("cannot mmap shared memory");
    for(i = 0; i < ovdb_conf.numrsprocs+1; i++) {
	children[i].pid = -1;
	children[i].num = 0;
    }

    for(i = 0; i < ovdb_conf.numrsprocs; i++) {
	if((children[i].pid = serverproc(i)) == -1) {
	    for(i--; i >= 0; i--)
		kill(children[i].pid, SIGTERM);
	    exit(1);
	}
	children[i].started = time(NULL);
	sleep(1);
    }

    while(!signalled) {
	if(reap())
	    break;

	if(wholistens == -2) {
	    FD_ZERO(&rdset);
	    FD_SET(listensock, &rdset);
	    tv.tv_usec = 0;
	    tv.tv_sec = SELECT_TIMEOUT;
	    ret = select(listensock+1, &rdset, NULL, NULL, &tv);

	    if(ret == 1 && wholistens == -2) {
		salen = sizeof(sa);
		ret = accept(listensock, (struct sockaddr *)&sa, &salen);
		if(ret >= 0)
		   close(ret);
	    }
	} else {
	    pause();
	}
    }

    for(i = 0; i < ovdb_conf.numrsprocs; i++)
	if(children[i].pid != -1)
	    kill(children[i].pid, SIGTERM);

    while(wait(&ret) > 0)
	;

    unlink(pidfile);

    exit(0);
}


#endif /* USE_BERKELEY_DB */


syntax highlighted by Code2HTML, v. 0.9.1