/*  $Id: chan.c 6720 2004-05-16 20:54:25Z rra $
**
**  I/O channel (and buffer) processing.
*/

#include "config.h"
#include "clibrary.h"

/* Needed on AIX 4.1 to get fd_set and friends. */
#ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
#endif

#include "inn/innconf.h"
#include "innd.h"

/* These errno values don't exist on all systems, but may be returned as an
   (ignorable) error to setting the accept socket nonblocking.  Define them
   to 0 if they don't exist so that we can unconditionally compare errno to
   them in the code. */
#ifndef ENOTSOCK
# define ENOTSOCK 0
#endif
#ifndef ENOTTY
# define ENOTTY 0
#endif

/* Human-readable timer names, handed to TMRsummary() by CHANreadloop().
   NOTE(review): presumably the order has to match the TMR_* identifiers
   declared elsewhere -- confirm before adding or reordering entries. */
static const char * const timer_name[] = {
    "idle", "artclean", "artwrite", "artcncl", "sitesend", "overv",
    "perl", "python", "nntpread", "artparse", "artlog", "datamove"
};

/* Minutes - basically, keep the connection open but idle.
   (Not referenced anywhere in the part of the file visible here.) */
#define PAUSE_BEFORE_DROP               5

/* Divisor of the BUFFER size.  CHANreadloop() compacts an output buffer
   after a partial write once the already-written prefix (Out.used) is
   bigger than size / COMP_THRESHOLD. */
#define COMP_THRESHOLD 10

/* Descriptor sets: channels with an active reader, sleeping channels, and
   channels with queued output, respectively. */
static fd_set	RCHANmask;
static fd_set	SCHANmask;
static fd_set	WCHANmask;
/* Number of descriptors currently set in SCHANmask. */
static int	SCHANcount;
/* Highest descriptor tracked for RCHANmask/WCHANmask; CHANlastfd + 1 is the
   nfds argument given to select(). */
static int	CHANlastfd;
/* Highest descriptor tracked for SCHANmask. */
static int	CHANlastsleepfd;
/* Descriptor of the control channel, recorded by CHANcreate(). */
static int	CHANccfd;
/* Number of entries in CHANtable; the table is indexed by descriptor. */
static int	CHANtablesize;
static CHANNEL	*CHANtable;
/* The control channel itself, recorded by CHANcreate(). */
static CHANNEL	*CHANcc;
/* Template copied into every unused slot.  NOTE(review): the three
   initialisers are presumably Type, State and fd -- confirm against the
   CHANNEL declaration. */
static CHANNEL	CHANnull = { CTfree, CSerror, -1 };

#define PRIORITISE_REMCONN
#ifdef PRIORITISE_REMCONN
/* Parallel arrays of chanlimit entries describing the CTremconn channels
   so that CHANreadloop() can service them before ordinary channels.  An
   unused slot has CHANrcfd[j] == -1 and CHANrc[j] == NULL. */
static int	*CHANrcfd;
static CHANNEL	**CHANrc;
static int	chanlimit;
#endif /* PRIORITISE_REMCONN */

/*
**  Release the channel table: free the input and output buffer of every
**  slot, then the table itself.  Safe to call when no table exists.
*/
void
CHANshutdown(void)
{
    int		slot;

    if (CHANtable == NULL)
	return;

    /* free(NULL) is a no-op, so slots that never got a buffer are fine. */
    for (slot = 0; slot < CHANtablesize; slot++) {
	free(CHANtable[slot].In.data);
	free(CHANtable[slot].Out.data);
    }

    free(CHANtable);
    CHANtable = NULL;
}

/*
**  (Re)build the channel table with room for i descriptors.  Any existing
**  table is discarded, the three descriptor masks are emptied, and every
**  slot is set to the CHANnull template.
*/
void
CHANsetup(int i)
{
    int		slot;

    FD_ZERO(&RCHANmask);
    FD_ZERO(&SCHANmask);
    FD_ZERO(&WCHANmask);

    /* Drop whatever table a previous call may have left behind. */
    CHANshutdown();

    CHANtablesize = i;
    CHANtable = xcalloc(CHANtablesize, sizeof(CHANNEL));

    /* Finish the template before stamping it into the slots. */
    CHANnull.NextLog = innconf->chaninacttime;
    memset(&CHANnull.Address, 0, sizeof(CHANnull.Address));

    for (slot = 0; slot < i; slot++)
	CHANtable[slot] = CHANnull;
}


