/*  $Id: innxmit.c 6716 2004-05-16 20:26:56Z rra $
**
**  Transmit articles to remote site.
**  Modified for NNTP streaming: 1996-01-03 Jerry Aguirre
*/

#include "config.h"
#include "clibrary.h"
#include "portable/socket.h"
#include "portable/time.h"
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <setjmp.h>
#include <signal.h>
#include <syslog.h>
#include <sys/stat.h>
#include <sys/uio.h>

/* Needed on AIX 4.1 to get fd_set and friends. */
#ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
#endif

#include "inn/history.h"
#include "inn/innconf.h"
#include "inn/messages.h"
#include "inn/qio.h"
#include "inn/timer.h"
#include "inn/wire.h"
#include "libinn.h"
#include "nntp.h"
#include "paths.h"
#include "storage.h"

#define OUTPUT_BUFFER_SIZE	(16 * 1024)

/* Streaming extensions to NNTP.  This extension removes the lock-step
** limitation of conventional NNTP.  Article transfer is several times
** faster.  Negotiated and falls back to old mode if receiver refuses.
*/

/* max number of articles that can be streamed ahead */
#define STNBUF 32

/* Send "takethis" without "check" if this many articles were
** accepted in a row.
*/
#define STNC 16

/* typical number of articles to stream  */
/* must be able to fopen this many articles */
#define STNBUFL (STNBUF/2)

/* number of retries before requeueing to disk */
#define STNRETRY 5

struct stbufs {		/* for each article we are procesing */
	char *st_fname;		/* file name */
	char *st_id;		/* message ID */
	int   st_retry;		/* retry count */
	int   st_age;		/* age count */
	ARTHANDLE *art;		/* arthandle to read article contents */
	int   st_hash;		/* hash value to speed searches */
	long  st_size;		/* article size */
};
static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */
static int stnq;	/* current number of active entries in stbuf */
static long stnofail;	/* Count of consecutive successful sends */

static int TryStream = true;	/* Should attempt stream negotation? */
static int CanStream = false;	/* Result of stream negotation */
static int DoCheck   = true;	/* Should check before takethis? */
static char modestream[] = "mode stream";
static char modeheadfeed[] = "mode headfeed";
static long retries = 0;
static int logRejects = false ;  /* syslog the 437 responses. */



/*
** Syslog formats - collected together so they remain consistent
*/
static char	STAT1[] =
	"%s stats offered %lu accepted %lu refused %lu rejected %lu missing %lu accsize %.0f rejsize %.0f";
static char	STAT2[] = "%s times user %.3f system %.3f elapsed %.3f";
static char	GOT_BADCOMMAND[] = "%s rejected %s %s";
static char	REJECTED[] = "%s rejected %s (%s) %s";
static char	REJ_STREAM[] = "%s rejected (%s) %s";
static char	CANT_CONNECT[] = "%s connect failed %s";
static char	CANT_AUTHENTICATE[] = "%s authenticate failed %s";
static char	IHAVE_FAIL[] = "%s ihave failed %s";

static char	CANT_FINDIT[] = "%s can't find %s";
static char	CANT_PARSEIT[] = "%s can't parse ID %s";
static char	UNEXPECTED[] = "%s unexpected response code %s";

/*
**  Global variables.
*/
static bool		AlwaysRewrite;
static bool		Debug;
static bool		DoRequeue = true;
static bool		Purging;
static bool		STATprint;
static bool		HeadersFeed;
static char		*BATCHname;
static char		*BATCHtemp;
static char		*REMhost;
static double		STATbegin;
static double		STATend;
static FILE		*BATCHfp;
static int		FromServer;
static int		ToServer;
static struct history	*History;
static QIOSTATE		*BATCHqp;
static sig_atomic_t	GotAlarm;
static sig_atomic_t	GotInterrupt;
static sig_atomic_t	JMPyes;
static jmp_buf		JMPwhere;
static char		*REMbuffer;
static char		*REMbuffptr;
static char		*REMbuffend;
static unsigned long	STATaccepted;
static unsigned long	STAToffered;
static unsigned long	STATrefused;
static unsigned long	STATrejected;
static unsigned long	STATmissing;
static double		STATacceptedsize;
static double		STATrejectedsize;


/* Prototypes. */
static ARTHANDLE *article_open(const char *path, const char *id);
static void article_free(ARTHANDLE *);


/*
**  Return true if the history file has the article expired.
*/
static bool
Expired(char *MessageID) {
    return !HISlookup(History, MessageID, NULL, NULL, NULL, NULL);
}


/*
**  Flush and reset the site's output buffer.  Return false on error.
*/
static bool
REMflush(void)
{
    int		i;

    if (REMbuffptr == REMbuffer) return true; /* nothing buffered */
    i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
    REMbuffptr = REMbuffer;
    return i < 0 ? false : true;
}

/*
**  Return index to entry matching this message ID.  Else return -1.
**  The hash is to speed up the search.
**  the protocol.
*/
static int
stindex(char *MessageID, int hash) {
    int i;

    for (i = 0; i < STNBUF; i++) { /* linear search for ID */
	if ((stbuf[i].st_id) && (stbuf[i].st_id[0])
	 && (stbuf[i].st_hash == hash)) {
	    int n;

	    if (strcasecmp(MessageID, stbuf[i].st_id)) continue;

	    /* left of '@' is case sensitive */
	    for (n = 0; (MessageID[n] != '@') && (MessageID[n] != '\0'); n++) ;
	    if (strncmp(MessageID, stbuf[i].st_id, n)) continue;
	    else break;	/* found a match */
	}
    }
    if (i >= STNBUF) i = -1;  /* no match found ? */
    return (i);
}

