p2p.c

/*
 * File Name:     p2p.c
 * Author:        Jade Cheng
 * Date:          March 29, 2009
 * Course:        ICS 451
 * Assignment:    Project 2
 */

#include "common.h"
#include "data.h"
#include "log.h"
#include "messages.h"
#include "timehelp.h"
#include "udp.h"

/**
 * The minimum number of arguments.  This counts the program name and the
 * mandatory listening port, which main reads from argv[1].
 */
#define MIN_ARGS 2

/** The standard input file descriptor. */
#define STDIN_FD 0

/**
 * The time interval for sending listing messages, in seconds (the value is
 * passed to time_seconds).
 */
#define LISTING_INTERVAL 240

/* Forward declarations for the file-local functions defined after main. */
static void listing_msg_create(listing_msg_t * msg);
static int main_loop(void);
static int parse_uchar(const char * s, char ** endp, u_char * result);
static int parse_ushort(const char * s, char ** endp, u_short * result);
static int parse_peer(const char * s, peer_t * result);
static void print_prompt(void);
static int process_stdin(void);
static int process_udp(void);
static int receive_data(msg_buf_t * buf, const peer_t * peer);
static int receive_listing(msg_buf_t * buf, const peer_t * peer);
static int receive_request(msg_buf_t * buf, const peer_t * peer);
static int receive_try(msg_buf_t * buf, const peer_t * peer);
static void send_listing(void);
static int send_request(const char * name);
static void send_request_to_peer(
    const peer_t * to,
    const void *   buffer,
    size_t         size);

/*
 * All of the collections below have static storage duration, so they start
 * out zero-initialized.
 */

/** The peers obtained from the command line (filled in by main). */
static peers_t g_peers;

/** The local content obtained from the command line (filled in by main). */
static local_content_t g_local_content;

/** The data cache.  receive_data adds every downloaded file to it. */
static data_cache_t g_data_cache;

/**
 * The content directory cache.  Entries are added when listing, data, and
 * try messages are received.
 */
static content_dir_t g_content_dir;

/**
 * The outstanding request cache.  send_request adds to it and receive_data
 * removes the matching entry once the data arrives.
 */
static request_cache_t g_requests;

/* ------------------------------------------------------------------------ */
/**
 * This is the main entry of the program.  It validates the command line,
 * records the peers and local files named on it, starts UDP on the given
 * port, and runs the main loop until it ends.
 *
 * The first argument is the UDP listening port.  Each later argument that
 * contains a '/' and parses as a peer is added to the peers collection; any
 * other argument must pass verify_name and is added to the local content.
 *
 * @param argc The number of arguments.
 * @param argv The arguments.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
extern int main(int argc, const char * argv[])
{
    char *  end;
    int     i;
    peer_t  peer;
    u_short port;
    int     status;

#ifdef DEBUG
    /* Print out a message if the program is running in debug mode. */
    printf("Running in DEBUG mode...\n");