/*
**  Create a channel from a descriptor.
**
**  The slot CHANtable[fd] is reset to the CHANnull template and then filled
**  in with the given type, state and callbacks.  Buffers already attached
**  to the slot are kept and resized rather than reallocated from scratch.
**  The descriptor is marked close-on-exec and (except on HP-UX) set
**  nonblocking.  Control and remconn channels are additionally recorded in
**  file-scope variables so the main loop can find them quickly.
**
**  The caller must pass a descriptor that is a valid index into CHANtable;
**  no range check is made here.  Returns the channel.
*/
CHANNEL *
CHANcreate(int fd, CHANNELTYPE Type, CHANNELSTATE State,
           innd_callback_t Reader, innd_callback_t WriteDone)
{
    CHANNEL	        *cp;
    struct buffer	in  = { 0, 0, 0, NULL };
    struct buffer	out = { 0, 0, 0, NULL };

    cp = &CHANtable[fd];

    /* Don't overwrite the buffers with CHANnull. */
    in = cp->In;
    buffer_resize(&in, START_BUFF_SIZE);
    in.used = 0;
    in.left = in.size;
    out = cp->Out;
    buffer_resize(&out, SMBUF);
    buffer_set(&out, "", 0);

    /* Set up the channel's info. */
    *cp = CHANnull;
    cp->fd = fd;
    cp->Type = Type;
    cp->State = State;
    cp->Streaming = false;
    cp->Skip = false;
    cp->NoResendId = false;
    cp->privileged = false;
    cp->Ihave = cp->Ihave_Duplicate = cp->Ihave_Deferred = cp->Ihave_SendIt = cp->Ihave_Cybercan = 0;
    cp->Check = cp->Check_send = cp->Check_deferred = cp->Check_got = cp->Check_cybercan = 0;
    cp->Takethis = cp->Takethis_Ok = cp->Takethis_Err = 0;
    cp->Size = cp->Duplicate = 0;
    cp->Unwanted_s = cp->Unwanted_f = cp->Unwanted_d = 0;
    cp->Unwanted_g = cp->Unwanted_u = cp->Unwanted_o = 0;
    cp->Reader = Reader;
    cp->WriteDone = WriteDone;
    cp->Started = cp->LastActive = Now.time;
    cp->In = in;
    cp->Out = out;
    cp->Tracing = Tracing;
    cp->Sendid.size = 0;
    cp->Next=0;
    cp->MaxCnx=0;
    cp->ActiveCnx=0;
    cp->ArtBeg = 0;
    cp->ArtMax = 0;
    cp->Start = 0;
    HashClear(&cp->CurrentMessageIDHash);
    memset(cp->PrecommitWIP, '\0', sizeof(cp->PrecommitWIP));
    cp->PrecommitiCachenext=0;
    ARTprepare(cp);

    close_on_exec(fd, true);

#ifndef _HPUX_SOURCE
    /* Stupid HPUX 11.00 has a broken listen/accept where setting the listen
       socket to nonblocking prevents you from successfully setting the
       socket returned by accept(2) back to blocking mode, no matter what,
       resulting in all kinds of funny behaviour, data loss, etc. etc.  */
    if (nonblocking(fd, true) < 0 && errno != ENOTSOCK && errno != ENOTTY)
	syslog(L_ERROR, "%s cant nonblock %d %m", LogName, fd);
#endif

    /* Note control channel, for efficiency. */
    if (Type == CTcontrol) {
	CHANcc = cp;
	CHANccfd = fd;
    }
#ifdef PRIORITISE_REMCONN
    /* Note remconn channel, for efficiency */
    if (Type == CTremconn) {
	int j;

	/* Look for an unused slot in the remconn arrays. */
	for (j = 0 ; j < chanlimit ; j++ ) {
	    if (CHANrcfd[j] == -1) {
		break;
	    }
	}
	if (j < chanlimit) {
	    CHANrc[j] = cp;
	    CHANrcfd[j] = fd;
	} else if (chanlimit == 0) {
	    /* First remconn channel: assume two file descriptors (AF_INET
	       and AF_INET6).  Size each array by its own element type. */
	    chanlimit = 2;
	    CHANrc = xmalloc(chanlimit * sizeof(*CHANrc));
	    CHANrcfd = xmalloc(chanlimit * sizeof(*CHANrcfd));
	    for (j = 0 ; j < chanlimit ; j++ ) {
		CHANrc[j] = NULL;
		CHANrcfd[j] = -1;
	    }
	    CHANrc[0] = cp;
	    CHANrcfd[0] = fd;
	} else {
	    /* No free slot: extend to double size and use the first new
	       entry. */
            CHANrc = xrealloc(CHANrc, chanlimit * 2 * sizeof(*CHANrc));
            CHANrcfd = xrealloc(CHANrcfd, chanlimit * 2 * sizeof(*CHANrcfd));
	    for (j = chanlimit ; j < chanlimit * 2 ; j++ ) {
		CHANrc[j] = NULL;
		CHANrcfd[j] = -1;
	    }
	    CHANrc[chanlimit] = cp;
	    CHANrcfd[chanlimit] = fd;
	    chanlimit *= 2;
	}
    }
#endif /* PRIORITISE_REMCONN */
    return cp;
}


/*
**  Turn tracing on or off for a channel and log the change.  When tracing
**  is switched on, also log a snapshot of the channel: I/O error counters,
**  peer address and activity times, and whichever of the sleeping, reading
**  and writing states currently apply.
*/
void
CHANtracing(CHANNEL *cp, bool Flag)
{
    char	*name = CHANname(cp);

    syslog(L_NOTICE, "%s trace %s", name, Flag ? "on" : "off");
    cp->Tracing = Flag;

    /* Nothing more to report when tracing is being switched off. */
    if (!Flag)
	return;

    syslog(L_NOTICE, "%s trace badwrites %d blockwrites %d badreads %d",
	   name, cp->BadWrites, cp->BlockedWrites, cp->BadReads);
    syslog(L_NOTICE, "%s trace address %s lastactive %ld nextlog %ld",
	   name, sprint_sockaddr((struct sockaddr *)&cp->Address),
	   (long) cp->LastActive, (long) cp->NextLog);

    if (FD_ISSET(cp->fd, &SCHANmask)) {
	syslog(L_NOTICE, "%s trace sleeping %ld 0x%p",
	       name, (long)cp->Waketime, (void *)cp->Waker);
    }
    if (FD_ISSET(cp->fd, &RCHANmask)) {
	syslog(L_NOTICE, "%s trace reading %lu %s",
	       name, (unsigned long) cp->In.used,
	       MaxLength(cp->In.data, cp->In.data));
    }
    if (FD_ISSET(cp->fd, &WCHANmask)) {
	syslog(L_NOTICE, "%s trace writing %lu %s",
	       name, (unsigned long) cp->Out.left,
	       MaxLength(cp->Out.data, cp->Out.data));
    }
}