/* stidhash(): calculate a hash value for message IDs to speed comparisons */
static int
stidhash(char *MessageID) {
    char	*p;
    int		hash;

    hash = 0;
    for (p = MessageID + 1; *p && (*p != '>'); p++) {
	hash <<= 1;
	if (isascii((int)*p) && isupper((int)*p)) {
	    hash += tolower(*p);
	} else {
	    hash += *p;
	}
    }
    return hash;
}

/* stalloc(): save path, ID, and qp into one of the streaming mode entries */
static int
stalloc(char *Article, char *MessageID, ARTHANDLE *art, int hash) {
    int i;

    for (i = 0; i < STNBUF; i++) {
	if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break;
    }
    if (i >= STNBUF) { /* stnq says not full but can not find unused */
	syslog(L_ERROR, "stalloc: Internal error");
	return (-1);
    }
    if ((int)strlen(Article) >= SPOOLNAMEBUFF) {
	syslog(L_ERROR, "stalloc: filename longer than %d", SPOOLNAMEBUFF);
	return (-1);
    }
    /* allocate buffers on first use.
    ** If filename ever is longer than SPOOLNAMEBUFF then code will abort.
    ** If ID is ever longer than NNTP_STRLEN then other code would break.
    */
    if (!stbuf[i].st_fname)
        stbuf[i].st_fname = xmalloc(SPOOLNAMEBUFF);
    if (!stbuf[i].st_id)
        stbuf[i].st_id = xmalloc(NNTP_STRLEN);
    strlcpy(stbuf[i].st_fname, Article, SPOOLNAMEBUFF);
    strlcpy(stbuf[i].st_id, MessageID, NNTP_STRLEN);
    stbuf[i].art = art;
    stbuf[i].st_hash = hash;
    stbuf[i].st_retry = 0;
    stbuf[i].st_age = 0;
    stnq++;
    return i;
}

/* strel(): release for reuse one of the streaming mode entries */
static void
strel(int i) {
	if (stbuf[i].art) {
            article_free(stbuf[i].art);
	    stbuf[i].art = NULL;
	}
	if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0';
	if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0';
	stnq--;
}

/*
**  Send a line to the server, adding the dot escape and \r\n.
*/
static bool
REMwrite(char *p, int i, bool escdot) {
    int	size;

    /* Buffer too full? */
    if (REMbuffend - REMbuffptr < i + 3) {
	if (!REMflush())
	    return false;
	if (REMbuffend - REMbuffer < i + 3) {
	    /* Line too long -- grow buffer. */
	    size = i * 2;
            REMbuffer = xrealloc(REMbuffer, size);
	    REMbuffend = &REMbuffer[size];
	}
    }

    /* Dot escape, text of the line, line terminator. */
    if (escdot && (*p == '.'))
	*REMbuffptr++ = '.';
    memcpy(REMbuffptr, p, i);
    REMbuffptr += i;
    *REMbuffptr++ = '\r';
    *REMbuffptr++ = '\n';

    return true;
}


/*
**  Print transfer statistics, clean up, and exit.
*/
static void
ExitWithStats(int x)
{
    static char		QUIT[] = "quit";
    double		usertime;
    double		systime;

    if (!Purging) {
	REMwrite(QUIT, strlen(QUIT), false);
	REMflush();
    }
    STATend = TMRnow_double();
    if (GetResourceUsage(&usertime, &systime) < 0) {
	usertime = 0;
	systime = 0;
    }

    if (STATprint) {
	printf(STAT1, REMhost, STAToffered, STATaccepted, STATrefused,
		STATrejected, STATmissing, STATacceptedsize, STATrejectedsize);
	printf("\n");
	printf(STAT2, REMhost, usertime, systime, STATend - STATbegin);
	printf("\n");
    }

    syslog(L_NOTICE, STAT1, REMhost, STAToffered, STATaccepted, STATrefused,
		STATrejected, STATmissing, STATacceptedsize, STATrejectedsize);
    syslog(L_NOTICE, STAT2, REMhost, usertime, systime, STATend - STATbegin);
    if (retries)
	syslog(L_NOTICE, "%s %lu Streaming retries", REMhost, retries);

    if (BATCHfp != NULL && unlink(BATCHtemp) < 0 && errno != ENOENT)
        syswarn("cannot remove %s", BATCHtemp);
    sleep(1);
    SMshutdown();
    HISclose(History);
    exit(x);
    /* NOTREACHED */
}


/*
**  Close the batchfile and the temporary file, and rename the temporary
**  to be the batchfile.
*/
static void
CloseAndRename(void)
{
    /* Close the files, rename the temporary. */
    if (BATCHqp) {
	QIOclose(BATCHqp);
	BATCHqp = NULL;
    }
    if (ferror(BATCHfp)
     || fflush(BATCHfp) == EOF
     || fclose(BATCHfp) == EOF) {
	unlink(BATCHtemp);
        syswarn("cannot close %s", BATCHtemp);
	ExitWithStats(1);
    }
    if (rename(BATCHtemp, BATCHname) < 0) {
        syswarn("cannot rename %s", BATCHtemp);
	ExitWithStats(1);
    }
}


