/* relrecv.c */

/** @file relrecv.c
 *  @date May 4, 2009
 */

#ifndef CONFIG_RELRECV
#error This is used for relrecv only.
#endif /* CONFIG_RELRECV */

#include "common.h"
#include "config.h"
#include "crc.h"
#include "protocol.h"
#include "udp.h"

/* Standard headers used directly by this file.  They follow the project
 * headers in case those define feature-test macros that must come first.
 */
#include <stdlib.h>
#include <string.h>

/* ------------------------------------------------------------------------ */
/** A general purpose buffer shared by every send and receive in this file.
 *  Incoming datagrams are received into it and outgoing messages are
 *  serialized into it, so its contents do not survive a send.
 */
static msg_buffer_t g_buffer;

/** The index of the next DATA packet expected to arrive in order.  A DATA
 *  packet with a higher index triggers a RETRY for the gap.
 */
static u_long g_expected;

/** The file pointer of the transfer in progress; reset to NULL whenever the
 *  file is closed.
 */
static FILE * g_file;

/** The INIT message that opened the current transfer. */
static msg_init_t g_init;

/** An array, indexed by packet number, that records the number of times each
 *  DATA message has arrived.  Allocated for the duration of one transfer.
 */
static size_t * g_packets;

/** The client information: the sender of the INIT message being served. */
static peer_t g_peer;

/** The number of DATA packets requested again through RETRY messages during
 *  the current transfer.
 */
static size_t g_retries;

/** The START message; its id is incremented for every new transfer. */
static msg_start_t g_start;

/* ------------------------------------------------------------------------ */
/* Forward declarations of the file-local helpers, in alphabetical order.
 * Each one returns EXIT_SUCCESS or EXIT_FAILURE.
 */
static int parse_args(int argc, const char * argv[], u_short * port);
static int poll_buffer(int * timeout_flag);
static int process_file(void);
static int process_socket(void);
static int receive_all_data(void);
static int receive_all_data_helper(void);
static int receive_data(u_long packets, int * received_flag);
static int send_abort(const peer_t * peer, u_long id, const char * reason);
static int send_buffer(const peer_t * peer, const char * msg_type);
static int send_retries(void);