/*
**  Close a channel.
**
**  Logs a closing message appropriate to the channel type (name is the
**  label used in those messages), releases per-article data for NNTP
**  channels, removes the channel from the read, write and sleep sets, frees
**  its Argument and closes the descriptor.  If the channel was subject to a
**  connection limit, one other NNTP channel with the same label whose
**  ActiveCnx is zero inherits this channel's ActiveCnx and is re-added as a
**  reader.  Finally the slot is marked free and oversized buffers are
**  released.  Closing an already-free channel only logs an error before the
**  slot is (re)marked free.
*/
void
CHANclose(CHANNEL *cp, const char *name)
{
    char	sizes[SMBUF];

    if (cp->Type != CTfree) {
	if (cp->Type == CTnntp) {
	    WIPprecomfree(cp);
	    NCclearwip(cp);
	    if (cp->State != CScancel) {
		snprintf(sizes, sizeof(sizes),
			 "accepted size %.0f duplicate size %.0f", cp->Size,
			 cp->DuplicateSize);
		syslog(L_NOTICE,
		       "%s closed seconds %ld accepted %ld refused %ld rejected %ld duplicate %ld %s",
		       name, (long)(Now.time - cp->Started),
		       cp->Received, cp->Refused, cp->Rejected,
		       cp->Duplicate, sizes);
	    } else {
		syslog(L_NOTICE,
		       "%s closed seconds %ld cancels %ld",
		       name, (long)(Now.time - cp->Started),
		       cp->Received);
	    }

	    /* Release the parsed header lists; free(NULL) is harmless. */
	    free(cp->Data.Newsgroups.Data);
	    cp->Data.Newsgroups.Data = NULL;
	    free(cp->Data.Newsgroups.List);
	    cp->Data.Newsgroups.List = NULL;
	    free(cp->Data.Distribution.Data);
	    cp->Data.Distribution.Data = NULL;
	    free(cp->Data.Distribution.List);
	    cp->Data.Distribution.List = NULL;
	    free(cp->Data.Path.Data);
	    cp->Data.Path.Data = NULL;
	    free(cp->Data.Path.List);
	    cp->Data.Path.List = NULL;

	    /* These two are keyed on their recorded size, not the pointer. */
	    if (cp->Data.Overview.size != 0) {
		free(cp->Data.Overview.data);
		cp->Data.Overview.data = NULL;
		cp->Data.Overview.size = 0;
		cp->Data.Overview.left = 0;
		cp->Data.Overview.used = 0;
	    }
	    if (cp->Data.XrefBufLength != 0) {
		free(cp->Data.Xref);
		cp->Data.Xref = NULL;
		cp->Data.XrefBufLength = 0;
	    }
	} else if (cp->Type == CTreject) {
	    syslog(L_NOTICE, "%s %ld", name, cp->Rejected);
	} else if (cp->Out.left) {
	    syslog(L_NOTICE, "%s closed lost %lu", name,
		   (unsigned long) cp->Out.left);
	} else {
	    syslog(L_NOTICE, "%s closed", name);
	}

	WCHANremove(cp);
	RCHANremove(cp);
	SCHANremove(cp);

	/* The pointer itself is cleared further down. */
	free(cp->Argument);

	if (cp->fd >= 0 && close(cp->fd) < 0)
	    syslog(L_ERROR, "%s cant close %s %m", LogName, name);

	if (cp->MaxCnx > 0 && cp->Type == CTnntp) {
	    char	*label;
	    char	*otherlabel;
	    CHANNEL	*other;
	    int		tfd;

	    cp->fd = -1;
	    label = RClabelname(cp);
	    if (label != NULL) {
		/* Hand our connection slot to the first held-back peer
		   carrying the same label. */
		for (tfd = 0; tfd <= CHANlastfd; tfd++) {
		    other = &CHANtable[tfd];
		    if (other->fd <= 0 || other->Type != CTnntp)
			continue;
		    otherlabel = RClabelname(other);
		    if (otherlabel == NULL || strcmp(label, otherlabel) != 0)
			continue;
		    if (other->ActiveCnx != 0)
			continue;
		    other->ActiveCnx = cp->ActiveCnx;
		    RCHANadd(other);
		    break;
		}
	    }
	}
    } else {
	syslog(L_ERROR, "%s internal closing free channel %d", name, cp->fd);
    }

    /* Mark it unused. */
    cp->Type = CTfree;
    cp->State = CSerror;
    cp->fd = -1;
    cp->Argument = NULL;
    cp->ActiveCnx = 0;

    /* Free the buffers if they got big. */
    if (cp->In.size > BIG_BUFFER) {
	free(cp->In.data);
	cp->In.data = NULL;
	cp->In.size = 0;
	cp->In.used = 0;
	cp->In.left = 0;
    }
    if (cp->Out.size > BIG_BUFFER) {
	free(cp->Out.data);
	cp->Out.data = NULL;
	cp->Out.size = 0;
	cp->Out.used = 0;
	cp->Out.left = 0;
    }
    if (cp->Sendid.size > 0) {
	free(cp->Sendid.data);
	cp->Sendid.data = NULL;
	cp->Sendid.size = 0;
	cp->Sendid.used = 0;
	cp->Sendid.left = 0;
    }
}


/*
**  Return a printable name for the channel.  The result lives in a static
**  buffer that is overwritten by the next call, so callers must use or copy
**  it before calling CHANname() again.
*/
char *
CHANname(const CHANNEL *cp)
{
    static char		buff[SMBUF];
    const char		*sitename;
    const char		*kind;
    pid_t		pid;
    int			n;

    switch (cp->Type) {
    case CTany:
	snprintf(buff, sizeof(buff), "any:%d", cp->fd);
	break;
    case CTfree:
	snprintf(buff, sizeof(buff), "free:%d", cp->fd);
	break;
    case CTremconn:
	snprintf(buff, sizeof(buff), "remconn:%d", cp->fd);
	break;
    case CTreject:
	snprintf(buff, sizeof(buff), "%s rejected", RChostname(cp));
	break;
    case CTnntp:
	snprintf(buff, sizeof(buff), "%s:%d",
		 cp->Address.ss_family == 0 ? "localhost" : RChostname(cp),
		 cp->fd);
	break;
    case CTlocalconn:
	snprintf(buff, sizeof(buff), "localconn:%d", cp->fd);
	break;
    case CTcontrol:
	snprintf(buff, sizeof(buff), "control:%d", cp->fd);
	break;
    case CTexploder:
    case CTfile:
    case CTprocess:
	/* Name the channel after the site that owns it, if any. */
	sitename = "?";
	pid = 0;
	for (n = 0; n < nSites; n++) {
	    if (Sites[n].Channel != cp)
		continue;
	    sitename = Sites[n].Name;
	    if (cp->Type != CTfile)
		pid = Sites[n].pid;
	    break;
	}
	kind = (cp->Type == CTfile) ? "file" : "proc";
	if (pid != 0)
	    snprintf(buff, sizeof(buff), "%s:%d:%s:%ld",
		     MaxLength(sitename, sitename), cp->fd, kind, (long)pid);
	else
	    snprintf(buff, sizeof(buff), "%s:%d:%s",
		     MaxLength(sitename, sitename), cp->fd, kind);
	break;
    default:
	/* Unknown type: show the raw type, descriptor and table index. */
	snprintf(buff, sizeof(buff), "?%d(#%d@%ld)?", cp->Type, cp->fd,
		 (long) (cp - CHANtable));
	break;
    }
    return buff;
}