/*
**  Requeue an article, opening the temp file if we have to.  If we get
**  a file write error, exit so that the original input is left alone.
*/
static void
Requeue(const char *Article, const char *MessageID)
{
    int fd;

    /* Temp file already open? */
    if (BATCHfp == NULL) {
        fd = mkstemp(BATCHtemp);
        if (fd < 0) {
            syswarn("cannot create a temporary file");
            ExitWithStats(1);
        }
        BATCHfp = fdopen(fd, "w");
        if (BATCHfp == NULL) {
            syswarn("cannot open %s", BATCHtemp);
            ExitWithStats(1);
        }
    }

    /* Called only to get the file open? */
    if (Article == NULL)
	return;

    if (MessageID != NULL)
	fprintf(BATCHfp, "%s %s\n", Article, MessageID);
    else
	fprintf(BATCHfp, "%s\n", Article);
    if (fflush(BATCHfp) == EOF || ferror(BATCHfp)) {
        syswarn("cannot requeue %s", Article);
	ExitWithStats(1);
    }
}


/*
**  Requeue an article then copy the rest of the batch file out.
*/
static void
RequeueRestAndExit(char *Article, char *MessageID) {
    char	*p;

    if (!AlwaysRewrite
     && STATaccepted == 0 && STATrejected == 0 && STATrefused == 0
     && STATmissing == 0) {
        warn("nothing sent -- leaving batchfile alone");
	ExitWithStats(1);
    }

    warn("rewriting batch file and exiting");
    if (CanStream) {	/* streaming mode has a buffer of articles */
	int i;

	for (i = 0; i < STNBUF; i++) {    /* requeue unacknowledged articles */
	    if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
		if (Debug)
		    fprintf(stderr, "stbuf[%d]= %s, %s\n",
			    i, stbuf[i].st_fname, stbuf[i].st_id);
		Requeue(stbuf[i].st_fname, stbuf[i].st_id);
		if (Article == stbuf[i].st_fname) Article = NULL;
		strel(i); /* release entry */
	    }
	}
    }
    Requeue(Article, MessageID);

    for ( ; BATCHqp; ) {
	if ((p = QIOread(BATCHqp)) == NULL) {
	    if (QIOtoolong(BATCHqp)) {
                warn("skipping long line in %s", BATCHname);
		QIOread(BATCHqp);
		continue;
	    }
	    if (QIOerror(BATCHqp)) {
                syswarn("cannot read %s", BATCHname);
		ExitWithStats(1);
	    }

	    /* Normal EOF. */
	    break;
	}

	if (fprintf(BATCHfp, "%s\n", p) == EOF
	 || ferror(BATCHfp)) {
            syswarn("cannot requeue %s", p);
	    ExitWithStats(1);
	}
    }

    CloseAndRename();
    ExitWithStats(1);
}


/*
**  Clean up the NNTP escapes from a line.
*/
static char *
REMclean(char *buff) {
    char	*p;

    if ((p = strchr(buff, '\r')) != NULL)
	*p = '\0';
    if ((p = strchr(buff, '\n')) != NULL)
	*p = '\0';

    /* The dot-escape is only in text, not command responses. */
    return buff;
}


/*
**  Read a line of input, with timeout.  Also handle \r\n-->\n mapping
**  and the dot escape.  Return true if okay, *or we got interrupted.*
*/
static bool
REMread(char *start, int size) {
    static int		count;
    static char		buffer[BUFSIZ];
    static char		*bp;
    char		*p;
    char		*q;
    char		*end;
    struct timeval	t;
    fd_set		rmask;
    int			i;
    char		c;

    if (!REMflush())
	return false;

    for (p = start, end = &start[size - 1]; ; ) {
	if (count == 0) {
	    /* Fill the buffer. */
    Again:
	    FD_ZERO(&rmask);
	    FD_SET(FromServer, &rmask);
	    t.tv_sec = 10 * 60;
	    t.tv_usec = 0;
	    i = select(FromServer + 1, &rmask, NULL, NULL, &t);
	    if (GotInterrupt)
		return true;
	    if (i < 0) {
		if (errno == EINTR)
		    goto Again;
		return false;
	    }
	    if (i == 0 || !FD_ISSET(FromServer, &rmask))
		return false;
	    count = read(FromServer, buffer, sizeof buffer);
	    if (GotInterrupt)
		return true;
	    if (count <= 0)
		return false;
	    bp = buffer;
	}

	/* Process next character. */
	count--;
	c = *bp++;
	if (c == '\n')
	    break;
	if (p < end)
	    *p++ = c;
    }

    /* We know we got \n; if previous char was \r, turn it into \n. */
    if (p > start && p < end && p[-1] == '\r')
	p[-1] = '\n';
    *p = '\0';

    /* Handle the dot escape. */
    if (*p == '.') {
	if (p[1] == '\n' && p[2] == '\0')
	    /* EOF. */
	    return false;
	for (q = &start[1]; (*p++ = *q++) != '\0'; )
	    continue;
    }
    return true;
}


/*
**  Handle the interrupt.
*/
static void
Interrupted(char *Article, char *MessageID) {
    warn("interrupted");
    RequeueRestAndExit(Article, MessageID);
}


/*
**  Returns the length of the headers.
*/
static int
HeadersLen(ARTHANDLE *art, int *iscmsg) {
    const char	*p;
    char	lastchar = -1;

    /* from nnrpd/article.c ARTsendmmap() */
    for (p = art->data; p < (art->data + art->len); p++) {
	if (*p == '\r')
	    continue;
	if (*p == '\n') {
	    if (lastchar == '\n') {
		if (*(p-1) == '\r')
		    p--;
		break;
	    }
	    if (*(p + 1) == 'C' && strncasecmp(p + 1, "Control: ", 9) == 0)
		*iscmsg = 1;
	}
	lastchar = *p;
    }
    return (p - art->data);
}