/* ------------------------------------------------------------------------ */
/** Program entry point: reads the port from the command line, opens the UDP
 *  server socket, serves transfers, and closes the socket again.
 *
 *  @param argc The number of arguments.
 *  @param argv The arguments.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
int main(int argc, const char * argv[])
{
    u_short listen_port;

    /* The only argument is the port to listen on. */
    RETURN_IF_FAILURE(parse_args(argc, argv, &listen_port));

    /* Open the server socket on that port. */
    if (udp_start_server(listen_port) != EXIT_SUCCESS) {
        fprintf(stderr, "Cannot bind to port %hu.\n", listen_port);
        RETURN_FAILURE();
    }

    /* Serve transfers; on failure the socket is still closed, but the
     * result of closing it is not reported.
     */
    if (process_socket() != EXIT_SUCCESS) {
        udp_stop();
        RETURN_FAILURE();
    }

    /* Normal shutdown: report a socket that cannot be closed. */
    if (udp_stop() != EXIT_SUCCESS) {
        fprintf(stderr, "Error closing socket.\n");
        RETURN_FAILURE();
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Parses the command line arguments.
 *
 *  Exactly one argument is accepted: a decimal port number in the range
 *  0-65535, made of digits only.  Text with trailing characters ("80abc"),
 *  a sign, or a value that does not fit in 16 bits is rejected.
 *
 *  @param argc The number of arguments.
 *  @param argv The arguments.
 *  @param port The returned port number; written only on success.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int parse_args(int argc, const char * argv[], u_short * port)
{
    static const unsigned long max_port = 65535UL;

    const char *  text;
    char *        end;
    unsigned long value;
    int           valid;

    assert(NULL != argv);
    assert(NULL != port);

    /* Check the number of arguments. */
    if (argc != 2) {
        fprintf(stderr, "usage: relrecv port\n");
        RETURN_FAILURE();
    }

    /* Parse the port number.  The first character must be a digit because
     * strtoul() itself would accept leading blanks and a sign (and would
     * silently wrap a negative number).  An out-of-range number makes
     * strtoul() return ULONG_MAX, which the range check rejects as well.
     */
    text = argv[1];
    end = NULL;
    value = 0;
    valid = (text[0] >= '0' && text[0] <= '9');
    if (valid) {
        value = strtoul(text, &end, 10);
        valid = (*end == '\0' && value <= max_port);
    }

    if (!valid) {
        fprintf(stderr, "Invalid port number '%s'.\n", argv[1]);
        RETURN_FAILURE();
    }

    *port = (u_short)value;
    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Waits for a message from the current peer (g_peer) or a timeout.
 *
 *  Datagrams from any other peer are answered with an ABORT and do not end
 *  the wait.  On a successful return without timeout, the received message
 *  is in g_buffer.
 *
 *  @param timeout_flag Cleared on entry; the loop ends with a timeout when
 *                      udp_wait() leaves it set to one.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int poll_buffer(int * timeout_flag)
{
    double deadline;

    assert(NULL != timeout_flag);

    /* Callers pass an uninitialized flag; give it a defined value in case
     * udp_wait() only writes it when a timeout actually occurs.
     */
    *timeout_flag = 0;

    /* Calculate the absolute time for the timeout in units of clock_t.
     * NOTE(review): ISO C defines clock() as processor time, which may not
     * advance while the process is blocked in udp_wait() -- confirm that
     * this is the intended clock on the target platform.
     */
    deadline = (double)clock() + ((CLOCKS_PER_SEC / 1000.0) * RELRECV_TIMEOUT);

    /* Loop until failure, timeout, or received buffer. */
    for (;;) {
        peer_t peer;
        double remaining;
        size_t ms;

        /* Calculate the remaining time in milliseconds.  Once the deadline
         * has passed the difference is negative, and converting a negative
         * double to size_t is undefined, so clamp it to zero.
         */
        remaining = (deadline - (double)clock()) * 1000.0 / CLOCKS_PER_SEC;
        ms = (remaining > 0.0) ? (size_t)remaining : 0;
        if (EXIT_SUCCESS != udp_wait(ms, timeout_flag)) {
            fprintf(stderr, "Socket failure.\n");
            RETURN_FAILURE();
        }

        /* Return successfully if there is a timeout. */
        if (*timeout_flag == 1)
            break;

        /* Put the received data into g_buffer. */
        if (EXIT_SUCCESS != udp_recv(
            g_buffer.data,
            sizeof(g_buffer.data),
            &g_buffer.size,
            &peer)) {
            fprintf(stderr, "Socket failure.\n");
            RETURN_FAILURE();
        }

        /* Return successfully only if it's the right peer. */
        if (peer.address == g_peer.address && peer.port == g_peer.port)
            break;

        /* Send an ABORT to the unexpected peer.  This reuses g_buffer, which
         * is fine because the next iteration receives into it again.
         */
        RETURN_IF_FAILURE(send_abort(&peer, 0, "Receiver is busy."));
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Runs one complete transfer while the destination file (g_file) is open:
 *  announces the session with START, collects every DATA packet, verifies
 *  the file checksum against the INIT message, and confirms with DONE.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int process_file(void)
{
    u_long       computed_crc;
    msg_done_t   done_msg;
    const char * failure;

    /* Announce the session on the console. */
    printf(
        "Transferring file 'r%s' (%lu bytes).\n",
        g_init.name,
        g_init.filesize);

    /* Start timing and reset the retry statistics. */
    stopwatch_start();
    g_retries = 0;

    /* Every transfer gets a fresh id, carried by the START message. */
    g_start.type = MSG_TYPE_START;
    g_start.id += 1;
    msg_start_write(&g_buffer, &g_start);
    RETURN_IF_FAILURE(send_buffer(&g_peer, "START"));

    /* Collect the whole file. */
    RETURN_IF_FAILURE(receive_all_data());

    /* Compute the checksum of what was written and compare it with the one
     * announced in the INIT message; either problem aborts the transfer.
     */
    failure = NULL;
    if (crc_file(g_file, &computed_crc) != EXIT_SUCCESS)
        failure = "Error computing checksum.";
    else if (computed_crc != g_init.checksum)
        failure = "Checksum failure.";

    if (failure != NULL) {
        RETURN_IF_FAILURE(send_abort(&g_peer, g_start.id, failure));
        RETURN_FAILURE();
    }

    /* Tell the sender that the file arrived intact. */
    done_msg.type = MSG_TYPE_DONE;
    done_msg.id = g_start.id;
    msg_done_write(&g_buffer, &done_msg);
    RETURN_IF_FAILURE(send_buffer(&g_peer, "DONE"));

    /* Report the result and the transfer statistics. */
    printf("Transfer complete: 'r%s'.\n\n", g_init.name);
    stopwatch_stop(
        g_init.filesize,
        g_retries,
        protocol_packet_count(&g_init));

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Performs all functionality while the socket is open.
 *
 *  Loops forever: waits for an INIT message, creates the file "r<name>"
 *  (refusing to overwrite an existing one), runs the transfer, and closes
 *  the file.  A failed transfer deletes the partial file and the loop goes
 *  back to waiting.  The loop has no normal exit, so this function only
 *  returns through the RETURN_* macros, on a socket or send failure.
 *
 *  @return EXIT_FAILURE; EXIT_SUCCESS is never returned.
 */
static int process_socket(void)
{
    /* The main loop waits for new connections. */
    for (;;) {
        char path[NAME_SIZE + 1];

        /* Wait for a message to arrive; its sender becomes g_peer. */
        if (EXIT_SUCCESS != udp_recv(
            g_buffer.data,
            sizeof(g_buffer.data),
            &g_buffer.size,
            &g_peer)) {
            fprintf(stderr, "Socket failure.\n");
            RETURN_FAILURE();
        }

        /* This should be a INIT message. */
        if (EXIT_SUCCESS != msg_init_read(&g_init, &g_buffer)) {
            RETURN_IF_FAILURE(send_abort(
                &g_peer,
                0,
                "Invalid INIT message."));
            continue;
        }

        /* Check if the file already exists.
         * NOTE(review): the name comes from the network.  This assumes that
         * msg_init_read() guarantees a terminated name shorter than
         * NAME_SIZE and free of path separators -- confirm.
         */
        sprintf(path, "r%s", g_init.name);
        if (NULL != (g_file = fopen(path, "rb"))) {
            fclose(g_file);
            g_file = NULL;
            RETURN_IF_FAILURE(send_abort(&g_peer, 0, "File already exists."));
            continue;
        }

        /* Create the file for writing and reading.  On failure g_file is
         * NULL and there is nothing to close; passing NULL to fclose() is
         * undefined behavior.
         */
        g_file = fopen(path, "w+b");
        if (NULL == g_file) {
            RETURN_IF_FAILURE(send_abort(&g_peer, 0, "Cannot create file."));
            continue;
        }

        /* Receive the file now that the file is open. */
        if (EXIT_SUCCESS != process_file()) {
            fclose(g_file);
            g_file = NULL;

            /* Delete the file if the transfer failed. */
            remove(path);
            continue;
        }

        /* Close the file and wait for another connection. */
        if (0 != fclose(g_file))
            fprintf(stderr, "Warning: Cannot close file.\n");
        g_file = NULL;
    }
}

/* ------------------------------------------------------------------------ */
/** Receives all DATA messages.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int receive_all_data(void)
{
    /* Allocate and initialize the array of packet counts. */
    g_packets = (size_t *)calloc(
        protocol_packet_count(&g_init),
        sizeof(size_t));

    /* Check for memory failures. */
    if (NULL == g_packets) {
        RETURN_IF_FAILURE(send_abort(&g_peer, g_start.id, "Out of memory."));
        RETURN_FAILURE();
    }

    /* Receive all the DATA messages. */
    if (EXIT_SUCCESS != receive_all_data_helper()) {
        free(g_packets);
        g_packets = NULL;
        RETURN_FAILURE();
    }

    /* Free the array of packet counts before returning. */
    free(g_packets);
    g_packets = NULL;
    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Receives all DATA messages; the worker behind receive_all_data().
 *
 *  Expects g_packets to be allocated.  Polls for messages until every
 *  packet has been received once, requesting missing packets again after
 *  each timeout, and aborts the transfer after too many timeouts or on any
 *  message that is not a DATA message.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int receive_all_data_helper(void)
{
    u_long packets;
    u_long remaining;
    size_t timeouts;

    /* Determine the total number of packets that will be sent. */
    packets = protocol_packet_count(&g_init);
    remaining = packets;

    /* The first expected packet. */
    g_expected = 0;

    /* With no packets there is nothing to wait for.  Returning here also
     * keeps log10() away from zero: its result would be -infinity, and
     * converting that to size_t is undefined.
     */
    if (packets == 0)
        return EXIT_SUCCESS;

    /* Assuming no more than 10% of the DATA messages are lost, and assuming
     * no more than 10% of all retried DATA messages are lost, then we can
     * assume there are no more than the following number of timeouts.  Of
     * course, some of the RETRY messages are lost as well, but to keep it
     * simple, we round up a little.
     */
    timeouts = (size_t)log10((double)remaining) + 1;

    /* Loop while there are more packets. */
    while (remaining > 0) {
        int timeout_flag = 0;
        int received_flag = 0;

        /* Get a message or timeout. */
        RETURN_IF_FAILURE(poll_buffer(&timeout_flag));
        if (timeout_flag == 1) {

            /* Do not allow too many timeouts. */
            if (timeouts-- == 0) {
                RETURN_IF_FAILURE(send_abort(
                    &g_peer,
                    g_start.id,
                    "Too many timeouts."));
                RETURN_FAILURE();
            }

            /* Send all the RETRY messages.  Moving g_expected past the last
             * packet keeps receive_data() from treating the retried packets
             * as "skipped" and asking for them a second time.
             */
            g_expected = packets;
            RETURN_IF_FAILURE(send_retries());
            continue;
        }

        /* Check what type of message this is. */
        switch (g_buffer.data[0]) {
        case MSG_TYPE_DATA:
            break;

        /* Abort if this is an INIT message. */
        case MSG_TYPE_INIT:
            RETURN_IF_FAILURE(send_abort(
                &g_peer,
                g_start.id,
                "Unexpected INIT message."));
            RETURN_FAILURE();

        /* Abort if this is an unknown message. */
        default:
            RETURN_IF_FAILURE(send_abort(
                &g_peer,
                g_start.id,
                "Received invalid message."));
            RETURN_FAILURE();
        }

        /* Use a helper function to process the DATA message. */
        RETURN_IF_FAILURE(receive_data(packets, &received_flag));

        /* Keep track of the remaining number of packets. */
        if (received_flag == 1)
            remaining--;
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Handles the DATA message currently held in g_buffer.
 *
 *  A message with a foreign id, or one whose packet was already received,
 *  is ignored.  A malformed message, an out-of-range index, a wrong size or
 *  a file error aborts the transfer.  A packet that arrives ahead of the
 *  expected one triggers a RETRY for the gap before it is stored.
 *
 *  @param packets The total number of packets.
 *  @param received_flag Set to 1 for new valid DATA packets, 0 otherwise.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int receive_data(u_long packets, int * received_flag)
{
    msg_data_t msg;
    size_t     payload_size;

    assert(NULL != received_flag);

    /* Until proven otherwise, nothing new was received. */
    *received_flag = 0;

    /* A message that cannot be parsed ends the transfer. */
    if (msg_data_read(&msg, &g_buffer) != EXIT_SUCCESS) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Received invalid DATA message."));
        RETURN_FAILURE();
    }

    /* A message with a different id is dropped with a warning only. */
    if (msg.id != g_start.id) {
        fprintf(stderr, "Warning: Ignoring packet with bad id.\n");
        return EXIT_SUCCESS;
    }

    /* The packet index must lie inside the file. */
    if (msg.packet >= packets) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Received invalid DATA message index."));
        RETURN_FAILURE();
    }

    /* The datagram must be exactly header plus the payload expected for
     * this packet index.
     */
    payload_size = protocol_data_size(&g_init, msg.packet);
    if (payload_size + DATA_HEADER_SIZE != (size_t)g_buffer.size) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Received invalid DATA message size."));
        RETURN_FAILURE();
    }

    /* Count the arrival; only the first copy of a packet is stored. */
    if (++g_packets[msg.packet] != 1)
        return EXIT_SUCCESS;

    if (msg.packet >= g_expected) {

        /* Packets between the expected one and this one were skipped:
         * request them again.
         * NOTE(review): this reuses g_buffer before the payload is written
         * below, which is only safe if msg_data_read() copied the payload
         * out of g_buffer -- confirm.
         */
        if (msg.packet > g_expected) {
            msg_retry_t gap;

            gap.type = MSG_TYPE_RETRY;
            gap.id = g_start.id;
            gap.first = g_expected;
            gap.last = msg.packet - 1;

            fprintf(
                stderr,
                "DATA skipped; sending RETRY message.\n"
                "Sending RETRY message for packets %lu to %lu.\n",
                gap.first,
                gap.last);

            g_retries += (gap.last - gap.first) + 1;

            msg_retry_write(&g_buffer, &gap);
            RETURN_IF_FAILURE(send_buffer(&g_peer, "RETRY"));
        }

        /* The packet after this one is now the expected one. */
        g_expected = msg.packet + 1;
    }

    /* Store the payload at its offset in the file.  A failed seek and a
     * short write are reported the same way; the write is not attempted
     * when the seek fails.
     */
    if (fseek(g_file, protocol_packet_offset(&g_init, msg.packet), SEEK_SET)
            != 0
        || fwrite(msg.data, 1, payload_size, g_file) != payload_size) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Error writing to file."));
        RETURN_FAILURE();
    }

    /* A new, valid DATA packet has been stored. */
    *received_flag = 1;
    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Sends an ABORT message.
 *
 *  Prints a warning with the reason to stderr, then serializes the message
 *  into g_buffer and sends it; any previous contents of g_buffer are lost.
 *
 *  @param peer The peer.
 *  @param id The sequence id or 0.
 *  @param reason The reason for the aborting; must be shorter than the
 *                reason field of msg_abort_t.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int send_abort(const peer_t * peer, u_long id, const char * reason)
{
    /* Not named "abort": that would shadow the standard abort() function,
     * which an assert() implementation is free to reference.
     */
    msg_abort_t msg;

    assert(NULL != peer);
    assert(NULL != reason);
    assert(strlen(reason) < sizeof(msg.reason));

    /* First display a warning to the console. */
    fprintf(stderr, "Warning: Sending ABORT message: %s\n", reason);

    /* Build and send the ABORT message.  The message is zero-filled first so
     * that no uninitialized bytes after the reason text can be serialized,
     * and the copy is bounded so that the field stays terminated even when
     * the assertion above is compiled out.
     */
    memset(&msg, 0, sizeof(msg));
    msg.type = MSG_TYPE_ABORT;
    msg.id = id;
    strncpy(msg.reason, reason, sizeof(msg.reason) - 1);
    msg_abort_write(&g_buffer, &msg);
    RETURN_IF_FAILURE(send_buffer(peer, "ABORT"));

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/** Transmits the current contents of g_buffer to a peer and, on success,
 *  clears the buffer.
 *
 *  @param peer The peer.
 *  @param msg_type The message type name, used only in the error text.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int send_buffer(const peer_t * peer, const char * msg_type)
{
    assert(NULL != peer);

    if (udp_send(peer, g_buffer.data, g_buffer.size) == EXIT_SUCCESS) {
        /* Wipe the shared buffer so stale data is obvious in a debugger. */
        memset(&g_buffer, 0, sizeof g_buffer);
        return EXIT_SUCCESS;
    }

    /* The send failed; the buffer is left untouched. */
    fprintf(stderr, "Error sending %s message.\n", msg_type);
    RETURN_FAILURE();
}

/* ------------------------------------------------------------------------ */
/** Requests every packet that has not arrived yet, sending one RETRY
 *  message per contiguous run of missing packets.
 *
 *  @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int send_retries(void)
{
    msg_retry_t request;
    u_long      total;
    u_long      next;

    /* The parts of the request that are the same for every run. */
    total = protocol_packet_count(&g_init);
    request.type = MSG_TYPE_RETRY;
    request.id = g_start.id;

    fprintf(stderr, "Timeout occurred; sending RETRY messages.\n");

    /* Walk the arrival counters from the first packet to the last. */
    next = 0;
    while (next < total) {

        /* Packets that already arrived need no request. */
        if (g_packets[next] != 0) {
            next++;
            continue;
        }

        /* A run of missing packets starts here; advance to the first packet
         * after the run (or to the end of the file).
         */
        request.first = next;
        do {
            next++;
        } while (next < total && g_packets[next] == 0);
        request.last = next - 1;

        /* Ask for the whole run with a single RETRY message. */
        msg_retry_write(&g_buffer, &request);
        RETURN_IF_FAILURE(send_buffer(&g_peer, "RETRY"));

        /* Update the statistics and log what was requested. */
        g_retries += (request.last - request.first) + 1;
        fprintf(
            stderr,
            "Sending RETRY message for packets %lu to %lu.\n",
            request.first,
            request.last);
    }

    return EXIT_SUCCESS;
}
/* Valid HTML 4.01 Valid CSS */