/*
**  Return the channel for a specified descriptor, or NULL if the descriptor
**  is not a valid index into the channel table.  Valid indices are
**  0 .. CHANtablesize - 1; CHANtablesize itself is one past the end.
*/
CHANNEL *
CHANfromdescriptor(int fd)
{
    if (fd < 0 || fd >= CHANtablesize)
	return NULL;
    return &CHANtable[fd];
}


/*
**  Iterate over all channels of a specified type.  *ip is the table index
**  to start from; on a match it is advanced to the slot after the returned
**  channel so the next call continues from there.  Slots that are free with
**  fd == -1 are never returned.  Passing CTany matches every other slot.
**  Returns NULL, leaving *ip untouched, when nothing further matches or
**  *ip is out of range.
*/
CHANNEL *
CHANiter(int *ip, CHANNELTYPE Type)
{
    CHANNEL	*candidate;
    int		idx = *ip;

    if (idx < 0)
	return NULL;

    for (; idx < CHANtablesize; idx++) {
	candidate = &CHANtable[idx];
	if (candidate->Type == CTfree && candidate->fd == -1)
	    continue;
	if (Type != CTany && candidate->Type != Type)
	    continue;
	*ip = idx + 1;
	return candidate;
    }
    return NULL;
}


/*
**  Mark a channel as an active reader and keep CHANlastfd at least as high
**  as its descriptor.  For every channel type except NNTP the input buffer
**  is rewound so the next read starts at its beginning.
*/
void
RCHANadd(CHANNEL *cp)
{
    int		fd = cp->fd;

    FD_SET(fd, &RCHANmask);
    if (CHANlastfd < fd)
	CHANlastfd = fd;

    /* NNTP channels keep whatever is already buffered. */
    if (cp->Type == CTnntp)
	return;
    cp->In.used = 0;
}


/*
**  Remove a channel from the set of readers.  If it held the highest
**  tracked descriptor, walk CHANlastfd down to the next descriptor that is
**  still reading or writing (stopping at 1).
*/
void
RCHANremove(CHANNEL *cp)
{
    if (!FD_ISSET(cp->fd, &RCHANmask))
	return;

    FD_CLR(cp->fd, &RCHANmask);
    if (cp->fd != CHANlastfd)
	return;

    /* This was the highest descriptor, get a new highest. */
    while (CHANlastfd > 1
	   && !FD_ISSET(CHANlastfd, &RCHANmask)
	   && !FD_ISSET(CHANlastfd, &WCHANmask)) {
	CHANlastfd--;
    }
}


/*
**  Put a channel to sleep until Waketime (or until SCHANwakeup() is called
**  with the same Event), then have the main loop call Waker on it.
**  Note that the Argument must be NULL or allocated memory!  A previously
**  stored Argument that differs from the new one is freed here.
*/
void
SCHANadd(CHANNEL *cp, time_t Waketime, void *Event, innd_callback_t Waker,
         void *Argument)
{
    int		fd = cp->fd;

    /* Count the channel only on the transition into the sleep set. */
    if (!FD_ISSET(fd, &SCHANmask)) {
	FD_SET(fd, &SCHANmask);
	SCHANcount++;
    }
    if (CHANlastsleepfd < fd)
	CHANlastsleepfd = fd;

    cp->Waketime = Waketime;
    cp->Waker = Waker;
    cp->Event = Event;

    if (cp->Argument != Argument) {
	free(cp->Argument);
	cp->Argument = Argument;
    }
}


/*
**  Take a channel off the sleep list, clearing its wake time.  If it held
**  the highest sleeping descriptor, walk CHANlastsleepfd down to the next
**  descriptor that is still sleeping (stopping at 1).
*/
void
SCHANremove(CHANNEL *cp)
{
    if (!FD_ISSET(cp->fd, &SCHANmask))
	return;

    FD_CLR(cp->fd, &SCHANmask);
    SCHANcount--;
    cp->Waketime = 0;

    if (cp->fd != CHANlastsleepfd)
	return;

    /* This was the highest descriptor, get a new highest. */
    while (CHANlastsleepfd > 1 && !FD_ISSET(CHANlastsleepfd, &SCHANmask))
	CHANlastsleepfd--;
}


/*
**  Report whether a channel's descriptor is in the sleep set.
*/
bool
CHANsleeping(CHANNEL *cp)
{
    bool	asleep = FD_ISSET(cp->fd, &SCHANmask) != 0;

    return asleep;
}


/*
**  Wake up channels waiting for a specific event: every in-use, sleeping
**  channel whose Event matches gets its wake time set to zero, so the main
**  loop treats it as due on its next pass.
*/
void
SCHANwakeup(void *Event)
{
    CHANNEL	*cp;
    int		slot;

    for (slot = 0; slot < CHANtablesize; slot++) {
	cp = &CHANtable[slot];
	if (cp->Type == CTfree || cp->Event != Event)
	    continue;
	if (CHANsleeping(cp))
	    cp->Waketime = 0;
    }
}


/*
**  Mark a channel as an active writer, but only if it actually has output
**  queued.  Out.left is deliberately left alone because buffered data may
**  already be waiting there.
*/
void
WCHANadd(CHANNEL *cp)
{
    if (!(cp->Out.left > 0))
	return;

    FD_SET(cp->fd, &WCHANmask);
    if (CHANlastfd < cp->fd)
	CHANlastfd = cp->fd;
}


/*
**  Remove a channel from the set of writers.  An output buffer with nothing
**  left to send is rewound.  If the channel held the highest tracked
**  descriptor, CHANlastfd is walked down to the next descriptor that is
**  still reading or writing (stopping at 1).
*/
void
WCHANremove(CHANNEL *cp)
{
    if (!FD_ISSET(cp->fd, &WCHANmask))
	return;

    FD_CLR(cp->fd, &WCHANmask);

    /* No data left -- reset used so we don't grow the buffer. */
    if (cp->Out.left <= 0) {
	cp->Out.used = 0;
	cp->Out.left = 0;
    }

    if (cp->fd != CHANlastfd)
	return;

    /* This was the highest descriptor, get a new highest. */
    while (CHANlastfd > 1
	   && !FD_ISSET(CHANlastfd, &RCHANmask)
	   && !FD_ISSET(CHANlastfd, &WCHANmask)) {
	CHANlastfd--;
    }
}