/*
**  Send a whole article to the server.
*/
static bool
REMsendarticle(char *Article, char *MessageID, ARTHANDLE *art) {
    char	buff[NNTP_STRLEN];

    if (!REMflush())
	return false;
    if (HeadersFeed) {
	struct iovec vec[3];
	char buf[20];
	int iscmsg = 0;
	int len = HeadersLen(art, &iscmsg);

	vec[0].iov_base = (char *) art->data;
	vec[0].iov_len = len;
	/* Add 14 bytes, which maybe will be the length of the Bytes header */
	snprintf(buf, sizeof(buf), "Bytes: %lu\r\n",
                 (unsigned long) art->len + 14);
	vec[1].iov_base = buf;
	vec[1].iov_len = strlen(buf);
	if (iscmsg) {
	    vec[2].iov_base = (char *) art->data + len;
	    vec[2].iov_len = art->len - len;
	} else {
	    vec[2].iov_base = (char *) "\r\n.\r\n";
	    vec[2].iov_len = 5;
	}
	if (xwritev(ToServer, vec, 3) < 0)
	    return false;
    } else
	if (xwrite(ToServer, art->data, art->len) < 0)
	    return false;
    if (GotInterrupt)
	Interrupted(Article, MessageID);
    if (Debug) {
	fprintf(stderr, "> [ article %lu ]\n", (unsigned long) art->len);
	fprintf(stderr, "> .\n");
    }

    if (CanStream) return true;	/* streaming mode does not wait for ACK */

    /* What did the remote site say? */
    if (!REMread(buff, (int)sizeof buff)) {
        syswarn("no reply after sending %s", Article);
	return false;
    }
    if (GotInterrupt)
	Interrupted(Article, MessageID);
    if (Debug)
	fprintf(stderr, "< %s", buff);

    /* Parse the reply. */
    switch (atoi(buff)) {
    default:
        warn("unknown reply after %s -- %s", Article, buff);
	if (DoRequeue)
	    Requeue(Article, MessageID);
	break;
    case NNTP_BAD_COMMAND_VAL:
    case NNTP_SYNTAX_VAL:
    case NNTP_ACCESS_VAL:
	/* The receiving server is likely confused...no point in continuing */
        syslog(L_FATAL, GOT_BADCOMMAND, REMhost, MessageID, REMclean(buff));
        RequeueRestAndExit(Article, MessageID);
        /* NOTREACHED */
    case NNTP_RESENDIT_VAL:
    case NNTP_GOODBYE_VAL:
	Requeue(Article, MessageID);
	break;
    case NNTP_TOOKIT_VAL:
	STATaccepted++;
	STATacceptedsize += (double)art->len;
	break;
    case NNTP_REJECTIT_VAL:
        if (logRejects)
            syslog(L_NOTICE, REJECTED, REMhost,
                   MessageID, Article, REMclean(buff));
	STATrejected++;
	STATrejectedsize += (double)art->len;
	break;
    }

    /* Article sent, or we requeued it. */
    return true;
}


/*
**  Get the Message-ID header from an open article.
*/
static char *
GetMessageID(ARTHANDLE *art) {
    static char	*buff;
    static int	buffsize = 0;
    const char	*p, *q;

    p = wire_findheader(art->data, art->len, "Message-ID");
    if (p == NULL)
	return NULL;
    for (q = p; q < art->data + art->len; q++) {
        if (*q == '\r' || *q == '\n')
            break;
    }
    if (q == art->data + art->len)
	return NULL;
    if (buffsize < q - p) {
	if (buffsize == 0)
	    buff = xmalloc(q - p + 1);
	else
            buff = xrealloc(buff, q - p + 1);
	buffsize = q - p;
    }
    memcpy(buff, p, q - p);
    buff[q - p] = '\0';
    return buff;
}


/*
**  Mark that we got interrupted.
*/
static RETSIGTYPE
CATCHinterrupt(int s) {
    GotInterrupt = true;

    /* Let two interrupts kill us. */
    xsignal(s, SIG_DFL);
}


/*
**  Mark that the alarm went off.
*/
static RETSIGTYPE
CATCHalarm(int s UNUSED)
{
    GotAlarm = true;
    if (JMPyes)
	longjmp(JMPwhere, 1);
}

/* check articles in streaming NNTP mode
** return true on failure.
*/
static bool
check(int i) {
    char	buff[NNTP_STRLEN];

    /* send "check <ID>" to the other system */
    snprintf(buff, sizeof(buff), "check %s", stbuf[i].st_id);
    if (!REMwrite(buff, (int)strlen(buff), false)) {
        syswarn("cannot check article");
	return true;
    }
    STAToffered++;
    if (Debug) {
	if (stbuf[i].st_retry)
	    fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry);
	else
	    fprintf(stderr, "> %s\n", buff);
    }
    if (GotInterrupt)
	Interrupted(stbuf[i].st_fname, stbuf[i].st_id);

    /* That all.  Response is checked later by strlisten() */
    return false;
}

/* Send article in "takethis <id> streaming NNTP mode.
** return true on failure.
*/
static bool
takethis(int i) {
    char	buff[NNTP_STRLEN];

    if (!stbuf[i].art) {
        warn("internal error: null article for %s in takethis",
             stbuf[i].st_fname);
        return true;
    }
    /* send "takethis <ID>" to the other system */
    snprintf(buff, sizeof(buff), "takethis %s", stbuf[i].st_id);
    if (!REMwrite(buff, (int)strlen(buff), false)) {
        syswarn("cannot send takethis");
        return true;
    }
    if (Debug)
        fprintf(stderr, "> %s\n", buff);
    if (GotInterrupt)
        Interrupted((char *)0, (char *)0);
    if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id, stbuf[i].art))
        return true;
    stbuf[i].st_size = stbuf[i].art->len;
    article_free(stbuf[i].art); /* should not need file again */
    stbuf[i].art = 0;		/* so close to free descriptor */
    stbuf[i].st_age = 0;
    /* That all.  Response is checked later by strlisten() */
    return false;
}


