/* $Id: chan.c 6720 2004-05-16 20:54:25Z rra $
**
** I/O channel (and buffer) processing.
*/
#include "config.h"
#include "clibrary.h"
/* Needed on AIX 4.1 to get fd_set and friends. */
#ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
#endif
#include "inn/innconf.h"
#include "innd.h"
/* These errno values don't exist on all systems, but may be returned as an
(ignorable) error to setting the accept socket nonblocking. Define them
to 0 if they don't exist so that we can unconditionally compare errno to
them in the code. */
#ifndef ENOTSOCK
# define ENOTSOCK 0
#endif
#ifndef ENOTTY
# define ENOTTY 0
#endif
static const char * const timer_name[] = {
"idle", "artclean", "artwrite", "artcncl", "sitesend", "overv",
"perl", "python", "nntpread", "artparse", "artlog", "datamove"
};
/* Minutes - basically, keep the connection open but idle */
#define PAUSE_BEFORE_DROP 5
/* Divisor of the BUFFER size. If the amount free at the beginning of the
buffer is bigger than the quotient, then it is compacted in the
readloop */
#define COMP_THRESHOLD 10
static fd_set RCHANmask;
static fd_set SCHANmask;
static fd_set WCHANmask;
static int SCHANcount;
static int CHANlastfd;
static int CHANlastsleepfd;
static int CHANccfd;
static int CHANtablesize;
static CHANNEL *CHANtable;
static CHANNEL *CHANcc;
static CHANNEL CHANnull = { CTfree, CSerror, -1 };
#define PRIORITISE_REMCONN
#ifdef PRIORITISE_REMCONN
static int *CHANrcfd;
static CHANNEL **CHANrc;
static int chanlimit;
#endif /* PRIORITISE_REMCONN */
/*
** Tear down our world
*/
void
CHANshutdown(void)
{
CHANNEL *cp;
int i;
if (CHANtable) {
for (i = CHANtablesize, cp = &CHANtable[0]; --i >= 0; cp++) {
if (cp->In.data) {
free(cp->In.data);
}
if (cp->Out.data) {
free(cp->Out.data);
}
}
free(CHANtable);
CHANtable = NULL;
}
}
/*
** Initialize all the I/O channels.
*/
void
CHANsetup(int i)
{
CHANNEL *cp;
FD_ZERO(&RCHANmask);
FD_ZERO(&SCHANmask);
FD_ZERO(&WCHANmask);
CHANshutdown();
CHANtablesize = i;
CHANtable = xcalloc(CHANtablesize, sizeof(CHANNEL));
CHANnull.NextLog = innconf->chaninacttime;
memset(&CHANnull.Address, 0, sizeof(CHANnull.Address));
for (cp = CHANtable; --i >= 0; cp++)
*cp = CHANnull;
}
/*
** Create a channel from a descriptor.
*/
CHANNEL *
CHANcreate(int fd, CHANNELTYPE Type, CHANNELSTATE State,
innd_callback_t Reader, innd_callback_t WriteDone)
{
CHANNEL *cp;
struct buffer in = { 0, 0, 0, NULL };
struct buffer out = { 0, 0, 0, NULL };
cp = &CHANtable[fd];
/* Don't overwrite the buffers with CHANnull. */
in = cp->In;
buffer_resize(&in, START_BUFF_SIZE);
in.used = 0;
in.left = in.size;
out = cp->Out;
buffer_resize(&out, SMBUF);
buffer_set(&out, "", 0);
/* Set up the channel's info. */
*cp = CHANnull;
cp->fd = fd;
cp->Type = Type;
cp->State = State;
cp->Streaming = false;
cp->Skip = false;
cp->NoResendId = false;
cp->privileged = false;
cp->Ihave = cp->Ihave_Duplicate = cp->Ihave_Deferred = cp->Ihave_SendIt = cp->Ihave_Cybercan = 0;
cp->Check = cp->Check_send = cp->Check_deferred = cp->Check_got = cp->Check_cybercan = 0;
cp->Takethis = cp->Takethis_Ok = cp->Takethis_Err = 0;
cp->Size = cp->Duplicate = 0;
cp->Unwanted_s = cp->Unwanted_f = cp->Unwanted_d = 0;
cp->Unwanted_g = cp->Unwanted_u = cp->Unwanted_o = 0;
cp->Reader = Reader;
cp->WriteDone = WriteDone;
cp->Started = cp->LastActive = Now.time;
cp->In = in;
cp->Out = out;
cp->Tracing = Tracing;
cp->Sendid.size = 0;
cp->Next=0;
cp->MaxCnx=0;
cp->ActiveCnx=0;
cp->ArtBeg = 0;
cp->ArtMax = 0;
cp->Start = 0;
HashClear(&cp->CurrentMessageIDHash);
memset(cp->PrecommitWIP, '\0', sizeof(cp->PrecommitWIP));
cp->PrecommitiCachenext=0;
ARTprepare(cp);
close_on_exec(fd, true);
#ifndef _HPUX_SOURCE
/* Stupid HPUX 11.00 has a broken listen/accept where setting the listen
socket to nonblocking prevents you from successfully setting the
socket returned by accept(2) back to blocking mode, no matter what,
resulting in all kinds of funny behaviour, data loss, etc. etc. */
if (nonblocking(fd, true) < 0 && errno != ENOTSOCK && errno != ENOTTY)
syslog(L_ERROR, "%s cant nonblock %d %m", LogName, fd);
#endif
/* Note control channel, for efficiency. */
if (Type == CTcontrol) {
CHANcc = cp;
CHANccfd = fd;
}
#ifdef PRIORITISE_REMCONN
/* Note remconn channel, for efficiency */
if (Type == CTremconn) {
int j;
for (j = 0 ; j < chanlimit ; j++ ) {
if (CHANrcfd[j] == -1) {
break;
}
}
if (j < chanlimit) {
CHANrc[j] = cp;
CHANrcfd[j] = fd;
} else if (chanlimit == 0) {
/* assuming two file descriptors(AF_INET and AF_INET6) */
chanlimit = 2;
CHANrc = xmalloc(chanlimit * sizeof(CHANNEL **));
CHANrcfd = xmalloc(chanlimit * sizeof(int *));
for (j = 0 ; j < chanlimit ; j++ ) {
CHANrc[j] = NULL;
CHANrcfd[j] = -1;
}
CHANrc[0] = cp;
CHANrcfd[0] = fd;
} else {
/* extend to double size */
CHANrc = xrealloc(CHANrc, chanlimit * 2 * sizeof(CHANNEL **));
CHANrcfd = xrealloc(CHANrcfd, chanlimit * 2 * sizeof(int *));
for (j = chanlimit ; j < chanlimit * 2 ; j++ ) {
CHANrc[j] = NULL;
CHANrcfd[j] = -1;
}
CHANrc[chanlimit] = cp;
CHANrcfd[chanlimit] = fd;
chanlimit *= 2;
}
}
#endif /* PRIORITISE_REMCONN */
return cp;
}
/*
** Start tracing a channel.
*/
void
CHANtracing(CHANNEL *cp, bool Flag)
{
char *p;
p = CHANname(cp);
syslog(L_NOTICE, "%s trace %s", p, Flag ? "on" : "off");
cp->Tracing = Flag;
if (Flag) {
syslog(L_NOTICE, "%s trace badwrites %d blockwrites %d badreads %d",
p, cp->BadWrites, cp->BlockedWrites, cp->BadReads);
syslog(L_NOTICE, "%s trace address %s lastactive %ld nextlog %ld",
p, sprint_sockaddr((struct sockaddr *)&cp->Address),
(long) cp->LastActive, (long) cp->NextLog);
if (FD_ISSET(cp->fd, &SCHANmask))
syslog(L_NOTICE, "%s trace sleeping %ld 0x%p",
p, (long)cp->Waketime, (void *)cp->Waker);
if (FD_ISSET(cp->fd, &RCHANmask))
syslog(L_NOTICE, "%s trace reading %lu %s",
p, (unsigned long) cp->In.used,
MaxLength(cp->In.data, cp->In.data));
if (FD_ISSET(cp->fd, &WCHANmask))
syslog(L_NOTICE, "%s trace writing %lu %s",
p, (unsigned long) cp->Out.left,
MaxLength(cp->Out.data, cp->Out.data));
}
}
/*
** Close a channel.
*/
void
CHANclose(CHANNEL *cp, const char *name)
{
char *label, *tmplabel, buff[SMBUF];
if (cp->Type == CTfree)
syslog(L_ERROR, "%s internal closing free channel %d", name, cp->fd);
else {
if (cp->Type == CTnntp) {
WIPprecomfree(cp);
NCclearwip(cp);
if (cp->State == CScancel)
syslog(L_NOTICE,
"%s closed seconds %ld cancels %ld",
name, (long)(Now.time - cp->Started),
cp->Received);
else {
snprintf(buff, sizeof(buff),
"accepted size %.0f duplicate size %.0f", cp->Size,
cp->DuplicateSize);
syslog(L_NOTICE,
"%s closed seconds %ld accepted %ld refused %ld rejected %ld duplicate %ld %s",
name, (long)(Now.time - cp->Started),
cp->Received, cp->Refused, cp->Rejected,
cp->Duplicate, buff);
}
if (cp->Data.Newsgroups.Data != NULL) {
free(cp->Data.Newsgroups.Data);
cp->Data.Newsgroups.Data = NULL;
}
if (cp->Data.Newsgroups.List != NULL) {
free(cp->Data.Newsgroups.List);
cp->Data.Newsgroups.List = NULL;
}
if (cp->Data.Distribution.Data != NULL) {
free(cp->Data.Distribution.Data);
cp->Data.Distribution.Data = NULL;
}
if (cp->Data.Distribution.List != NULL) {
free(cp->Data.Distribution.List);
cp->Data.Distribution.List = NULL;
}
if (cp->Data.Path.Data != NULL) {
free(cp->Data.Path.Data);
cp->Data.Path.Data = NULL;
}
if (cp->Data.Path.List != NULL) {
free(cp->Data.Path.List);
cp->Data.Path.List = NULL;
}
if (cp->Data.Overview.size != 0) {
free(cp->Data.Overview.data);
cp->Data.Overview.data = NULL;
cp->Data.Overview.size = 0;
cp->Data.Overview.left = 0;
cp->Data.Overview.used = 0;
}
if (cp->Data.XrefBufLength != 0) {
free(cp->Data.Xref);
cp->Data.Xref = NULL;
cp->Data.XrefBufLength = 0;
}
} else if (cp->Type == CTreject)
syslog(L_NOTICE, "%s %ld", name, cp->Rejected);
else if (cp->Out.left)
syslog(L_NOTICE, "%s closed lost %lu", name,
(unsigned long) cp->Out.left);
else
syslog(L_NOTICE, "%s closed", name);
WCHANremove(cp);
RCHANremove(cp);
SCHANremove(cp);
if (cp->Argument != NULL)
/* Set to NULL below. */
free(cp->Argument);
if (cp->fd >= 0 && close(cp->fd) < 0)
syslog(L_ERROR, "%s cant close %s %m", LogName, name);
if (cp->MaxCnx > 0 && cp->Type == CTnntp) {
int tfd;
CHANNEL *tempchan;
cp->fd = -1;
if ((label = RClabelname(cp)) != NULL) {
for(tfd = 0; tfd <= CHANlastfd; tfd++) {
tempchan = &CHANtable[tfd];
if(tempchan->fd > 0 && tempchan->Type == CTnntp &&
((tmplabel = RClabelname(tempchan)) != NULL) &&
strcmp(label, tmplabel) == 0 &&
tempchan->ActiveCnx == 0) {
tempchan->ActiveCnx = cp->ActiveCnx;
RCHANadd(tempchan);
break;
}
}
}
}
}
/* Mark it unused. */
cp->Type = CTfree;
cp->State = CSerror;
cp->fd = -1;
cp->Argument = NULL;
cp->ActiveCnx = 0;
/* Free the buffers if they got big. */
if (cp->In.size > BIG_BUFFER) {
cp->In.size = 0;
cp->In.used = 0;
cp->In.left = 0;
free(cp->In.data);
cp->In.data = NULL;
}
if (cp->Out.size > BIG_BUFFER) {
cp->Out.size = 0;
cp->Out.used = 0;
cp->Out.left = 0;
free(cp->Out.data);
cp->Out.data = NULL;
}
if (cp->Sendid.size > 0) {
cp->Sendid.size = 0;
cp->Sendid.used = 0;
cp->Sendid.left = 0;
free(cp->Sendid.data);
cp->Sendid.data = NULL;
}
}
/*
** Return a printable name for the channel.
*/
char *
CHANname(const CHANNEL *cp)
{
static char buff[SMBUF];
int i;
SITE * sp;
const char * p;
pid_t pid;
switch (cp->Type) {
default:
snprintf(buff, sizeof(buff), "?%d(#%d@%ld)?", cp->Type, cp->fd,
(long) (cp - CHANtable));
break;
case CTany:
snprintf(buff, sizeof(buff), "any:%d", cp->fd);
break;
case CTfree:
snprintf(buff, sizeof(buff), "free:%d", cp->fd);
break;
case CTremconn:
snprintf(buff, sizeof(buff), "remconn:%d", cp->fd);
break;
case CTreject:
snprintf(buff, sizeof(buff), "%s rejected", RChostname(cp));
break;
case CTnntp:
snprintf(buff, sizeof(buff), "%s:%d",
cp->Address.ss_family == 0 ? "localhost" : RChostname(cp),
cp->fd);
break;
case CTlocalconn:
snprintf(buff, sizeof(buff), "localconn:%d", cp->fd);
break;
case CTcontrol:
snprintf(buff, sizeof(buff), "control:%d", cp->fd);
break;
case CTexploder:
case CTfile:
case CTprocess:
/* Find the site that has this channel. */
for (p = "?", i = nSites, sp = Sites, pid = 0; --i >= 0; sp++)
if (sp->Channel == cp) {
p = sp->Name;
if (cp->Type != CTfile)
pid = sp->pid;
break;
}
if (pid == 0)
snprintf(buff, sizeof(buff), "%s:%d:%s",
MaxLength(p, p), cp->fd,
cp->Type == CTfile ? "file" : "proc");
else
snprintf(buff, sizeof(buff), "%s:%d:%s:%ld",
MaxLength(p, p), cp->fd,
cp->Type == CTfile ? "file" : "proc", (long)pid);
break;
}
return buff;
}
/*
** Return the channel for a specified descriptor.
*/
CHANNEL *
CHANfromdescriptor(int fd)
{
if (fd <0 || fd > CHANtablesize)
return NULL;
return &CHANtable[fd];
}
/*
** Iterate over all channels of a specified type.
*/
CHANNEL *
CHANiter(int *ip, CHANNELTYPE Type)
{
CHANNEL *cp;
int i;
if ((i = *ip) >= 0 && i < CHANtablesize) {
do {
cp = &CHANtable[i];
if (cp->Type == CTfree && cp->fd == -1)
continue;
if (Type == CTany || cp->Type == Type) {
*ip = ++i;
return cp;
}
} while (++i < CHANtablesize);
}
return NULL;
}
/*
** Mark a channel as an active reader.
*/
void
RCHANadd(CHANNEL *cp)
{
FD_SET(cp->fd, &RCHANmask);
if (cp->fd > CHANlastfd)
CHANlastfd = cp->fd;
if (cp->Type != CTnntp)
/* Start reading at the beginning of the buffer. */
cp->In.used = 0;
}
/*
** Remove a channel from the set of readers.
*/
void
RCHANremove(CHANNEL *cp)
{
if (FD_ISSET(cp->fd, &RCHANmask)) {
FD_CLR(cp->fd, &RCHANmask);
if (cp->fd == CHANlastfd) {
/* This was the highest descriptor, get a new highest. */
while (!FD_ISSET(CHANlastfd, &RCHANmask)
&& !FD_ISSET(CHANlastfd, &WCHANmask)
&& CHANlastfd > 1)
CHANlastfd--;
}
}
}
/*
** Put a channel to sleep, call a function when it wakes.
** Note that the Argument must be NULL or allocated memory!
*/
void
SCHANadd(CHANNEL *cp, time_t Waketime, void *Event, innd_callback_t Waker,
void *Argument)
{
if (!FD_ISSET(cp->fd, &SCHANmask)) {
SCHANcount++;
FD_SET(cp->fd, &SCHANmask);
}
if (cp->fd > CHANlastsleepfd)
CHANlastsleepfd = cp->fd;
cp->Waketime = Waketime;
cp->Waker = Waker;
if (cp->Argument != Argument) {
free(cp->Argument);
cp->Argument = Argument;
}
cp->Event = Event;
}
/*
** Take a channel off the sleep list.
*/
void
SCHANremove(CHANNEL *cp)
{
if (FD_ISSET(cp->fd, &SCHANmask)) {
FD_CLR(cp->fd, &SCHANmask);
SCHANcount--;
cp->Waketime = 0;
if (cp->fd == CHANlastsleepfd) {
/* This was the highest descriptor, get a new highest. */
while (!FD_ISSET(CHANlastsleepfd, &SCHANmask)
&& CHANlastsleepfd > 1)
CHANlastsleepfd--;
}
}
}
/*
** Is a channel on the sleep list?
*/
bool
CHANsleeping(CHANNEL *cp)
{
return FD_ISSET(cp->fd, &SCHANmask);
}
/*
** Wake up channels waiting for a specific event.
*/
void
SCHANwakeup(void *Event)
{
CHANNEL *cp;
int i;
for (cp = CHANtable, i = CHANtablesize; --i >= 0; cp++)
if (cp->Type != CTfree && cp->Event == Event && CHANsleeping(cp))
cp->Waketime = 0;
}
/*
** Mark a channel as an active writer. Don't reset the Out->left field
** since we could have buffered I/O already in there.
*/
void
WCHANadd(CHANNEL *cp)
{
if (cp->Out.left > 0) {
FD_SET(cp->fd, &WCHANmask);
if (cp->fd > CHANlastfd)
CHANlastfd = cp->fd;
}
}
/*
** Remove a channel from the set of writers.
*/
void
WCHANremove(CHANNEL *cp)
{
if (FD_ISSET(cp->fd, &WCHANmask)) {
FD_CLR(cp->fd, &WCHANmask);
if (cp->Out.left <= 0) {
/* No data left -- reset used so we don't grow the buffer. */
cp->Out.used = 0;
cp->Out.left = 0;
}
if (cp->fd == CHANlastfd) {
/* This was the highest descriptor, get a new highest. */
while (!FD_ISSET(CHANlastfd, &RCHANmask)
&& !FD_ISSET(CHANlastfd, &WCHANmask)
&& CHANlastfd > 1)
CHANlastfd--;
}
}
}
/*
** Set a channel to start off with the contents of an existing channel.
*/
void
WCHANsetfrombuffer(CHANNEL *cp, struct buffer *bp)
{
WCHANset(cp, &bp->data[bp->used], bp->left);
}
/*
** Read in text data, return the amount we read.
*/
int
CHANreadtext(CHANNEL *cp)
{
ptrdiff_t i, j;
struct buffer *bp;
char *p;
int oerrno;
int maxbyte;
HDRCONTENT *hc = cp->Data.HdrContent;
/* Grow buffer if we're getting close to current limit. FIXME: The In
buffer doesn't use the normal meanings of .used and .left. */
bp = &cp->In;
bp->left = bp->size - bp->used;
if (bp->left <= LOW_WATER) {
i = GROW_AMOUNT(bp->size);
bp->size += i;
bp->left += i;
p = bp->data;
TMRstart(TMR_DATAMOVE);
bp->data = xrealloc(bp->data, bp->size);
/* Adjust offets of realloc moved the location of the memory region.
FIXME: This is invalid C, although it will work on most (all?)
common systems. The pointers need to be reduced to offets and then
turned back into relative pointers rather than adjusting the
pointers directly, since as soon as realloc is called, pointers
into the old space become invalid and may not be used further. */
if ((i = p - bp->data) != 0) {
if (cp->State == CSgetheader || cp->State == CSgetbody ||
cp->State == CSeatarticle) {
/* adjust offset only in CSgetheader, CSgetbody or
CSeatarticle */
if (cp->Data.BytesHeader != NULL)
cp->Data.BytesHeader -= i;
for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
if (hc->Value != NULL)
hc->Value -= i;
}
}
}
TMRstop(TMR_DATAMOVE);
}
/* Read in whatever is there, up to some reasonable limit.
We want to limit the amount of time devoted to processing the incoming
data for any given channel. There's no easy way of doing that, though,
so we restrict the data size instead.
If the data is part of a single large article, then reading and
processing many kilobytes at a time costs very little. If the data is
a long list of CHECK commands from a streaming feed, then every line of
data will require a history lookup, and we probably don't want to do
more than about 10 of those per channel on each cycle of the main
select() loop (otherwise we might take too long before giving other
channels a turn). 10 lines of CHECK commands suggests a limit of about
1KB of data, or less. innconf->maxcmdreadsize (BUFSIZ by default) is
often about 1KB, and is attractive for other reasons, so let's use that
as our size limit. If innconf->maxcmdreadsize is 0, there is no limit.
Reduce the read size only if we're reading commands.
FIXME: A better approach would be to limit the number of commands we
process for each channel. */
if (innconf->maxcmdreadsize <= 0 || cp->State != CSgetcmd
|| bp->left < innconf->maxcmdreadsize)
maxbyte = bp->left;
else
maxbyte = innconf->maxcmdreadsize;
TMRstart(TMR_NNTPREAD);
i = read(cp->fd, &bp->data[bp->used], maxbyte);
TMRstop(TMR_NNTPREAD);
if (i < 0) {
/* Solaris (at least 2.4 through 2.6) will occasionally return
EAGAIN in response to a read even if the file descriptor already
selected true for reading, apparently due to some internal
resource exhaustion. In that case, return -2, which will drop
back out to the main loop and go on to the next file descriptor,
as if the descriptor never selected true. This check will
probably never trigger on platforms other than Solaris. */
if (errno == EAGAIN)
return -2;
oerrno = errno;
p = CHANname(cp);
errno = oerrno;
sysnotice("%s cant read", p);
return -1;
}
if (i == 0) {
p = CHANname(cp);
notice("%s readclose", p);
return 0;
}
bp->used += i;
bp->left -= i;
return i;
}
/*
** If I/O backs up a lot, we can get EMSGSIZE on some systems. If that
** happens we want to do the I/O in chunks. We assume stdio's BUFSIZ is
** a good chunk value.
*/
static int
CHANwrite(int fd, char *p, long length)
{
int i;
char *save;
do {
/* Try the standard case -- write it all. */
i = write(fd, p, length);
if (i > 0 || (i < 0 && errno != EMSGSIZE && errno != EINTR))
return i;
} while (i < 0 && errno == EINTR);
/* Write it in pieces. */
for (save = p, i = 0; length; p += i, length -= i) {
i = write(fd, p, (length > BUFSIZ ? BUFSIZ : length));
if (i <= 0)
break;
}
/* Return error, or partial results if we got something. */
return p == save ? i : p - save;
}
/*
** Try to flush out the buffer. Use this only on file channels!
*/
bool
WCHANflush(CHANNEL *cp)
{
struct buffer *bp;
int i;
/* Write it. */
for (bp = &cp->Out; bp->left > 0; bp->left -= i, bp->used += i) {
i = CHANwrite(cp->fd, &bp->data[bp->used], bp->left);
if (i < 0) {
syslog(L_ERROR, "%s cant flush count %lu %m",
CHANname(cp), (unsigned long) bp->left);
return false;
}
if (i == 0) {
syslog(L_ERROR, "%s cant flush count %lu",
CHANname(cp), (unsigned long) bp->left);
return false;
}
}
WCHANremove(cp);
return true;
}
/*
** Wakeup routine called after a write channel was put to sleep.
*/
static void
CHANwakeup(CHANNEL *cp)
{
syslog(L_NOTICE, "%s wakeup", CHANname(cp));
WCHANadd(cp);
}
/*
** Attempting to write would block; stop output or give up.
*/
static void
CHANwritesleep(CHANNEL *cp, char *p)
{
int i;
if ((i = ++(cp->BlockedWrites)) > innconf->badiocount)
switch (cp->Type) {
default:
break;
case CTreject:
case CTnntp:
case CTfile:
case CTexploder:
case CTprocess:
syslog(L_ERROR, "%s blocked closing", p);
SITEchanclose(cp);
CHANclose(cp, p);
return;
}
i *= innconf->blockbackoff;
syslog(L_ERROR, "%s blocked sleeping %d", p, i);
SCHANadd(cp, Now.time + i, NULL, CHANwakeup, NULL);
}
#if defined(INND_FIND_BAD_FDS)
/*
** We got an unknown error in select. Find out the culprit.
** Not really ready for production use yet, and it's expensive, too.
*/
static void
CHANdiagnose(void)
{
fd_set Test;
int i;
struct timeval t;
FD_ZERO(&Test);
for (i = CHANlastfd; i >= 0; i--) {
if (FD_ISSET(i, &RCHANmask)) {
FD_SET(i, &Test);
t.tv_sec = 0;
t.tv_usec = 0;
if (select(i + 1, &Test, NULL, NULL, &t) < 0
&& errno != EINTR) {
syslog(L_ERROR, "%s Bad Read File %d", LogName, i);
FD_CLR(i, &RCHANmask);
/* Probably do something about the file descriptor here; call
* CHANclose on it? */
}
FD_CLR(i, &Test);
}
if (FD_ISSET(i, &WCHANmask)) {
FD_SET(i, &Test);
t.tv_sec = 0;
t.tv_usec = 0;
if (select(i + 1, NULL, &Test, NULL, &t) < 0
&& errno != EINTR) {
syslog(L_ERROR, "%s Bad Write File %d", LogName, i);
FD_CLR(i, &WCHANmask);
/* Probably do something about the file descriptor here; call
* CHANclose on it? */
}
FD_CLR(i, &Test);
}
}
}
#endif /* defined(INND_FIND_BAD_FDS) */
void
CHANsetActiveCnx(CHANNEL *cp) {
int found;
CHANNEL *tempchan;
char *label, *tmplabel;
int tfd;
if((cp->fd > 0) && (cp->Type == CTnntp) && (cp->ActiveCnx == 0)) {
found = 1;
if ((label = RClabelname(cp)) != NULL) {
for(tfd = 0; tfd <= CHANlastfd; tfd++) {
tempchan = &CHANtable[tfd];
if ((tmplabel = RClabelname(tempchan)) == NULL) {
continue;
}
if(strcmp(label, tmplabel) == 0) {
if(tempchan->ActiveCnx != 0)
found++;
}
}
}
cp->ActiveCnx = found;
}
}
/*
** Main I/O loop. Wait for data, call the channel's handler when there is
** something to read or when the queued write is finished. In order to
** be fair (i.e., don't always give descriptor n priority over n+1), we
** remember where we last had something and pick up from there.
**
** Yes, the main code has really wandered over to the side a lot.
*/
void
CHANreadloop(void)
{
static char EXITING[] = "INND exiting because of signal\n";
static int fd;
ptrdiff_t i, j;
int startpoint;
int count;
int lastfd;
int oerrno;
CHANNEL *cp;
struct buffer *bp;
fd_set MyRead;
fd_set MyWrite;
struct timeval MyTime;
long silence;
char *p;
time_t LastUpdate;
HDRCONTENT *hc;
STATUSinit();
LastUpdate = GetTimeInfo(&Now) < 0 ? 0 : Now.time;
for ( ; ; ) {
/* See if any processes died. */
PROCscan();
/* Wait for data, note the time. */
MyRead = RCHANmask;
MyWrite = WCHANmask;
MyTime = TimeOut;
if (innconf->timer) {
unsigned long now = TMRnow();
if (now >= 1000 * (unsigned long)(innconf->timer)) {
TMRsummary("ME", timer_name);
InndHisLogStats();
MyTime.tv_sec = innconf->timer;
}
else {
MyTime.tv_sec = innconf->timer - now / 1000;
}
}
TMRstart(TMR_IDLE);
count = select(CHANlastfd + 1, &MyRead, &MyWrite, NULL, &MyTime);
TMRstop(TMR_IDLE);
STATUSmainloophook();
if (GotTerminate) {
write(2, EXITING, strlen(EXITING));
CleanupAndExit(0, (char *)NULL);
}
if (count < 0) {
if (errno != EINTR) {
syslog(L_ERROR, "%s cant select %m", LogName);
#if defined(INND_FIND_BAD_FDS)
CHANdiagnose();
#endif /* defined(INND_FIND_BAD_FDS) */
}
continue;
}
/* Update the "reasonably accurate" time. */
if (GetTimeInfo(&Now) < 0)
syslog(L_ERROR, "%s cant gettimeinfo %m", LogName);
if (Now.time > LastUpdate + TimeOut.tv_sec) {
HISsync(History);
if (ICDactivedirty) {
ICDwriteactive();
ICDactivedirty = 0;
}
LastUpdate = Now.time;
}
if (count == 0) {
/* No channels active, so flush and skip if nobody's
* sleeping. */
if (Mode == OMrunning)
ICDwrite();
if (SCHANcount == 0)
continue;
}
/* Try the control channel first. */
if (FD_ISSET(CHANccfd, &RCHANmask) && FD_ISSET(CHANccfd, &MyRead)) {
count--;
if (count > 3)
count = 3; /* might be more requests */
(*CHANcc->Reader)(CHANcc);
FD_CLR(CHANccfd, &MyRead);
}
#ifdef PRIORITISE_REMCONN
/* Try the remconn channel next. */
for (j = 0 ; (j < chanlimit) && (CHANrcfd[j] >= 0) ; j++) {
if (FD_ISSET(CHANrcfd[j], &RCHANmask) && FD_ISSET(CHANrcfd[j], &MyRead)) {
count--;
if (count > 3)
count = 3; /* might be more requests */
(*CHANrc[j]->Reader)(CHANrc[j]);
FD_CLR(CHANrcfd[j], &MyRead);
}
}
#endif /* PRIORITISE_REMCONN */
/* Loop through all active channels. Somebody could have closed
* closed a channel so we double-check the global mask before
* looking at what select returned. The code here is written so
* that a channel could be reading and writing and sleeping at the
* same time, even though that's not possible. (Just as well,
* since in SysVr4 the count would be wrong.) */
lastfd = CHANlastfd;
if (lastfd < CHANlastsleepfd)
lastfd = CHANlastsleepfd;
if (fd > lastfd)
fd = 0;
startpoint = fd;
do {
cp = &CHANtable[fd];
if (cp->MaxCnx > 0 && cp->HoldTime > 0) {
CHANsetActiveCnx(cp);
if((cp->ActiveCnx > cp->MaxCnx) && (cp->fd > 0)) {
if(cp->Started + cp->HoldTime < Now.time) {
CHANclose(cp, CHANname(cp));
} else {
if (fd >= lastfd)
fd = 0;
else
fd++;
cp->ActiveCnx = 0;
RCHANremove(cp);
}
continue;
}
}
/* Anything to read? */
if (FD_ISSET(fd, &RCHANmask) && FD_ISSET(fd, &MyRead)) {
count--;
if (cp->Type == CTfree) {
syslog(L_ERROR, "%s %d free but was in RMASK",
CHANname(cp), fd);
/* Don't call RCHANremove since cp->fd will be -1. */
FD_CLR(fd, &RCHANmask);
close(fd);
}
else {
cp->LastActive = Now.time;
(*cp->Reader)(cp);
}
}
/* Check and see if the buffer is grossly overallocated and shrink
if needed */
if (cp->In.size > (BIG_BUFFER)) {
if (cp->In.used != 0) {
if ((cp->In.size / cp->In.used) > 10) {
cp->In.size = (cp->In.used * 2) > START_BUFF_SIZE ? (cp->In.used * 2) : START_BUFF_SIZE;
p = cp->In.data;
TMRstart(TMR_DATAMOVE);
cp->In.data = xrealloc(cp->In.data, cp->In.size);
cp->In.left = cp->In.size - cp->In.used;
/* do not move data, since xrealloc did it already */
if ((i = p - cp->In.data) != 0) {
if (cp->State == CSgetheader ||
cp->State == CSgetbody ||
cp->State == CSeatarticle) {
/* adjust offset only in CSgetheader, CSgetbody
or CSeatarticle */
if (cp->Data.BytesHeader != NULL)
cp->Data.BytesHeader -= i;
hc = cp->Data.HdrContent;
for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
if (hc->Value != NULL)
hc->Value -= i;
}
}
}
TMRstop(TMR_DATAMOVE);
}
} else {
p = cp->In.data;
TMRstart(TMR_DATAMOVE);
cp->In.data = xrealloc(cp->In.data, START_BUFF_SIZE);
cp->In.size = cp->In.left = START_BUFF_SIZE;
if ((i = p - cp->In.data) != 0) {
if (cp->State == CSgetheader ||
cp->State == CSgetbody ||
cp->State == CSeatarticle) {
/* adjust offset only in CSgetheader, CSgetbody
or CSeatarticle */
if (cp->Data.BytesHeader != NULL)
cp->Data.BytesHeader -= i;
hc = cp->Data.HdrContent;
for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
if (hc->Value != NULL)
hc->Value -= i;
}
}
}
TMRstop(TMR_DATAMOVE);
}
}
/* Possibly recheck for dead children so we don't get SIGPIPE
* on readerless channels. */
if (PROCneedscan)
PROCscan();
/* Ready to write? */
if (FD_ISSET(fd, &WCHANmask) && FD_ISSET(fd, &MyWrite)) {
count--;
if (cp->Type == CTfree) {
syslog(L_ERROR, "%s %d free but was in WMASK",
CHANname(cp), fd);
/* Don't call WCHANremove since cp->fd will be -1. */
FD_CLR(fd, &WCHANmask);
close(fd);
}
else {
bp = &cp->Out;
if (bp->left) {
cp->LastActive = Now.time;
i = CHANwrite(fd, &bp->data[bp->used], bp->left);
if (i <= 0) {
oerrno = errno;
p = CHANname(cp);
errno = oerrno;
if (i < 0)
sysnotice("%s cant write", p);
else
notice("%s cant write", p);
cp->BadWrites++;
if (i < 0 && oerrno == EPIPE) {
SITEchanclose(cp);
CHANclose(cp, p);
}
else if (i < 0 &&
(oerrno == EWOULDBLOCK
|| oerrno == EAGAIN)) {
WCHANremove(cp);
CHANwritesleep(cp, p);
}
else if (cp->BadWrites >= innconf->badiocount) {
syslog(L_ERROR, "%s sleeping", p);
WCHANremove(cp);
SCHANadd(cp,
Now.time + innconf->pauseretrytime,
NULL, CHANwakeup, NULL);
}
}
else {
cp->BadWrites = 0;
cp->BlockedWrites = 0;
bp->left -= i;
bp->used += i;
if (bp->left <= 0) {
WCHANremove(cp);
(*cp->WriteDone)(cp);
} else if (bp->used > (bp->size/COMP_THRESHOLD)) {
/* compact the buffer, shoving the
data back to the beginning.
<rmtodd@mailhost.ecn.ou.edu> */
buffer_set(bp, &bp->data[bp->used], bp->left);
}
}
}
else
/* Should not be possible. */
WCHANremove(cp);
}
}
/* Coming off a sleep? */
if (FD_ISSET(fd, &SCHANmask) && cp->Waketime <= Now.time) {
if (cp->Type == CTfree) {
syslog(L_ERROR,"%s ERROR s-select free %d",CHANname(cp),fd);
FD_CLR(fd, &SCHANmask);
close(fd);
} else {
cp->LastActive = Now.time;
SCHANremove(cp);
(*cp->Waker)(cp);
}
}
/* Toss CTreject channel early if it's inactive. */
if (cp->Type == CTreject
&& cp->LastActive + REJECT_TIMEOUT < Now.time) {
p = CHANname(cp);
syslog(L_NOTICE, "%s timeout reject", p);
CHANclose(cp, p);
}
/* Has this channel been inactive very long? */
if (cp->Type == CTnntp
&& cp->LastActive + cp->NextLog < Now.time) {
p = CHANname(cp);
silence = Now.time - cp->LastActive;
cp->NextLog += innconf->chaninacttime;
syslog(L_NOTICE, "%s inactive %ld", p, silence / 60L);
if (silence > innconf->peertimeout) {
syslog(L_NOTICE, "%s timeout", p);
CHANclose(cp, p);
}
}
/* Bump pointer, modulo the table size. */
if (fd >= lastfd)
fd = 0;
else
fd++;
/* If there is nothing to do, break out. */
if (count == 0 && SCHANcount == 0)
break;
} while (fd != startpoint);
}
}
syntax highlighted by Code2HTML, v. 0.9.1