/*
**  Set a channel's output to the unconsumed contents of an existing buffer:
**  the bp->left bytes starting at offset bp->used.
*/
void
WCHANsetfrombuffer(CHANNEL *cp, struct buffer *bp)
{
    char	*pending = bp->data + bp->used;

    WCHANset(cp, pending, bp->left);
}



/*
**  Read in text data, return the amount we read.
**
**  Data is appended to cp->In at offset In.used; the buffer is grown first
**  when the free space is at or below LOW_WATER.  Return values:
**    > 0   number of bytes read (In.used/In.left updated accordingly)
**      0   read() returned 0; "readclose" is logged
**     -1   read() failed; the error is logged
**     -2   read() failed with EAGAIN; nothing is logged and the caller
**          should simply move on to the next descriptor
*/
int
CHANreadtext(CHANNEL *cp)
{
    ptrdiff_t           i, j;
    struct buffer       *bp;
    char		*p;
    int			oerrno;
    int			maxbyte;
    HDRCONTENT		*hc = cp->Data.HdrContent;

    /* Grow buffer if we're getting close to current limit.  FIXME: The In
       buffer doesn't use the normal meanings of .used and .left.  */
    bp = &cp->In;
    bp->left = bp->size - bp->used;
    if (bp->left <= LOW_WATER) {
	i = GROW_AMOUNT(bp->size);
	bp->size += i;
	bp->left += i;
	/* Remember the old base address so moved pointers can be fixed. */
	p = bp->data;
	TMRstart(TMR_DATAMOVE);
        bp->data = xrealloc(bp->data, bp->size);

	/* Adjust offsets if realloc moved the location of the memory region.
           FIXME: This is invalid C, although it will work on most (all?)
           common systems.  The pointers need to be reduced to offsets and
           then turned back into relative pointers rather than adjusting the
           pointers directly, since as soon as realloc is called, pointers
           into the old space become invalid and may not be used further. */
	if ((i = p - bp->data) != 0) {
	    if (cp->State == CSgetheader || cp->State == CSgetbody ||
		cp->State == CSeatarticle) {
		/* adjust offset only in CSgetheader, CSgetbody or
		   CSeatarticle */
		if (cp->Data.BytesHeader != NULL)
		  cp->Data.BytesHeader -= i;
		/* Every recorded header value is shifted by the same
		   distance the buffer moved. */
		for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
		    if (hc->Value != NULL)
			hc->Value -= i;
		}
	    }
	}
	TMRstop(TMR_DATAMOVE);
    }

    /* Read in whatever is there, up to some reasonable limit.

       We want to limit the amount of time devoted to processing the incoming
       data for any given channel.  There's no easy way of doing that, though,
       so we restrict the data size instead.

       If the data is part of a single large article, then reading and
       processing many kilobytes at a time costs very little.  If the data is
       a long list of CHECK commands from a streaming feed, then every line of
       data will require a history lookup, and we probably don't want to do
       more than about 10 of those per channel on each cycle of the main
       select() loop (otherwise we might take too long before giving other
       channels a turn).  10 lines of CHECK commands suggests a limit of about
       1KB of data, or less.  innconf->maxcmdreadsize (BUFSIZ by default) is
       often about 1KB, and is attractive for other reasons, so let's use that
       as our size limit.  If innconf->maxcmdreadsize is 0, there is no limit.

       Reduce the read size only if we're reading commands.

       FIXME: A better approach would be to limit the number of commands we
       process for each channel. */
    if (innconf->maxcmdreadsize <= 0 || cp->State != CSgetcmd
        || bp->left < innconf->maxcmdreadsize)
        maxbyte = bp->left;
    else
        maxbyte = innconf->maxcmdreadsize;
    TMRstart(TMR_NNTPREAD);
    i = read(cp->fd, &bp->data[bp->used], maxbyte);
    TMRstop(TMR_NNTPREAD);
    if (i < 0) {
        /* Solaris (at least 2.4 through 2.6) will occasionally return
           EAGAIN in response to a read even if the file descriptor already
           selected true for reading, apparently due to some internal
           resource exhaustion.  In that case, return -2, which will drop
           back out to the main loop and go on to the next file descriptor,
           as if the descriptor never selected true.  This check will
           probably never trigger on platforms other than Solaris. */
        if (errno == EAGAIN)
            return -2;
	/* Save errno across CHANname() and restore it before sysnotice().
	   NOTE(review): presumably sysnotice() reports errno -- confirm. */
	oerrno = errno;
	p = CHANname(cp);
	errno = oerrno;
	sysnotice("%s cant read", p);
	return -1;
    }
    if (i == 0) {
	p = CHANname(cp);
	notice("%s readclose", p);
	return 0;
    }

    bp->used += i;
    bp->left -= i;
    return i;
}


/*
**  Write length bytes from p to fd.  If I/O backs up a lot, we can get
**  EMSGSIZE on some systems; in that case (or when the plain write reports
**  zero bytes) the data is written in chunks of at most stdio's BUFSIZ.
**  A write interrupted by a signal (EINTR) is retried.
**
**  Returns the number of bytes written, which may be less than length, or
**  the result of the failing write() if nothing could be written at all.
*/
static int
CHANwrite(int fd, char *p, long length)
{
    char	*start = p;
    int		n;

    /* Try the standard case -- write it all. */
    for (;;) {
	n = write(fd, p, length);
	if (n > 0)
	    return n;
	if (n == 0)
	    break;
	if (errno == EINTR)
	    continue;
	if (errno != EMSGSIZE)
	    return n;
	break;
    }

    /* Write it in pieces. */
    n = 0;
    while (length != 0) {
	n = write(fd, p, (length > BUFSIZ ? BUFSIZ : length));
	if (n <= 0)
	    break;
	p += n;
	length -= n;
    }

    /* Return error, or partial results if we got something. */
    if (p == start)
	return n;
    return p - start;
}