/* listen for responses.  Process acknowledgments to remove items from
** the queue.  Also sends the articles on request.  Returns true on error.
** return true on failure.
*/
static bool
strlisten(void)
{
    int		resp;
    int		i;
    char	*id, *p;
    char	buff[NNTP_STRLEN];
    int		hash;

    while(true) {
	if (!REMread(buff, (int)sizeof buff)) {
            syswarn("no reply to check");
	    return true;
	}
	if (GotInterrupt)
	    Interrupted((char *)0, (char *)0);
	if (Debug)
	    fprintf(stderr, "< %s", buff);

	/* Parse the reply. */
	resp =  atoi(buff);
	/* Skip the 1XX informational messages */
	if ((resp >= 100) && (resp < 200)) continue;
	switch (resp) { /* first time is to verify it */
	case NNTP_ERR_GOTID_VAL:
	case NNTP_OK_SENDID_VAL:
	case NNTP_OK_RECID_VAL:
	case NNTP_ERR_FAILID_VAL:
	case NNTP_RESENDID_VAL:
	    if ((id = strchr(buff, '<')) != NULL) {
		p = strchr(id, '>');
		if (p) *(p+1) = '\0';
		hash = stidhash(id);
		i = stindex(id, hash);	/* find table entry */
		if (i < 0) { /* should not happen */
		    syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff));
		    return (true); /* can't find it! */
		}
	    } else {
		syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff));
		return (true);
	    }
	    break;
	case NNTP_GOODBYE_VAL:
	    /* Most likely out of space -- no point in continuing. */
	    syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
	    return true;
	    /* NOTREACHED */
	default:
	    syslog(L_NOTICE, UNEXPECTED, REMhost, REMclean(buff));
	    if (Debug)
		fprintf(stderr, "Unknown reply \"%s\"",
						    buff);
	    return (true);
	}
	switch (resp) { /* now we take some action */
	case NNTP_RESENDID_VAL:	/* remote wants it later */
	    /* try again now because time has passed */
	    if (stbuf[i].st_retry < STNRETRY) {
		if (check(i)) return true;
		stbuf[i].st_retry++;
		stbuf[i].st_age = 0;
	    } else { /* requeue to disk for later */
		Requeue(stbuf[i].st_fname, stbuf[i].st_id);
		strel(i); /* release entry */
	    }
	    break;
	case NNTP_ERR_GOTID_VAL:	/* remote doesn't want it */
	    strel(i); /* release entry */
	    STATrefused++;
	    stnofail = 0;
	    break;
		
	case NNTP_OK_SENDID_VAL:	/* remote wants article */
	    if (takethis(i)) return true;
	    stnofail++;
	    break;

	case NNTP_OK_RECID_VAL:	/* remote received it OK */
	    STATacceptedsize += (double) stbuf[i].st_size;
	    strel(i); /* release entry */
	    STATaccepted++;
	    break;
		
	case NNTP_ERR_FAILID_VAL:
	    STATrejectedsize += (double) stbuf[i].st_size;
	    if (logRejects)
		syslog(L_NOTICE, REJ_STREAM, REMhost,
		    stbuf[i].st_fname, REMclean(buff));
/* XXXXX Caution THERE BE DRAGONS, I don't think this logs properly
   The message ID is returned in the peer response... so this is redundant
		    stbuf[i].st_id, stbuf[i].st_fname, REMclean(buff)); */
	    strel(i); /* release entry */
	    STATrejected++;
	    stnofail = 0;
	    break;
	}
	break;
    }
    return (false);
}

/*
**  Print a usage message and exit.
*/
static void
Usage(void)
{
    die("Usage: innxmit [-acdHlprs] [-t#] [-T#] host file");
}


/*
**  Open an article.  If the argument is a token, retrieve the article via
**  the storage API.  Otherwise, open the file and fake up an ARTHANDLE for
**  it.  Only fill in those fields that we'll need.  Articles not retrieved
**  via the storage API will have a type of TOKEN_EMPTY.
*/
static ARTHANDLE *
article_open(const char *path, const char *id)
{
    TOKEN token;
    ARTHANDLE *article;
    int fd, length;
    struct stat st;
    char *p;

    if (IsToken(path)) {
        token = TextToToken(path);
        article = SMretrieve(token, RETR_ALL);
        if (article == NULL) {
            if (SMerrno == SMERR_NOENT || SMerrno == SMERR_UNINIT)
                STATmissing++;
            else {
                warn("requeue %s: %s", path, SMerrorstr);
                Requeue(path, id);
            }
        }
        return article;
    } else {
        char *data;
        fd = open(path, O_RDONLY);
        if (fd < 0)
            return NULL;
        if (fstat(fd, &st) < 0) {
            syswarn("requeue %s", path);
            Requeue(path, id);
            return NULL;
        }
        article = xmalloc(sizeof(ARTHANDLE));
        article->type = TOKEN_EMPTY;
        article->len = st.st_size;
        data = xmalloc(article->len);
        if (xread(fd, data, article->len) < 0) {
            syswarn("requeue %s", path);
            free(data);
            free(article);
            close(fd);
            Requeue(path, id);
            return NULL;
        }
        close(fd);
        p = memchr(data, '\n', article->len);
        if (p == NULL || p == data) {
            warn("requeue %s: cannot find headers", path);
            free(data);
            free(article);
            Requeue(path, id);
            return NULL;
        }
        if (p[-1] != '\r') {
            p = ToWireFmt(data, article->len, (size_t *)&length);
            free(data);
            data = p;
            article->len = length;
        }
        article->data = data;
        return article;
    }
}