#endif

    /* Check argc and print out the usage if program is not called right. */
    if (argc < MIN_ARGS || argc > MAX_ARGS) {
        fprintf(stderr, "usage: p2p port [options]\n");
        RETURN_FAILURE();
    }

    /* Parse the second argument, the listening port for UDP.  The whole
     * argument must be consumed, so that input such as "8080x" is rejected
     * instead of being silently read as 8080.
     */
    if (EXIT_SUCCESS != parse_ushort(argv[1], &end, &port) || *end != '\0') {
        fprintf(stderr, "usage: port number type.\n");
        RETURN_FAILURE();
    }

    /* Examine the later parameters. */
    for (i = 2; i < argc; i++) {
        const char * arg = argv[i];

        if (strchr(arg, '/') != NULL
            && EXIT_SUCCESS == parse_peer(arg, &peer)) {

            /* The argument contains a '/' and parsed as a peer, so add it
             * to the peers collection.  Otherwise report the error.
             */
            if (EXIT_SUCCESS != peers_add(&g_peers, &peer)) {
                fprintf(stderr, "Cannot add '%s' to list of peers.\n", arg);
                RETURN_FAILURE();
            }

        } else if (EXIT_SUCCESS == verify_name(arg)) {

            /* The argument is not a peer but is a valid file name, so add
             * it to the local content.
             */
            if (EXIT_SUCCESS != local_content_add(&g_local_content, arg)) {
                fprintf(stderr, "Cannot add '%s' to local content.\n", arg);
                RETURN_FAILURE();
            }

        } else {

            /* Anything other than a peer and a file name is not allowed. */
            fprintf(stderr, "Invalid command line argument '%s'.\n", arg);
            RETURN_FAILURE();
        }
    }

    /* Initialize the log file. */
    log_init();

    /* Print out the welcome message. */
    printf("\n");
    printf("Jade p2p\n");
    printf("\n");
    printf("  Type '!quit' to quit.\n");
    printf("  Type '!info' to display information.\n");
    printf("  Type anything else to make a request.\n");
    printf("\n");

    /* Start UDP.  There is no point in entering the main loop without a
     * working socket, so a failure here ends the program.
     */
    if (EXIT_SUCCESS != udp_start(port)) {
        fprintf(stderr, "Cannot start UDP on port %hu.\n", port);
        RETURN_FAILURE();
    }

    /* Execute the main loop and stop UDP before terminating.  The result of
     * the main loop becomes the exit status so that errors are visible to
     * the caller of the program.
     */
    status = main_loop();
    udp_stop();
    return status;
}

/* ------------------------------------------------------------------------ */
/**
 * Fills in a listing message with the name of every item this program can
 * serve: first the local content, then the data cache.
 *
 * @param msg The listing message to fill in.  Any previous contents are
 *            cleared first.
 */
static void listing_msg_create(listing_msg_t * msg)
{
    size_t src;

    assert(msg != NULL);
    assert(g_local_content.count + g_data_cache.count <= MAX_LISTING_COUNT);

    /* Clearing the message also resets msg->count to zero. */
    memset(msg, 0, sizeof(*msg));

    /* Append the names from the local content. */
    src = 0;
    while (src < g_local_content.count) {
        strcpy(msg->entries[msg->count], g_local_content.items[src].name);
        msg->count++;
        src++;
    }

    /* Append the names from the data cache. */
    src = 0;
    while (src < g_data_cache.count) {
        strcpy(msg->entries[msg->count], g_data_cache.items[src].name);
        msg->count++;
        src++;
    }
}