/*
**  Try to flush out the buffer.  Use this only on file channels!
**  Keeps writing until the output buffer is empty, then removes the channel
**  from the writer set and returns true.  Returns false, after logging, as
**  soon as a write fails or makes no progress.
*/
bool
WCHANflush(CHANNEL *cp)
{
    struct buffer	*out = &cp->Out;
    int			written;

    while (out->left > 0) {
	written = CHANwrite(cp->fd, &out->data[out->used], out->left);
	if (written < 0) {
	    syslog(L_ERROR, "%s cant flush count %lu %m",
		   CHANname(cp), (unsigned long) out->left);
	    return false;
	}
	if (written == 0) {
	    syslog(L_ERROR, "%s cant flush count %lu",
		   CHANname(cp), (unsigned long) out->left);
	    return false;
	}
	out->left -= written;
	out->used += written;
    }

    WCHANremove(cp);
    return true;
}



/*
**  Waker callback installed when a write channel is put to sleep: log the
**  wakeup and put the channel back in the writer set.
*/
static void
CHANwakeup(CHANNEL *cp)
{
    char	*name = CHANname(cp);

    syslog(L_NOTICE, "%s wakeup", name);
    WCHANadd(cp);
}


/*
**  Attempting to write would block; stop output or give up.  p is the
**  channel's name for log messages.  Each call bumps the channel's blocked
**  write counter.  Once that exceeds innconf->badiocount, reject, NNTP,
**  file, exploder and process channels are closed outright; any other
**  channel is put to sleep for counter * innconf->blockbackoff seconds and
**  rescheduled for writing via CHANwakeup().
*/
static void
CHANwritesleep(CHANNEL *cp, char *p)
{
    int		backoff;
    int		closable;

    backoff = ++(cp->BlockedWrites);
    if (backoff > innconf->badiocount) {
	closable = cp->Type == CTreject
	    || cp->Type == CTnntp
	    || cp->Type == CTfile
	    || cp->Type == CTexploder
	    || cp->Type == CTprocess;
	if (closable) {
	    syslog(L_ERROR, "%s blocked closing", p);
	    SITEchanclose(cp);
	    CHANclose(cp, p);
	    return;
	}
    }

    backoff *= innconf->blockbackoff;
    syslog(L_ERROR, "%s blocked sleeping %d", p, backoff);
    SCHANadd(cp, Now.time + backoff, NULL, CHANwakeup, NULL);
}


#if	defined(INND_FIND_BAD_FDS)
/*
**  We got an unknown error in select.  Find out the culprit by probing each
**  tracked descriptor on its own with a zero-timeout select(), from
**  CHANlastfd downwards.  A descriptor whose probe fails (other than with
**  EINTR) is logged and dropped from the corresponding mask.
**  Not really ready for production use yet, and it's expensive, too.
*/
static void
CHANdiagnose(void)
{
    fd_set		probe;
    struct timeval	nowait;
    int			fd;
    int			rc;

    FD_ZERO(&probe);
    for (fd = CHANlastfd; fd >= 0; fd--) {
	if (FD_ISSET(fd, &RCHANmask)) {
	    FD_SET(fd, &probe);
	    nowait.tv_sec = 0;
	    nowait.tv_usec = 0;
	    rc = select(fd + 1, &probe, NULL, NULL, &nowait);
	    if (rc < 0 && errno != EINTR) {
		syslog(L_ERROR, "%s Bad Read File %d", LogName, fd);
		FD_CLR(fd, &RCHANmask);
		/* Probably do something about the file descriptor here; call
		 * CHANclose on it? */
	    }
	    FD_CLR(fd, &probe);
	}

	if (FD_ISSET(fd, &WCHANmask)) {
	    FD_SET(fd, &probe);
	    nowait.tv_sec = 0;
	    nowait.tv_usec = 0;
	    rc = select(fd + 1, NULL, &probe, NULL, &nowait);
	    if (rc < 0 && errno != EINTR) {
		syslog(L_ERROR, "%s Bad Write File %d", LogName, fd);
		FD_CLR(fd, &WCHANmask);
		/* Probably do something about the file descriptor here; call
		 * CHANclose on it? */
	    }
	    FD_CLR(fd, &probe);
	}
    }
}
#endif	/* defined(INND_FIND_BAD_FDS) */

/*
**  Assign a connection number to an NNTP channel that does not have one
**  yet (ActiveCnx == 0, fd > 0).  The number is one more than the count of
**  table entries up to CHANlastfd that carry the same label and already
**  have a non-zero ActiveCnx.  If the channel has no label, it gets 1.
*/
void
CHANsetActiveCnx(CHANNEL *cp) {
    CHANNEL	*peer;
    char	*label, *peerlabel;
    int		tfd;
    int		cnx;

    if (cp->fd <= 0 || cp->Type != CTnntp || cp->ActiveCnx != 0)
	return;

    cnx = 1;
    label = RClabelname(cp);
    if (label != NULL) {
	for (tfd = 0; tfd <= CHANlastfd; tfd++) {
	    peer = &CHANtable[tfd];
	    peerlabel = RClabelname(peer);
	    if (peerlabel == NULL)
		continue;
	    if (strcmp(label, peerlabel) == 0 && peer->ActiveCnx != 0)
		cnx++;
	}
    }
    cp->ActiveCnx = cnx;
}