/*
**  Free an article, using the type field to determine whether to free it
**  via the storage API.
*/
static void
article_free(ARTHANDLE *article)
{
    if (article->type == TOKEN_EMPTY) {
        free((char *)article->data);
        free(article);
    } else
        SMfreearticle(article);
}


int main(int ac, char *av[]) {
    static char		SKIPPING[] = "Skipping \"%s\" --%s?\n";
    int	                i;
    char	        *p;
    ARTHANDLE		*art;
    FILE		*From;
    FILE		*To;
    char		buff[8192+128];
    char		*Article;
    char		*MessageID;
    RETSIGTYPE		(*old)(int) = NULL;
    unsigned int	ConnectTimeout;
    unsigned int	TotalTimeout;
    int                 port = NNTP_PORT;
    bool		val;
    char                *path;

    openlog("innxmit", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
    message_program_name = "innxmit";

    /* Set defaults. */
    if (!innconf_read(NULL))
        exit(1);

    ConnectTimeout = 0;
    TotalTimeout = 0;
    
    umask(NEWSUMASK);

    /* Parse JCL. */
    while ((i = getopt(ac, av, "lacdHprst:T:vP:")) != EOF)
	switch (i) {
	default:
	    Usage();
	    /* NOTREACHED */
	case 'P':
	    port = atoi(optarg);
	    break;
	case 'a':
	    AlwaysRewrite = true;
	    break;
	case 'c':
	    DoCheck = false;
	    break;
	case 'd':
	    Debug = true;
	    break;
	case 'H':
	    HeadersFeed = true;
	    break;
        case 'l':
            logRejects = true ;
            break ;
	case 'p':
	    AlwaysRewrite = true;
	    Purging = true;
	    break;
	case 'r':
	    DoRequeue = false;
	    break;
	case 's':
	    TryStream = false;
	    break;
	case 't':
	    ConnectTimeout = atoi(optarg);
	    break;
	case 'T':
	    TotalTimeout = atoi(optarg);
	    break;
	case 'v':
	    STATprint = true;
	    break;
	}
    ac -= optind;
    av += optind;

    /* Parse arguments; host and filename. */
    if (ac != 2)
	Usage();
    REMhost = av[0];
    BATCHname = av[1];

    if (chdir(innconf->patharticles) < 0)
        sysdie("cannot cd to %s", innconf->patharticles);

    val = true;
    if (!SMsetup(SM_PREOPEN,(void *)&val))
        die("cannot set up the storage manager");
    if (!SMinit())
        die("cannot initialize the storage manager: %s", SMerrorstr);

    /* Open the batch file and lock others out. */
    if (BATCHname[0] != '/') {
        BATCHname = concatpath(innconf->pathoutgoing, av[1]);
    }
    if (((i = open(BATCHname, O_RDWR)) < 0) || ((BATCHqp = QIOfdopen(i)) == NULL)) {
        syswarn("cannot open %s", BATCHname);
	SMshutdown();
	exit(1);
    }
    if (!inn_lock_file(QIOfileno(BATCHqp), INN_LOCK_WRITE, true)) {
#if	defined(EWOULDBLOCK)
	if (errno == EWOULDBLOCK) {
	    SMshutdown();
	    exit(0);
	}
#endif	/* defined(EWOULDBLOCK) */
        syswarn("cannot lock %s", BATCHname);
	SMshutdown();
	exit(1);
    }

    /* Get a temporary name in the same directory as the batch file. */
    p = strrchr(BATCHname, '/');
    *p = '\0';
    BATCHtemp = concatpath(BATCHname, "bchXXXXXX");
    *p = '/';

    /* Set up buffer used by REMwrite. */
    REMbuffer = xmalloc(OUTPUT_BUFFER_SIZE);
    REMbuffend = &REMbuffer[OUTPUT_BUFFER_SIZE];
    REMbuffptr = REMbuffer;

    /* Start timing. */
    STATbegin = TMRnow_double();

    if (!Purging) {
	/* Open a connection to the remote server. */
	if (ConnectTimeout) {
	    GotAlarm = false;
	    old = xsignal(SIGALRM, CATCHalarm);
            if (setjmp(JMPwhere)) {
                warn("cannot connect to %s: timed out", REMhost);
                SMshutdown();
                exit(1);
            }
	    JMPyes = true;
	    alarm(ConnectTimeout);
	}
	if (NNTPconnect(REMhost, port, &From, &To, buff) < 0 || GotAlarm) {
	    i = errno;
            warn("cannot connect to %s: %s", REMhost,
                 buff[0] ? REMclean(buff) : strerror(errno));
	    if (GotAlarm)
		syslog(L_NOTICE, CANT_CONNECT, REMhost, "timeout");
	    else 
		syslog(L_NOTICE, CANT_CONNECT, REMhost,
		    buff[0] ? REMclean(buff) : strerror(i));
	    SMshutdown();
	    exit(1);
	}
	if (Debug)
	    fprintf(stderr, "< %s\n", REMclean(buff));
	if (NNTPsendpassword(REMhost, From, To) < 0 || GotAlarm) {
	    i = errno;
            syswarn("cannot authenticate with %s", REMhost);
	    syslog(L_ERROR, CANT_AUTHENTICATE,
		REMhost, GotAlarm ? "timeout" : strerror(i));
	    /* Don't send quit; we want the remote to print a message. */
	    SMshutdown();
	    exit(1);
	}
	if (ConnectTimeout) {
	    alarm(0);
	    xsignal(SIGALRM, old);
	    JMPyes = false;
	}

	/* We no longer need standard I/O. */
	FromServer = fileno(From);
	ToServer = fileno(To);

	if (TryStream) {
	    if (!REMwrite(modestream, (int)strlen(modestream), false)) {
                syswarn("cannot negotiate %s", modestream);
	    }
	    if (Debug)
		fprintf(stderr, ">%s\n", modestream);
	    /* Does he understand mode stream? */
	    if (!REMread(buff, (int)sizeof buff)) {
                syswarn("no reply to %s", modestream);
	    } else {
		if (Debug)
		    fprintf(stderr, "< %s", buff);

		/* Parse the reply. */
		switch (atoi(buff)) {
		default:
                    warn("unknown reply to %s -- %s", modestream, buff);
		    CanStream = false;
		    break;
		case NNTP_OK_STREAM_VAL:	/* YES! */
		    CanStream = true;
		    break;
                case NNTP_AUTH_NEEDED_VAL: /* authentication refusal */
		case NNTP_BAD_COMMAND_VAL: /* normal refusal */
		    CanStream = false;
		    break;
		}
	    }
	    if (CanStream) {
		for (i = 0; i < STNBUF; i++) { /* reset buffers */
		    stbuf[i].st_fname = 0;
		    stbuf[i].st_id = 0;
		    stbuf[i].art = 0;
		}
		stnq = 0;
	    }
	}
	if (HeadersFeed) {
	    if (!REMwrite(modeheadfeed, strlen(modeheadfeed), false))
                syswarn("cannot negotiate %s", modeheadfeed);
	    if (Debug)
		fprintf(stderr, ">%s\n", modeheadfeed);
	    if (!REMread(buff, sizeof buff)) {
                syswarn("no reply to %s", modeheadfeed);
	    } else {
		if (Debug)
		    fprintf(stderr, "< %s", buff);

		/* Parse the reply. */
		switch (atoi(buff)) {
		case 250:		/* YES! */
		    break;
		case NNTP_BAD_COMMAND_VAL: /* normal refusal */
                    die("%s not allowed -- %s", modeheadfeed, buff);
		default:
                    die("unknown reply to %s -- %s", modeheadfeed, buff);
		}
	    }
	}
    }

    /* Set up signal handlers. */
    xsignal(SIGHUP, CATCHinterrupt);
    xsignal(SIGINT, CATCHinterrupt);
    xsignal(SIGTERM, CATCHinterrupt);
    xsignal(SIGPIPE, SIG_IGN);
    if (TotalTimeout) {
	xsignal(SIGALRM, CATCHalarm);
	alarm(TotalTimeout);
    }

    path = concatpath(innconf->pathdb, _PATH_HISTORY);
    History = HISopen(path, innconf->hismethod, HIS_RDONLY);
    free(path);

    /* Main processing loop. */
    GotInterrupt = false;
    GotAlarm = false;
    for (Article = NULL, MessageID = NULL; ; ) {
	if (GotAlarm) {
            warn("timed out");
	    /* Don't resend the current article. */
	    RequeueRestAndExit((char *)NULL, (char *)NULL);
	}
	if (GotInterrupt)
	    Interrupted(Article, MessageID);

	if ((Article = QIOread(BATCHqp)) == NULL) {
	    if (QIOtoolong(BATCHqp)) {
                warn("skipping long line in %s", BATCHname);
		QIOread(BATCHqp);
		continue;
	    }
	    if (QIOerror(BATCHqp)) {
                syswarn("cannot read %s", BATCHname);
		ExitWithStats(1);
	    }

	    /* Normal EOF -- we're done. */
	    QIOclose(BATCHqp);
	    BATCHqp = NULL;
	    break;
	}

	/* Ignore blank lines. */
	if (*Article == '\0')
	    continue;

	/* Split the line into possibly two fields. */
	if (Article[0] == '/'
	 && Article[strlen(innconf->patharticles)] == '/'
	 && strncmp(Article, innconf->patharticles, strlen(innconf->patharticles)) == 0)
	    Article += strlen(innconf->patharticles) + 1;
	if ((MessageID = strchr(Article, ' ')) != NULL) {
	    *MessageID++ = '\0';
	    if (*MessageID != '<'
		|| (p = strrchr(MessageID, '>')) == NULL
		|| *++p != '\0') {
                warn("ignoring line %s %s...", Article, MessageID);
		continue;
	    }
	}

	if (*Article == '\0') {
	    if (MessageID)
                warn("empty file name for %s in %s", MessageID, BATCHname);
	    else
                warn("empty file name, no message ID in %s", BATCHname);
	    /* We could do a history lookup. */
	    continue;
	}

	if (Purging && MessageID != NULL && !Expired(MessageID)) {
	    Requeue(Article, MessageID);
	    continue;
	}

        /* Drop articles with a message ID longer than NNTP_MSGID_MAXLEN to
           avoid overrunning buffers and throwing the server on the
           receiving end a blow from behind. */
        if (MessageID != NULL && strlen(MessageID) > NNTP_MSGID_MAXLEN) {
            warn("dropping article in %s: long message ID %s", BATCHname,
                 MessageID);
            continue;
        }

        art = article_open(Article, MessageID);
        if (art == NULL)
	    continue;

	if (Purging) {
            article_free(art);
	    Requeue(Article, MessageID);
	    continue;
	}

	/* Get the Message-ID from the article if we need to. */
	if (MessageID == NULL) {
	    if ((MessageID = GetMessageID(art)) == NULL) {
                warn(SKIPPING, Article, "no message ID");
                article_free(art);
		continue;
	    }
	}
	if (GotInterrupt)
	    Interrupted(Article, MessageID);

	/* Offer the article. */
	if (CanStream) {
	    int lim;
	    int hash;

	    hash = stidhash(MessageID);
	    if (stindex(MessageID, hash) >= 0) { /* skip duplicates in queue */
		if (Debug)
		    fprintf(stderr, "Skipping duplicate ID %s\n",
							    MessageID);
                article_free(art);
		continue;
	    }
	    /* This code tries to optimize by sending a burst of "check"
	     * commands before flushing the buffer.  This should result
	     * in several being sent in one packet reducing the network
	     * overhead.
	     */
	    if (DoCheck && (stnofail < STNC)) lim = STNBUF;
	    else                              lim = STNBUFL;
	    if (stnq >= lim) { /* need to empty a buffer */
		while (stnq >= STNBUFL) { /* or several */
		    if (strlisten()) {
			RequeueRestAndExit(Article, MessageID);
		    }
		}
	    }
	    /* save new article in the buffer */
	    i = stalloc(Article, MessageID, art, hash);
	    if (i < 0) {
                article_free(art);
		RequeueRestAndExit(Article, MessageID);
	    }
	    if (DoCheck && (stnofail < STNC)) {
		if (check(i)) {
		    RequeueRestAndExit((char *)NULL, (char *)NULL);
		}
	    } else {
                STAToffered++ ;
		if (takethis(i)) {
		    RequeueRestAndExit((char *)NULL, (char *)NULL);
		}
	    }
	    /* check for need to resend any IDs */
	    for (i = 0; i < STNBUF; i++) {
		if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
		    if (stbuf[i].st_age++ > stnq) {
			/* This should not happen but just in case ... */
			if (stbuf[i].st_retry < STNRETRY) {
			    if (check(i)) /* resend check */
				RequeueRestAndExit((char *)NULL, (char *)NULL);
			    retries++;
			    stbuf[i].st_retry++;
			    stbuf[i].st_age = 0;
			} else { /* requeue to disk for later */
			    Requeue(stbuf[i].st_fname, stbuf[i].st_id);
			    strel(i); /* release entry */
			}
		    }
		}
	    }
	    continue; /* next article */
	}
	snprintf(buff, sizeof(buff), "ihave %s", MessageID);
	if (!REMwrite(buff, (int)strlen(buff), false)) {
            syswarn("cannot offer article");
            article_free(art);
	    RequeueRestAndExit(Article, MessageID);
	}
	STAToffered++;
	if (Debug)
	    fprintf(stderr, "> %s\n", buff);
	if (GotInterrupt)
	    Interrupted(Article, MessageID);

	/* Does he want it? */
	if (!REMread(buff, (int)sizeof buff)) {
            syswarn("no reply to ihave");
            article_free(art);
	    RequeueRestAndExit(Article, MessageID);
	}
	if (GotInterrupt)
	    Interrupted(Article, MessageID);
	if (Debug)
	    fprintf(stderr, "< %s", buff);

	/* Parse the reply. */
	switch (atoi(buff)) {
	default:
            warn("unknown reply to %s -- %s", Article, buff);
	    if (DoRequeue)
		Requeue(Article, MessageID);
	    break;
        case NNTP_BAD_COMMAND_VAL:
        case NNTP_SYNTAX_VAL:
        case NNTP_ACCESS_VAL:
            /* The receiving server is likely confused...no point in continuing */
            syslog(L_FATAL, GOT_BADCOMMAND, REMhost, MessageID, REMclean(buff));
	    RequeueRestAndExit(Article, MessageID);
	    /* NOTREACHED */
        case NNTP_AUTH_NEEDED_VAL:
	case NNTP_RESENDIT_VAL:
	case NNTP_GOODBYE_VAL:
	    /* Most likely out of space -- no point in continuing. */
	    syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
	    RequeueRestAndExit(Article, MessageID);
	    /* NOTREACHED */
	case NNTP_SENDIT_VAL:
	    if (!REMsendarticle(Article, MessageID, art))
		RequeueRestAndExit(Article, MessageID);
	    break;
	case NNTP_HAVEIT_VAL:
	    STATrefused++;
	    break;
#if	defined(NNTP_SENDIT_LATER)
	case NNTP_SENDIT_LATER_VAL:
	    Requeue(Article, MessageID);
	    break;
#endif	/* defined(NNTP_SENDIT_LATER) */
	}

        article_free(art);
    }
    if (CanStream) { /* need to wait for rest of ACKs */
	while (stnq > 0) {
	    if (strlisten()) {
		RequeueRestAndExit((char *)NULL, (char *)NULL);
	    }
	}
    }

    if (BATCHfp != NULL)
	/* We requeued something, so close the temp file. */
	CloseAndRename();
    else if (unlink(BATCHname) < 0 && errno != ENOENT)
        syswarn("cannot remove %s", BATCHtemp);
    ExitWithStats(0);
    /* NOTREACHED */
    return 0;
}


syntax highlighted by Code2HTML, v. 0.9.1