/* ------------------------------------------------------------------------ */
/**
 * Performs the main loop of the program.  Each pass waits with select for
 * standard input, UDP input, or a timeout, and then expires old requests
 * and processes whichever input is ready.  A listing message is sent when
 * the loop starts and again every LISTING_INTERVAL seconds.
 *
 * The loop ends normally when process_stdin reports that the program should
 * terminate.  It ends with a failure when select or process_udp fails.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int main_loop(void)
{
    timeval listing_timeout;

    print_prompt();

    /* Send a listing message when p2p starts, and compute the time to send
     * the next listing message.
     */
    send_listing();
    listing_timeout = time_add(time_now(), time_seconds(LISTING_INTERVAL));

    for (;;) {
        int     fd;
        timeval now;
        fd_set  readfds;
        int     result;
        timeval request_timeout;
        timeval select_timeout;

        /* The select function waits for standard input or the UDP socket. */
        fd = udp_fd();
        FD_ZERO(&readfds);
        FD_SET(STDIN_FD, &readfds);
        FD_SET(fd, &readfds);

        /* Send a listing message when the current time reaches the last
         * computed time to send a listing message.  The next time to send
         * is advanced by whole intervals until it lies in the future again;
         * this keeps the timeout computed below strictly positive even when
         * more than one interval went by since the last pass.
         */
        now = time_now();
        if (time_cmp(now, listing_timeout) >= 0) {
            send_listing();
            do {
                listing_timeout = time_add(
                    listing_timeout,
                    time_seconds(LISTING_INTERVAL));
            } while (time_cmp(now, listing_timeout) >= 0);
        }

        /* Set the timeout to the next time to send a listing message. */
        select_timeout = time_sub(listing_timeout, now);
        assert(time_cmp(select_timeout, time_seconds(0)) > 0);

        /* Alternatively, if a request timeout occurs sooner, use this as the
         * select timeout.
         */
        if (EXIT_SUCCESS == request_get_next_timeout(
            &g_requests,
            &request_timeout)) {

            /* A request may already be overdue, for example when the
             * previous pass spent a long time processing input.  Use a zero
             * timeout in that case; a negative timeout would make select
             * fail and end the program.
             */
            if (time_cmp(request_timeout, now) <= 0)
                request_timeout = time_seconds(0);
            else
                request_timeout = time_sub(request_timeout, now);

            if (time_cmp(request_timeout, select_timeout) < 0)
                select_timeout = request_timeout;
        }

        /* Wait for standard input, UDP input, or a timeout. */
        result = select(fd + 1, &readfds, NULL, NULL, &select_timeout);
        RETURN_IF_FALSE(result != -1);
        assert(result >= 0);

        /* Expire the outdated requests, and at the same time clean up the
         * content directory cache collection every time select returns.
         * When request_expire does not return EXIT_SUCCESS the prompt is
         * printed again, unless standard input is about to be processed;
         * process_stdin prints the prompt itself.
         * NOTE(review): presumably a non-success result means a request
         * expired and a message was printed -- confirm in request_expire.
         */
        if (EXIT_SUCCESS != request_expire(&g_requests, &g_content_dir)) {
            if (!FD_ISSET(STDIN_FD, &readfds))
                print_prompt();
        }

        /* Process standard input if necessary. */
        if (FD_ISSET(STDIN_FD, &readfds)) {
            if (EXIT_SUCCESS != process_stdin())
                break;
        }

        /* Process UDP input if necessary. */
        if (FD_ISSET(fd, &readfds))
            RETURN_IF_FAILED(process_udp());
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Parses an unsigned char written in decimal.
 *
 * The text must start with a digit.  Parsing stops at the first character
 * that is not a digit; the caller decides whether that character is
 * acceptable by looking at the end pointer.
 *
 * @param s The source string.
 * @param endp The end pointer.  On success it points at the first character
 *             after the number.
 * @param result The result.  It is only written on success.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int parse_uchar(const char * s, char ** endp, u_char * result)
{
    u_long temp;

    /* strtoul skips leading white space and accepts a sign, neither of which
     * belongs in an address or a port, so insist on a leading digit.
     */
    if (*s < '0' || *s > '9') {
        *endp = (char *)s;
        RETURN_FAILURE();
    }

    /* errno is only meaningful after strtoul if it was cleared beforehand;
     * otherwise a stale value from an earlier call would reject good input.
     */
    errno = 0;
    temp = strtoul(s, endp, 10);
    if (*endp == NULL || errno != 0 || *endp == s) {
        RETURN_FAILURE();
    }

    if (temp > UCHAR_MAX) {
        RETURN_FAILURE();
    }

    *result = (u_char)temp;

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Parses an unsigned short written in decimal from the command line.
 *
 * The text must start with a digit.  Parsing stops at the first character
 * that is not a digit; the caller decides whether that character is
 * acceptable by looking at the end pointer.
 *
 * @param s The source string.
 * @param endp The end pointer.  On success it points at the first character
 *             after the number.
 * @param result The result.  It is only written on success.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int parse_ushort(const char * s, char ** endp, u_short * result)
{
    u_long temp;

    /* strtoul skips leading white space and accepts a sign, neither of which
     * belongs in a port number, so insist on a leading digit.
     */
    if (*s < '0' || *s > '9') {
        *endp = (char *)s;
        RETURN_FAILURE();
    }

    /* errno is only meaningful after strtoul if it was cleared beforehand;
     * otherwise a stale value from an earlier call would reject good input.
     */
    errno = 0;
    temp = strtoul(s, endp, 10);
    if (*endp == NULL || errno != 0 || *endp == s) {
        RETURN_FAILURE();
    }

    if (temp > USHRT_MAX) {
        RETURN_FAILURE();
    }

    *result = (u_short)temp;

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Parses a peer from the command line.  The expected form is four decimal
 * numbers from 0 to 255 separated by '.', then a '/', then a decimal port
 * number, with nothing after it; for example "10.0.0.1/8000".
 *
 * @param s The source string.
 * @param result The peer.  Its address and port are written on success.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int parse_peer(const char * s, peer_t * result)
{
    char    buf[16];
    char *  end;
    int     i;
    u_char  num[4];
    u_short port;

    /* Read the four numbers.  The first three must be followed by a '.' and
     * the last one by the '/' that introduces the port.
     */
    for (i = 0; i < 4; i++) {
        if (EXIT_SUCCESS != parse_uchar(s, &end, &num[i])) {
            RETURN_FAILURE();
        }

        if (*end != (i != 3 ? '.' : '/')) {
            RETURN_FAILURE();
        }

        s = end + 1;
    }

    /* Read the port, which must run to the end of the string. */
    if (EXIT_SUCCESS != parse_ushort(s, &end, &port)) {
        RETURN_FAILURE();
    }

    if (*end != '\0') {
        RETURN_FAILURE();
    }

    /* Use inet_pton to parse the peer address.  Each number is at most three
     * digits, so the text is at most 15 characters and fits in buf together
     * with its terminator.
     */
    sprintf(
        buf,
        "%u.%u.%u.%u",
        (unsigned)num[0],
        (unsigned)num[1],
        (unsigned)num[2],
        (unsigned)num[3]);

    if (inet_pton(AF_INET, buf, &result->addr) != 1) {
        RETURN_FAILURE();
    }

    result->port = port;

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Writes the "(p2p) " prompt to standard output and flushes it, so that the
 * prompt is visible even though it does not end in a newline.
 */
static void print_prompt(void)
{
    fputs("(p2p) ", stdout);
    fflush(stdout);
}

/* ------------------------------------------------------------------------ */
/**
 * Processes incoming standard input data.  One line is read, one character
 * at a time, and then interpreted: "!quit" ends the program, "!info" prints
 * the contents of every collection, an empty line does nothing, and anything
 * else is treated as a file name request.  Characters beyond the capacity of
 * the line buffer are read but dropped.
 *
 * This function returns EXIT_FAILURE when the program should terminate.
 * That is the case for "!quit", for a read error, and for end of file on
 * standard input.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int process_stdin(void)
{
    size_t count;
    int    eof;
    char   line[1000];

    line[0] = '\0';
    count = 0;
    eof = 0;

    /* Read until nothing to read or '\n' is reached. */
    for (;;) {
        char    ch;
        ssize_t r;

        /* Check for no more characters. */
        r = read(STDIN_FD, &ch, 1);
        RETURN_IF_FALSE(r != -1);
        assert(r >= 0);
        if (r == 0) {
            eof = 1;
            break;
        }

        /* Check for an end of line. */
        assert(r == 1);
        if (ch == '\n')
            break;

        /* Otherwise, add the character to the line. */
        if (count < sizeof(line) - 1) {
            line[count++] = ch;
            line[count] = '\0';
        }
    }

    /* Quit when standard input is closed and nothing is left to process.
     * Once end of file is reached, select reports standard input as readable
     * on every pass, so carrying on would spin forever printing prompts.  A
     * final line without a newline is still processed below; the next call
     * then ends the program.
     */
    if (eof && count == 0)
        return EXIT_FAILURE;

    /* Quit if "!quit" is entered. */
    if (0 == strcmp(line, "!quit"))
        return EXIT_FAILURE;

    /* Prints out collection contents if "!info" is entered. */
    if (0 == strcmp(line, "!info")) {
        local_content_print(&g_local_content);
        printf("\n");
        data_cache_print(&g_data_cache);
        printf("\n");
        content_dir_print(&g_content_dir);
        printf("\n");
        peers_print(&g_peers);
        printf("\n");
        request_print(&g_requests);
        printf("\n");

    /* Otherwise, parse the input as a file name and request this file. */
    } else if (0 != strcmp(line, "")) {
        if (EXIT_SUCCESS != verify_name(line)) {
            fprintf(stderr, "Invalid file name: '%s'\n\n", line);
        } else {
            (void)send_request(line);
        }
    }

    print_prompt();
    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Receives one UDP message and hands it to the handler that matches its
 * type.  Messages of an unknown type are dropped.  The result of the
 * handler is not examined, so only a failure to receive is reported.
 *
 * @return EXIT_SUCCESS, or EXIT_FAILURE if the message cannot be received.
 */
static int process_udp(void)
{
    msg_buf_t buf;
    peer_t peer;

    /* Fetch the message together with the peer that sent it. */
    RETURN_IF_FAILED(udp_recv(
        buf.buffer,
        sizeof(buf.buffer),
        &buf.size,
        &peer));

    /* Dispatch on the type found in the message header. */
    switch (get_msg_type(&buf)) {
    case msg_type_request:
        (void)receive_request(&buf, &peer);
        break;

    case msg_type_try:
        (void)receive_try(&buf, &peer);
        break;

    case msg_type_data:
        (void)receive_data(&buf, &peer);
        break;

    case msg_type_listing:
        (void)receive_listing(&buf, &peer);
        break;

    case msg_type_unknown:
    default:
        /* Nothing to do for messages that are not understood. */
        break;
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Processes an incoming data message.  If the data received does not
 * correspond to an outstanding request, the event is logged and nothing is
 * modified.  Otherwise the corresponding request is removed, the data
 * received is saved to a local file named after the content, and the data
 * cache and the content directory are updated.
 *
 * @param buf The message buffer.
 * @param peer The peer that this message is received from.
 *
 * @return EXIT_SUCCESS, or EXIT_FAILURE if the message cannot be parsed or
 *         the file cannot be written completely.
 */
static int receive_data(msg_buf_t * buf, const peer_t * peer)
{
    FILE *     f;
    size_t     index;
    data_msg_t msg;

    assert(buf != NULL);
    assert(peer != NULL);

    /* Parse the message as a data message. */
    memset(&msg, 0, sizeof(msg));
    RETURN_IF_FAILED(data_msg_read(buf, &msg));

    /* Determine if the name corresponds to an outstanding request. */
    if (request_find(&g_requests, msg.name, &index) == EXIT_FAILURE) {
        if ((f = log_open()) != NULL) {
            fprintf(f, "Discarding data message for %s because it does not\n"
                       "correspond to an outstanding request.\n", msg.name);
            log_close(f);
        }

        return EXIT_SUCCESS;
    }

    /* Interrupt the user and print a message indicating it was downloaded. */
    printf("\n\ncontent found (%s)\n\n", msg.name);
    print_prompt();

    /* Delete from the outstanding requests. */
    request_remove(&g_requests, index);

    /* Write the received data to a local file with the received file name. */
    f = fopen(msg.name, "wb");
    RETURN_IF_FALSE(f != NULL);
    if (fwrite(msg.data, 1, msg.size, f) != msg.size) {
        fclose(f);
        RETURN_FAILURE();
    }

    /* Buffered data is only handed to the system when the file is closed,
     * so a failure to close means the file on disk may be incomplete.
     */
    if (fclose(f) != 0) {
        RETURN_FAILURE();
    }

    /* Write the received data to the data cache collection. */
    data_cache_add(&g_data_cache, msg.name, msg.data, msg.size);

    /* Write this entry to content directory if it does not already exist. */
    content_dir_add(&g_content_dir, msg.name, peer);
    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Processes an incoming listing message.  Every name in the listing is
 * passed to content_dir_add together with the sending peer, in the order in
 * which the names appear in the message.
 *
 * @param buf The message buffer.
 * @param peer The peer that this message is received from.
 *
 * @return EXIT_SUCCESS, or EXIT_FAILURE if the message cannot be parsed.
 */
static int receive_listing(msg_buf_t * buf, const peer_t * peer)
{
    listing_msg_t msg;
    size_t        entry;

    assert(buf != NULL);
    assert(peer != NULL);

    /* Decode the buffer; start from a cleared message. */
    memset(&msg, 0, sizeof(msg));
    RETURN_IF_FAILED(listing_msg_read(buf, &msg));

    /* Record the sender as a source for each listed name. */
    entry = 0;
    while (entry < msg.count) {
        content_dir_add(&g_content_dir, msg.entries[entry], peer);
        entry++;
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Processes an incoming request message.  If the requested file is in the
 * local content collection or the data cache, a data message is sent.  If
 * not, but the name is in the content directory, a try message is sent that
 * lists the peers recorded for that name.  Otherwise the request is ignored.
 *
 * @param buf The message buffer.
 * @param peer The peer that this message is received from.  The reply, if
 *             any, is sent to this peer.
 *
 * @return EXIT_SUCCESS, or EXIT_FAILURE if the message cannot be parsed.
 */
static int receive_request(msg_buf_t * buf, const peer_t * peer)
{
    msg_buf_t     buffer;
    data_msg_t    data_msg;
    size_t        index;
    request_msg_t request_msg;

    assert(buf != NULL);
    assert(peer != NULL);

    memset(&request_msg, 0, sizeof(request_msg));
    memset(&data_msg, 0, sizeof(data_msg));
    memset(&buffer, 0, sizeof(buffer));

    /* Parse the message buffer as a request message. */
    RETURN_IF_FAILED(request_msg_read(buf, &request_msg));

    /* A data message is created if the file is found in the local content. */
    strcpy(data_msg.name, request_msg.name);
    if (local_content_find(
        &g_local_content,
        data_msg.name, &index) == EXIT_SUCCESS) {

        data_msg.size = g_local_content.items[index].size;

        memcpy(
            data_msg.data,
            g_local_content.items[index].buffer,
            data_msg.size);

        data_msg_write(&buffer, &data_msg);

    /* A data message is created if the file is found in the data cache. */
    } else if (data_cache_find(
        &g_data_cache,
        data_msg.name, &index) == EXIT_SUCCESS) {

        data_msg.size = g_data_cache.items[index].size;

        memcpy(
            data_msg.data,
            g_data_cache.items[index].buffer,
            data_msg.size);

        data_msg_write(&buffer, &data_msg);

    /* If the file name is found in the content directory, a try message is
     * created with the peers who know about the file.
     */
    } else {
        try_msg_t    try_msg;
        const size_t max_peers =
            sizeof(try_msg.peers) / sizeof(try_msg.peers[0]);

        memset(&try_msg, 0, sizeof(try_msg));

        strcpy(try_msg.name, request_msg.name);

        /* The loop advances index past each match before searching again,
         * so the search has to begin at a known position.
         * NOTE(review): this assumes content_dir_find resumes from *index;
         * confirm against its definition.
         */
        index = 0;

        /* Stop when the try message is full; the content directory may hold
         * more peers for this name than one message has room for.
         */
        while (try_msg.count < max_peers
            && content_dir_find(
                &g_content_dir,
                request_msg.name,
                &index) == EXIT_SUCCESS) {

            try_msg.peers[try_msg.count++] =
                g_content_dir.items[index++].peer;
        }

        if (try_msg.count > 0)
            try_msg_write(&buffer, &try_msg);
    }

    /* Send the message if a data message or a try message was created.  The
     * reply is best effort; the requester asks again if it gets lost.
     */
    if (buffer.size > 0)
        (void)udp_send(peer, buffer.buffer, buffer.size);

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Processes an incoming try message.  A try message names a file and lists
 * peers to ask for it.  If the name does not correspond to an outstanding
 * request, the event is logged and nothing is modified.  Otherwise a
 * request for the name is sent to every listed peer that does not yet
 * appear anywhere in the content directory, and every listed peer is then
 * added to the content directory for that name.
 *
 * @param buf The message buffer.
 * @param peer The peer that this message is received from.
 *
 * @return EXIT_SUCCESS, or EXIT_FAILURE if the message cannot be parsed.
 */
static int receive_try(msg_buf_t * buf, const peer_t * peer)
{
    size_t    i;
    size_t    j;
    try_msg_t msg;

    assert(buf != NULL);
    assert(peer != NULL);

    /* Parse the message buffer as a try message. */
    memset(&msg, 0, sizeof(msg));
    RETURN_IF_FAILED(try_msg_read(buf, &msg));

    /* Ignore try messages for names not in the request cache. */
    if (request_find(&g_requests, msg.name, &i) == EXIT_FAILURE) {
        FILE * f;

        if ((f = log_open()) != NULL) {
            fprintf(f, "Discarding try message for %s because it does not\n"
                       "correspond to an outstanding request.\n", msg.name);
            log_close(f);
        }

        return EXIT_SUCCESS;
    }

    /* Loop over each peer listed in the try message. */
    for (i = 0; i < msg.count; i++) {
        int known;

        /* Search in the content directory to see if it has the peer.  The
         * flag is reset for every peer; otherwise one known peer would
         * suppress the requests to all peers listed after it.
         */
        known = 0;
        for (j = 0; j < g_content_dir.count; j++) {
            if (peer_cmp(&msg.peers[i], &g_content_dir.items[j].peer) == 0) {
                known = 1;
                break;
            }
        }

        /* If the content directory doesn't already have this peer, send a
         * request to this peer.
         */
        if (!known) {
            msg_buf_t     buffer;
            request_msg_t request_msg;

            memset(&request_msg, 0, sizeof(request_msg));
            memset(&buffer, 0, sizeof(buffer));

            strcpy(request_msg.name, msg.name);
            request_msg_write(&buffer, &request_msg);
            (void)udp_send(&msg.peers[i], buffer.buffer, buffer.size);
        }

        /* Add the peers received into the content directory. */
        content_dir_add(&g_content_dir, msg.name, &msg.peers[i]);
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Sends a listing message.
 */
static void send_listing(void)
{
    msg_buf_t     buf;
    size_t        i;
    listing_msg_t msg;
    peers_t       peers;

    memset(&msg, 0, sizeof(msg));
    memset(&buf, 0, sizeof(buf));
    memset(&peers, 0, sizeof(peers));

    /* Create a combined list of peers from the content directory and the
     * ones specified on the command line.
     */
    peers_combine(&peers, &g_content_dir, &g_peers);
    listing_msg_create(&msg);
    listing_msg_write(&buf, &msg);

    /* Send the listing message to each peer. */
    for (i = 0; i < peers.count; i++)
        (void)udp_send(&peers.peers[i], buf.buffer, buf.size);
}

/* ------------------------------------------------------------------------ */
/**
 * Sends a request for a file name.  This function prints "content found" if
 * the name already exists in the local content or the data cache.  If not,
 * the name is added to the outstanding requests.  If the name of the file is
 * found in the content directory, a request is sent to those peers listed
 * for it in the content directory.  If not, a request is sent to all known
 * peers.
 *
 * @param name The name of file.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int send_request(const char * name)
{
    msg_buf_t     buf;
    size_t        i;
    size_t        index;
    request_msg_t msg;
    size_t        sent;

    /* Print a message if the item is found locally. */
    if (local_content_find(&g_local_content, name, &index) == EXIT_SUCCESS
        || data_cache_find(&g_data_cache, name, &index) == EXIT_SUCCESS) {

        printf("content found\n\n");
        return EXIT_SUCCESS;
    }

    /* Start from cleared structures, as the other senders in this file do,
     * so that no uninitialized bytes end up in the message.
     */
    memset(&msg, 0, sizeof(msg));
    memset(&buf, 0, sizeof(buf));

    /* Add the request to the outstanding request collection. */
    request_add(&g_requests, name);

    strcpy(msg.name, name);
    request_msg_write(&buf, &msg);

    printf("Searching for %s...\n\n", name);

    /* Check if the item is found in the content directory.  The loop
     * advances index past each match before searching again, so the search
     * has to begin at a known position.
     * NOTE(review): this assumes content_dir_find resumes from *index;
     * confirm against its definition.
     */
    sent = 0;
    index = 0;
    while (content_dir_find(&g_content_dir, name, &index) == EXIT_SUCCESS) {
        send_request_to_peer(
            &g_content_dir.items[index].peer,
            buf.buffer,
            buf.size);

        index++;
        sent++;
    }

    /* Otherwise, send it to all known peers. */
    if (sent == 0) {
        peers_t combined_peers;

        memset(&combined_peers, 0, sizeof(combined_peers));
        peers_combine(&combined_peers, &g_content_dir, &g_peers);

        for (i = 0; i < combined_peers.count; i++)
            send_request_to_peer(
                &combined_peers.peers[i],
                buf.buffer,
                buf.size);
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */
/**
 * Sends an already encoded request message to one peer.  If the send does
 * not succeed, a line naming the peer is printed to standard output.
 *
 * @param to The peer.
 * @param buffer The message buffer.
 * @param size The size of the message buffer in bytes.
 */
static void send_request_to_peer(
    const peer_t * to,
    const void *   buffer,
    size_t         size)
{
    /* Nothing more to do when the message went out. */
    if (udp_send(to, buffer, size) == EXIT_SUCCESS)
        return;

    /* Report the failure: fixed text, the peer, then the closing period. */
    fputs("Failed to send request to ", stdout);
    fprint_peer(stdout, to);
    fputs(".\n", stdout);
}

/* ------------------------------------------------------------------------ */
#ifdef TESTS
/**
 * Manual UDP test routine.  It starts UDP, sends each files[j] string to
 * each peers[i], and then loops forever printing every message received.
 *
 * NOTE(review): this function uses port, i, p_index, f_index, peers, and
 * files, none of which are declared in this function.  It looks like code
 * left over from an earlier version of main; confirm whether those names
 * exist anywhere before defining TESTS.
 * NOTE(review): RETURN_FAILURE() is used here in a function that returns
 * void, whereas everywhere else it is used in functions that return int.
 */
static void test() {

    /* Bind the UDP socket; nothing else can be tested without it. */
    if (EXIT_SUCCESS != udp_start(port)) {
        fprintf(stderr, "Cannot bind to port %hu.", port);
        RETURN_FAILURE();
    }

    /* Send every file string, including its terminator, to every peer. */
    for (i = 0; i < p_index; i++) {
        int j;

        for (j = 0; j < f_index; j++) {
            if (EXIT_SUCCESS != udp_send(&peers[i], files[j], strlen(files[j]) + 1)) {
                RETURN_FAILURE();
            }
        }
    }

    printf("Parsed %d peers and %d files.\n", p_index, f_index);

    /* Receive and dump messages until receiving fails. */
    for (;;) {
        char buffer[1200];
        char ip[16];
        size_t actual;
        peer_t peer;

        printf("Listening...\n");
        if (EXIT_SUCCESS != udp_recv(buffer, sizeof(buffer), &actual, &peer)) {
            RETURN_FAILURE();
        }

        /* NOTE(review): the size passed to inet_ntop is that of struct
         * sockaddr_in rather than that of the ip buffer being written to.
         */
        inet_ntop(AF_INET, &peer.addr, ip, sizeof(struct sockaddr_in));
        /* NOTE(review): actual is a size_t but is printed with %d. */
        printf("%d bytes arrived from %s on port %hu.\n", actual, ip, peer.port);
        dump_buffer(buffer, actual);
        printf("\n");
    }

    /* NOTE(review): the loop above has no break, so this call is never
     * reached.
     */
    udp_stop();
}

#endif /* TESTS */
Valid HTML 4.01 Valid CSS