/*
**  Main I/O loop.  Wait for data, call the channel's handler when there is
**  something to read or when the queued write is finished.  In order to
**  be fair (i.e., don't always give descriptor n priority over n+1), we
**  remember where we last had something and pick up from there.
**
**  Each pass: scan for dead child processes, select() on the reader and
**  writer masks, refresh the time, periodically sync history and the
**  active file, service the control channel and the remconn channels
**  first, then walk the channel table handling reads, input-buffer
**  shrinking, writes, wakeups and inactivity timeouts.  The function only
**  leaves through CleanupAndExit() once GotTerminate is set.
**
**  Yes, the main code has really wandered over to the side a lot.
*/
void
CHANreadloop(void)
{
    static char		EXITING[] = "INND exiting because of signal\n";
    /* Table cursor; static so each pass resumes where the last one stopped. */
    static int		fd;
    ptrdiff_t		i, j;
    int			startpoint;
    int			count;
    int			lastfd;
    int			oerrno;
    CHANNEL		*cp;
    struct buffer	*bp;
    fd_set		MyRead;
    fd_set		MyWrite;
    struct timeval	MyTime;
    long		silence;
    char		*p;
    time_t		LastUpdate;
    HDRCONTENT		*hc;

    STATUSinit();
    
    LastUpdate = GetTimeInfo(&Now) < 0 ? 0 : Now.time;
    for ( ; ; ) {
	/* See if any processes died. */
	PROCscan();

	/* Wait for data, note the time. */
	MyRead = RCHANmask;
	MyWrite = WCHANmask;
	MyTime = TimeOut;
	if (innconf->timer) {
	    /* NOTE(review): the arithmetic below treats TMRnow() as
	       milliseconds and innconf->timer as seconds -- confirm. */
	    unsigned long now = TMRnow();

	    if (now >= 1000 * (unsigned long)(innconf->timer)) {
		TMRsummary("ME", timer_name);
		InndHisLogStats();
		MyTime.tv_sec = innconf->timer;
	    }
	    else {
		MyTime.tv_sec = innconf->timer - now / 1000;
	    }
	}
	TMRstart(TMR_IDLE);
	count = select(CHANlastfd + 1, &MyRead, &MyWrite, NULL, &MyTime);
	TMRstop(TMR_IDLE);

	STATUSmainloophook();
	if (GotTerminate) {
	    write(2, EXITING, strlen(EXITING));
	    CleanupAndExit(0, (char *)NULL);
	}
	if (count < 0) {
	    /* EINTR is expected; anything else is logged.  Either way the
	       pass is abandoned and select() is retried. */
	    if (errno != EINTR) {
		syslog(L_ERROR, "%s cant select %m", LogName);
#if	defined(INND_FIND_BAD_FDS)
		CHANdiagnose();
#endif	/* defined(INND_FIND_BAD_FDS) */
	    }
	    continue;
	}

	/* Update the "reasonably accurate" time. */
	if (GetTimeInfo(&Now) < 0)
	    syslog(L_ERROR, "%s cant gettimeinfo %m", LogName);
	/* More than TimeOut.tv_sec seconds since the last sync: flush the
	   history file and, if dirty, the active file. */
	if (Now.time > LastUpdate + TimeOut.tv_sec) {
	    HISsync(History);
	    if (ICDactivedirty) {
		ICDwriteactive();
		ICDactivedirty = 0;
	    }
            LastUpdate = Now.time;
	}

	if (count == 0) {
	    /* No channels active, so flush and skip if nobody's
	     * sleeping. */
	    if (Mode == OMrunning)
		ICDwrite();
	    if (SCHANcount == 0)
		continue;
	}

	/* Try the control channel first. */
	if (FD_ISSET(CHANccfd, &RCHANmask) && FD_ISSET(CHANccfd, &MyRead)) {
	    count--;
 	    if (count > 3)
		count = 3; /* might be more requests */
	    (*CHANcc->Reader)(CHANcc);
	    /* Cleared so the table walk below does not service it again. */
	    FD_CLR(CHANccfd, &MyRead);
	}

#ifdef PRIORITISE_REMCONN
	/* Try the remconn channel next.  The scan stops at the first unused
	   (negative) slot in CHANrcfd. */
	for (j = 0 ; (j < chanlimit) && (CHANrcfd[j] >= 0) ; j++) {
	    if (FD_ISSET(CHANrcfd[j], &RCHANmask) && FD_ISSET(CHANrcfd[j], &MyRead)) {
		count--;
		if (count > 3)
		    count = 3; /* might be more requests */
		(*CHANrc[j]->Reader)(CHANrc[j]);
		FD_CLR(CHANrcfd[j], &MyRead);
	    }
	}
#endif /* PRIORITISE_REMCONN */

	/* Loop through all active channels.  Somebody could have closed
	 * a channel so we double-check the global mask before
	 * looking at what select returned.  The code here is written so
	 * that a channel could be reading and writing and sleeping at the
	 * same time, even though that's not possible.  (Just as well,
	 * since in SysVr4 the count would be wrong.) */
	lastfd = CHANlastfd;
	if (lastfd < CHANlastsleepfd)
	    lastfd = CHANlastsleepfd;
	if (fd > lastfd)
	    fd = 0;
	startpoint = fd;
	do {
	    cp = &CHANtable[fd];

	    /* Connection limiting: a channel whose connection number is
	       above its limit is closed once HoldTime has elapsed since it
	       started; until then it is taken out of the reader set.
	       NOTE(review): `continue' inside do/while jumps straight to
	       the `fd != startpoint' test.  After CHANclose() fd has not
	       been advanced, so the same slot is either revisited or, when
	       it is the start point, ends the walk -- confirm intended. */
            if (cp->MaxCnx > 0 && cp->HoldTime > 0) {
		CHANsetActiveCnx(cp);
                if((cp->ActiveCnx > cp->MaxCnx) && (cp->fd > 0)) {
		    if(cp->Started + cp->HoldTime < Now.time) {
                        CHANclose(cp, CHANname(cp));
                    } else {
                        if (fd >= lastfd)
                            fd = 0;
                        else
                            fd++;
			cp->ActiveCnx = 0;
			RCHANremove(cp);
                    }
                    continue;
                }
            }
	    
	    /* Anything to read? */
	    if (FD_ISSET(fd, &RCHANmask) && FD_ISSET(fd, &MyRead)) {
		count--;
		if (cp->Type == CTfree) {
		    syslog(L_ERROR, "%s %d free but was in RMASK",
			CHANname(cp), fd);
		    /* Don't call RCHANremove since cp->fd will be -1. */
		    FD_CLR(fd, &RCHANmask);
		    close(fd);
		}
		else {
		    cp->LastActive = Now.time;
		    (*cp->Reader)(cp);
		}
	    }

	    /* Check and see if the buffer is grossly overallocated and shrink
	       if needed.  With data present the buffer is shrunk when it is
	       more than ten times the data size; an empty buffer goes back
	       to START_BUFF_SIZE.  Pointers into the buffer are shifted by
	       the distance the allocation moved, as in CHANreadtext(). */
	    if (cp->In.size > (BIG_BUFFER)) {
		if (cp->In.used != 0) {
		    if ((cp->In.size / cp->In.used) > 10) {
			cp->In.size = (cp->In.used * 2) > START_BUFF_SIZE ? (cp->In.used * 2) : START_BUFF_SIZE;
			p = cp->In.data;
			TMRstart(TMR_DATAMOVE);
                        cp->In.data = xrealloc(cp->In.data, cp->In.size);
			cp->In.left = cp->In.size - cp->In.used;
			/* do not move data, since xrealloc did it already */
			if ((i = p - cp->In.data) != 0) {
			    if (cp->State == CSgetheader ||
				cp->State == CSgetbody ||
				cp->State == CSeatarticle) {
				/* adjust offset only in CSgetheader, CSgetbody
				   or CSeatarticle */
				if (cp->Data.BytesHeader != NULL)
				  cp->Data.BytesHeader -= i;
				hc = cp->Data.HdrContent;
				for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
				    if (hc->Value != NULL)
					hc->Value -= i;
				}
			    }
			}
			TMRstop(TMR_DATAMOVE);
		    }
		} else {
		    p = cp->In.data;
		    TMRstart(TMR_DATAMOVE);
                    cp->In.data = xrealloc(cp->In.data, START_BUFF_SIZE);
		    cp->In.size = cp->In.left = START_BUFF_SIZE;
		    if ((i = p - cp->In.data) != 0) {
			if (cp->State == CSgetheader ||
			    cp->State == CSgetbody ||
			    cp->State == CSeatarticle) {
			    /* adjust offset only in CSgetheader, CSgetbody
			       or CSeatarticle */
			    if (cp->Data.BytesHeader != NULL)
			      cp->Data.BytesHeader -= i;
			    hc = cp->Data.HdrContent;
			    for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
				if (hc->Value != NULL)
				    hc->Value -= i;
			    }
			}
		    }
		    TMRstop(TMR_DATAMOVE);
		}
	    }
	    /* Possibly recheck for dead children so we don't get SIGPIPE
	     * on readerless channels. */
	    if (PROCneedscan)
		PROCscan();

	    /* Ready to write? */
	    if (FD_ISSET(fd, &WCHANmask) && FD_ISSET(fd, &MyWrite)) {
		count--;
		if (cp->Type == CTfree) {
		    syslog(L_ERROR, "%s %d free but was in WMASK",
			CHANname(cp), fd);
		    /* Don't call WCHANremove since cp->fd will be -1. */
		    FD_CLR(fd, &WCHANmask);
		    close(fd);
		}
		else {
		    bp = &cp->Out;
		    if (bp->left) {
			cp->LastActive = Now.time;
			i = CHANwrite(fd, &bp->data[bp->used], bp->left);
			if (i <= 0) {
			    /* Keep errno intact across CHANname(). */
			    oerrno = errno;
			    p = CHANname(cp);
			    errno = oerrno;
			    if (i < 0)
				sysnotice("%s cant write", p);
			    else
				notice("%s cant write", p);
			    cp->BadWrites++;
			    /* EPIPE closes the channel; a would-block error
			       backs off via CHANwritesleep(); other failures
			       put the channel to sleep once BadWrites reaches
			       innconf->badiocount. */
			    if (i < 0 && oerrno == EPIPE) {
				SITEchanclose(cp);
				CHANclose(cp, p);
			    }
			    else if (i < 0 &&
                                     (oerrno == EWOULDBLOCK
                                      || oerrno == EAGAIN)) {
				WCHANremove(cp);
				CHANwritesleep(cp, p);
			    }
			    else if (cp->BadWrites >= innconf->badiocount) {
				syslog(L_ERROR, "%s sleeping", p);
				WCHANremove(cp);
				SCHANadd(cp,
                                         Now.time + innconf->pauseretrytime,
                                         NULL, CHANwakeup, NULL);
			    }
			}
			else {
			    cp->BadWrites = 0;
			    cp->BlockedWrites = 0;
			    bp->left -= i;
			    bp->used += i;
			    if (bp->left <= 0) {
				/* Everything went out: stop writing and
				   notify the channel's owner. */
				WCHANremove(cp);
				(*cp->WriteDone)(cp);
			    } else if (bp->used > (bp->size/COMP_THRESHOLD)) {
                                /* compact the buffer, shoving the
                                   data back to the beginning.
                                   <rmtodd@mailhost.ecn.ou.edu> */
                                buffer_set(bp, &bp->data[bp->used], bp->left);
 			    }
			}
		    }
		    else
			/* Should not be possible. */
			WCHANremove(cp);
		}
	    }

	    /* Coming off a sleep? */
	    if (FD_ISSET(fd, &SCHANmask) && cp->Waketime <= Now.time) {
		if (cp->Type == CTfree) {
		    syslog(L_ERROR,"%s ERROR s-select free %d",CHANname(cp),fd);
		    FD_CLR(fd, &SCHANmask);
		     close(fd);
		} else {
		    cp->LastActive = Now.time;
		    SCHANremove(cp);
		    (*cp->Waker)(cp);
		}
	    }

	    /* Toss CTreject channel early if it's inactive. */
	    if (cp->Type == CTreject
	     && cp->LastActive + REJECT_TIMEOUT < Now.time) {
		p = CHANname(cp);
		syslog(L_NOTICE, "%s timeout reject", p);
		CHANclose(cp, p);
	    }

	    /* Has this channel been inactive very long?  Each report pushes
	       NextLog out by innconf->chaninacttime; the channel is closed
	       once the silence exceeds innconf->peertimeout. */
	    if (cp->Type == CTnntp
	     && cp->LastActive + cp->NextLog < Now.time) {
		p = CHANname(cp);
		silence = Now.time - cp->LastActive;
		cp->NextLog += innconf->chaninacttime;
		syslog(L_NOTICE, "%s inactive %ld", p, silence / 60L);
		if (silence > innconf->peertimeout) {
		    syslog(L_NOTICE, "%s timeout", p);
		    CHANclose(cp, p);
		}
	    }

	    /* Bump pointer, wrapping to 0 after the highest descriptor in
	       use (lastfd). */
	    if (fd >= lastfd)
		fd = 0;
	    else
		fd++;

	    /* If there is nothing to do, break out. */
	    if (count == 0 && SCHANcount == 0)
		break;

	} while (fd != startpoint);
    }
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */