/*
* File Name: p2p.c
* Author: Jade Cheng
* Date: March 29, 2009
* Course: ICS 451
* Assignment: Project 2
*/
#include "common.h"
#include "data.h"
#include "log.h"
#include "messages.h"
#include "timehelp.h"
#include "udp.h"
/** The minimum number of arguments. */
#define MIN_ARGS 2
/** The standard input file descriptor. */
#define STDIN_FD 0
/** The time interval for sending listing messages. */
#define LISTING_INTERVAL 240
static void listing_msg_create(listing_msg_t * msg);
static int main_loop(void);
static int parse_uchar(const char * s, char ** endp, u_char * result);
static int parse_ushort(const char * s, char ** endp, u_short * result);
static int parse_peer(const char * s, peer_t * result);
static void print_prompt(void);
static int process_stdin(void);
static int process_udp(void);
static int receive_data(msg_buf_t * buf, const peer_t * peer);
static int receive_listing(msg_buf_t * buf, const peer_t * peer);
static int receive_request(msg_buf_t * buf, const peer_t * peer);
static int receive_try(msg_buf_t * buf, const peer_t * peer);
static void send_listing(void);
static int send_request(const char * name);
static void send_request_to_peer(
const peer_t * to,
const void * buffer,
size_t size);
/** The peers obtained from the command line. */
static peers_t g_peers;
/** The local content obtained from the command line. */
static local_content_t g_local_content;
/** The data cache. */
static data_cache_t g_data_cache;
/** The content directory cache. */
static content_dir_t g_content_dir;
/** The outstanding request cache. */
static request_cache_t g_requests;
/* ------------------------------------------------------------------------ */
/**
* This is the main entry of the program.
*
* @param argc The number of arguments.
* @param argv The arguments.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
extern int main(int argc, const char * argv[])
{
char * end;
int i;
peer_t peer;
u_short port;
#ifdef DEBUG
/* Print out a message if the program is running in debug mode. */
printf("Running in DEBUG mode...\n");
#endif
/* Check argc and print out the usage if program is not called right. */
if (argc < MIN_ARGS || argc > MAX_ARGS) {
fprintf(stderr, "usage: p2p port [options]\n");
RETURN_FAILURE();
}
/* Parse the second argument, the listening port for UDP. */
if (EXIT_SUCCESS != parse_ushort(argv[1], &end, &port)) {
fprintf(stderr, "usage: port number type.\n");
RETURN_FAILURE();
}
/* Examine the later parameters. */
for (i = 2; i < argc; i++) {
const char * arg = argv[i];
/* If the argument contains a '/', parse it as a peer and add it to
* the peers collection if the parsing is successful. Ortherwise
* report the error.
*/
if (strchr(arg, '/') != NULL
&& EXIT_SUCCESS == parse_peer(arg, &peer)) {
if (EXIT_SUCCESS != peers_add(&g_peers, &peer)) {
fprintf(stderr, "Cannot add '%s' to list of peers.\n", arg);
RETURN_FAILURE();
}
/* If the argument doesn't contain a '/', parse it as a file name and
* add it to the local content if the parsing is successful.
*/
} else if (EXIT_SUCCESS == verify_name(arg)) {
if (EXIT_SUCCESS != local_content_add(&g_local_content, arg)) {
fprintf(stderr, "Cannot add '%s' to local content.\n", arg);
RETURN_FAILURE();
}
/* Anything other than a peer and a file name is not allowed. */
} else {
fprintf(stderr, "Invalid command line argument '%s'.\n", arg);
RETURN_FAILURE();
}
}
/* Initialize the log file. */
log_init();
/* Print out the welcome message. */
printf("\n");
printf("Jade p2p\n");
printf("\n");
printf(" Type '!quit' to quit.\n");
printf(" Type '!info' to display information.\n");
printf(" Type anything else to make a request.\n");
printf("\n");
/* Start UDP, execute the main loop, and stop UDP before terminating. */
udp_start(port);
main_loop();
udp_stop();
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Creates a listing message.
*
* @param msg The listing message.
*/
static void listing_msg_create(listing_msg_t * msg)
{
size_t i;
assert(msg != NULL);
assert(g_local_content.count + g_data_cache.count <= MAX_LISTING_COUNT);
memset(msg, 0, sizeof(*msg));
/* Add all items from the local content. */
for (i = 0; i < g_local_content.count; i++)
strcpy(msg->entries[msg->count++], g_local_content.items[i].name);
/* Add all items from the data cache. */
for (i = 0; i < g_data_cache.count; i++)
strcpy(msg->entries[msg->count++], g_data_cache.items[i].name);
}
/* ------------------------------------------------------------------------ */
/**
* Performs the main loop of the program.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int main_loop(void)
{
timeval listing_timeout;
printf("(p2p) ");
fflush(stdout);
/* Send a listing message when p2p starts, and compute the time to send
* the next listing message.
*/
send_listing();
listing_timeout = time_add(time_now(), time_seconds(LISTING_INTERVAL));
for (;;) {
int fd;
timeval now;
fd_set readfds;
int result;
timeval request_timeout;
timeval select_timeout;
/* The select function waits for standard input or the UDP socket. */
fd = udp_fd();
FD_ZERO(&readfds);
FD_SET(STDIN_FD, &readfds);
FD_SET(fd, &readfds);
/* Send a listing message when current time passes the last computed
* time to send a listing message. The next time to send is the last
* computed time plus a listing message interval.
*/
now = time_now();
if (time_cmp(now, listing_timeout) > 0) {
send_listing();
listing_timeout = time_add(
listing_timeout,
time_seconds(LISTING_INTERVAL));
}
/* Set the timeout to the next time to send a listing message. */
select_timeout = time_sub(listing_timeout, now);
assert(time_cmp(select_timeout, time_seconds(0)) > 0);
/* Alternatively, if a request timeout occurs sooner, use this as the
* select timeout.
*/
if (EXIT_SUCCESS == request_get_next_timeout(
&g_requests,
&request_timeout)) {
request_timeout = time_sub(request_timeout, now);
if (time_cmp(request_timeout, select_timeout) < 0)
select_timeout = request_timeout;
}
/* Wait for standard input, UDP input, or a timeout. */
result = select(fd + 1, &readfds, NULL, NULL, &select_timeout);
RETURN_IF_FALSE(result != -1);
assert(result >= 0);
/* Expire the outdated requests, and at the same time clean up the
* content directory cache collection every time select returns.
*/
if (EXIT_SUCCESS != request_expire(&g_requests, &g_content_dir))
if (!FD_ISSET(STDIN_FD, &readfds))
print_prompt();
/* Process standard input if necessary. */
if (FD_ISSET(STDIN_FD, &readfds)) {
if (EXIT_SUCCESS != process_stdin())
break;
}
/* Process UDP input if necessary. */
if (FD_ISSET(fd, &readfds))
RETURN_IF_FAILED(process_udp());
}
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
static int parse_uchar(const char * s, char ** endp, u_char * result)
{
u_long temp;
temp = strtoul(s, endp, 10);
if (*endp == NULL ||errno != 0 || *endp == s) {
RETURN_FAILURE();
}
if (temp > UCHAR_MAX) {
RETURN_FAILURE();
}
*result = (u_char)temp;
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Parses an unsigned short from the command line.
*
* @param s The source string.
* @param endp The end pointer.
* @param result The result.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int parse_ushort(const char * s, char ** endp, u_short * result)
{
u_long temp;
temp = strtoul(s, endp, 10);
if (*endp == NULL || errno != 0 || *endp == s) {
RETURN_FAILURE();
}
if (temp > USHRT_MAX) {
RETURN_FAILURE();
}
*result = (u_short)temp;
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Parses a peer from the command line.
*
* @param s The source string.
* @param result The peer.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int parse_peer(const char * s, peer_t * result)
{
char buf[16];
char * end;
int i;
u_char num[4];
u_short port;
/* Read the four numbers. */
for (i = 0; i < 4; i ++) {
if (EXIT_SUCCESS != parse_uchar(s, &end, &num[i])) {
RETURN_FAILURE();
}
if (i != 3) {
if (*end != '.') {
RETURN_FAILURE();
}
} else {
if (*end != '/') {
RETURN_FAILURE();
}
s = end + 1;
if (EXIT_SUCCESS != parse_ushort(s, &end, &port)) {
RETURN_FAILURE();
}
if (*end != '\0') {
printf("end = %c\n", *end);
RETURN_FAILURE();
}
}
s = end + 1;
}
/* Use inet_pton to parse the peer address. */
sprintf(buf, "%u.%u.%u.%u", num[0], num[1], num[2], num[3]);
inet_pton(AF_INET, buf, &result->addr);
result->port = port;
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Prints a prompt and flushes standard output.
*/
static void print_prompt(void)
{
printf("(p2p) ");
fflush(stdout);
}
/* ------------------------------------------------------------------------ */
/**
* Processes incoming standard input data. This functions handles special
* commands like "!quit" and "!info". Anything else is treated as file name
* requests. This function returns EXIT_FAILURE when the program should
* terminate.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int process_stdin(void)
{
size_t count;
char line[1000];
line[0] = '\0';
count = 0;
/* Read until nothing to read or '\n' is reached. */
for (;;) {
char ch;
ssize_t r;
/* Check for no more characters. */
r = read(STDIN_FD, &ch, 1);
RETURN_IF_FALSE(r != -1);
assert(r >= 0);
if (r == 0)
break;
/* Check for an end of line. */
assert(r == 1);
if (ch == '\n')
break;
/* Otherwise, add the character to the line. */
if (count < sizeof(line) - 1) {
line[count++] = ch;
line[count] = '\0';
}
}
/* Quit if "!quit" is entered. */
if (0 == strcmp(line, "!quit"))
return EXIT_FAILURE;
/* Prints out collection contents if "!info" is entered. */
if (0 == strcmp(line, "!info")) {
local_content_print(&g_local_content);
printf("\n");
data_cache_print(&g_data_cache);
printf("\n");
content_dir_print(&g_content_dir);
printf("\n");
peers_print(&g_peers);
printf("\n");
request_print(&g_requests);
printf("\n");
/* Otherwise, parse the input as a file name and request this file. */
} else if (0 != strcmp(line, "")) {
if (EXIT_SUCCESS != verify_name(line)) {
fprintf(stderr, "Invalid file name: '%s'\n\n", line);
} else {
(void)send_request(line);
}
}
print_prompt();
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Processes incoming UDP data. Based on the incoming header, this function
* calls other functions to handle specific messages.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int process_udp(void)
{
msg_buf_t buf;
peer_t peer;
RETURN_IF_FAILED(udp_recv(
buf.buffer,
sizeof(buf.buffer),
&buf.size,
&peer));
switch (get_msg_type(&buf)) {
case msg_type_data:
receive_data(&buf, &peer);
break;
case msg_type_listing:
receive_listing(&buf, &peer);
break;
case msg_type_request:
receive_request(&buf, &peer);
break;
case msg_type_try:
receive_try(&buf, &peer);
break;
case msg_type_unknown:
default:
break;
}
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Processes an incoming data message. If the data received does not
* correspond to an outstanding request, the event is logged and nothing is
* modified. Otherwise the data received is saved to a local file, the
* corresponding request is removed, and the data cache is updated.
*
* @param buf The message buffer.
* @param peer The peer that this message is received from.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int receive_data(msg_buf_t * buf, const peer_t * peer)
{
FILE * f;
size_t index;
data_msg_t msg;
assert(buf != NULL);
assert(peer != NULL);
/* Parse the message as a data message. */
memset(&msg, 0, sizeof(msg));
RETURN_IF_FAILED(data_msg_read(buf, &msg));
/* Determine if the name corresponds to an outstanding request. */
if (request_find(&g_requests, msg.name, &index) == EXIT_FAILURE) {
FILE * f;
if ((f = log_open()) != NULL) {
fprintf(f, "Discarding data message for %s because it does not\n"
"correspond to an outstanding request.\n", msg.name);
log_close(f);
}
return EXIT_SUCCESS;
}
/* Interrupt the user and print a message indicating it was downloaded. */
printf("\n\ncontent found (%s)\n\n", msg.name);
print_prompt();
/* Delete from the outstanding requests. */
request_remove(&g_requests, index);
/* Write the received data to a local file with the received file name. */
f = fopen(msg.name, "wb");
RETURN_IF_FALSE(f != NULL);
if (fwrite(msg.data, 1, msg.size, f) != msg.size) {
fclose(f);
RETURN_FAILURE();
}
fclose(f);
/* Write the received data to the data cache collection. */
data_cache_add(&g_data_cache, msg.name, msg.data, msg.size);
/* Write this entry to content directory if it does not already exist. */
content_dir_add(&g_content_dir, msg.name, peer);
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Processes an incoming listing message. The received file and peer
* information is added to the content directory collection. Duplicated
* entries are not added more than once.
*
* @param buf The message buffer.
* @param peer The peer that this message is received from.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int receive_listing(msg_buf_t * buf, const peer_t * peer)
{
size_t i;
listing_msg_t msg;
assert(buf != NULL);
assert(peer != NULL);
/* Parse the message buffer as a listing message. */
memset(&msg, 0, sizeof(msg));
RETURN_IF_FAILED(listing_msg_read(buf, &msg));
/* Add each item to the collection. */
for (i = 0; i < msg.count; i++)
content_dir_add(&g_content_dir, msg.entries[i], peer);
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Processes an incoming request message. If the requested file is in the
* local content collection or the data cache, a data message is sent. If
* not, but the name is in the content directory, a try message is sent.
* Otherwise the request is ignored.
*
* @param buf The message buffer.
* @param peer The peer that this message is received from.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int receive_request(msg_buf_t * buf, const peer_t * peer)
{
msg_buf_t buffer;
data_msg_t data_msg;
size_t index;
request_msg_t request_msg;
assert(buf != NULL);
assert(peer != NULL);
memset(&request_msg, 0, sizeof(request_msg));
memset(&data_msg, 0, sizeof(data_msg));
memset(&buffer, 0, sizeof(buffer));
/* Parse the message buffer as a request message. */
RETURN_IF_FAILED(request_msg_read(buf, &request_msg));
/* A data message is created if the file is found in the local content. */
strcpy(data_msg.name, request_msg.name);
if (local_content_find(
&g_local_content,
data_msg.name, &index) == EXIT_SUCCESS) {
data_msg.size = g_local_content.items[index].size;
memcpy(
data_msg.data,
g_local_content.items[index].buffer,
data_msg.size);
data_msg_write(&buffer, &data_msg);
/* A data message is created if the file is found in the data cache. */
} else if (data_cache_find(
&g_data_cache,
data_msg.name, &index) == EXIT_SUCCESS) {
data_msg.size = g_data_cache.items[index].size;
memcpy(
data_msg.data,
g_data_cache.items[index].buffer,
data_msg.size);
data_msg_write(&buffer, &data_msg);
/* If the file name is found in the content directory, a try message is
* created with all peers who know about the file.
*/
} else {
try_msg_t try_msg;
memset(&try_msg, 0, sizeof(try_msg));
strcpy(try_msg.name, request_msg.name);
while (content_dir_find(
&g_content_dir,
request_msg.name,
&index) == EXIT_SUCCESS) {
try_msg.peers[try_msg.count++] =
g_content_dir.items[index++].peer;
}
if (try_msg.count > 0)
try_msg_write(&buffer, &try_msg);
}
/* Send the message if a data message or a try message was created. */
if (buffer.size > 0)
udp_send(peer, buffer.buffer, buffer.size);
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Process an incoming try message. If the requested file is in the local
* content collection or the data cache, a data message is sent. If not, but
* the name is in the content directory, a try message is sent. Otherwise
* the request is ignored.
*
* @param buf The message buffer.
* @param peer The peer that this message is received from.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int receive_try(msg_buf_t * buf, const peer_t * peer)
{
size_t i;
size_t j;
try_msg_t msg;
int new_peer_flag;
assert(buf != NULL);
assert(peer != NULL);
/* Parse the message buffer as a try message. */
memset(&msg, 0, sizeof(msg));
RETURN_IF_FAILED(try_msg_read(buf, &msg));
/* Ignore try messages for names not in the request cache. */
if (request_find(&g_requests, msg.name, &i) == EXIT_FAILURE) {
FILE * f;
if ((f = log_open()) != NULL) {
fprintf(f, "Discarding try message for %s because it does not\n"
"correspond to an outstanding request.\n", msg.name);
log_close(f);
}
return EXIT_SUCCESS;
}
/* Loop over each peer listed in the try message. */
new_peer_flag = 0;
for (i = 0; i < msg.count; i++) {
msg_buf_t buffer;
request_msg_t request_msg;
memset(&request_msg, 0, sizeof(request_msg));
memset(&buffer, 0, sizeof(buffer));
/* Search in the content directory to see if it has the peer. */
for (j = 0; j < g_content_dir.count; j++) {
if (peer_cmp(&msg.peers[i], &g_content_dir.items[j].peer) == 0)
new_peer_flag = 1;
}
/* If the content directory doesn't already have this peer, send a
* request to this peer.
*/
if (new_peer_flag == 0) {
strcpy(request_msg.name, msg.name);
request_msg_write(&buffer, &request_msg);
udp_send(&msg.peers[i], buffer.buffer, buffer.size);
}
/* Add the peers received into the content directory. */
content_dir_add(&g_content_dir, msg.name, &msg.peers[i]);
}
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Sends a listing message.
*/
static void send_listing(void)
{
msg_buf_t buf;
size_t i;
listing_msg_t msg;
peers_t peers;
memset(&msg, 0, sizeof(msg));
memset(&buf, 0, sizeof(buf));
memset(&peers, 0, sizeof(peers));
/* Create a combined list of peers from the content directory and the
* ones specified on the command line.
*/
peers_combine(&peers, &g_content_dir, &g_peers);
listing_msg_create(&msg);
listing_msg_write(&buf, &msg);
/* Send the listing message to each peer. */
for (i = 0; i < peers.count; i++)
(void)udp_send(&peers.peers[i], buf.buffer, buf.size);
}
/* ------------------------------------------------------------------------ */
/**
* Sends a request for a file name. This function prints "content found" if
* the name already exists in the local content or the data cache. If name of
* the file is found in the content directory, a request is sent to those
* peers listed in the content directory. If not, a request is sent to all
* known peers.
*
* @param name The name of file.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
static int send_request(const char * name)
{
int counter;
msg_buf_t buf;
size_t index;
request_msg_t msg;
/* Print a message if the item is found locally. */
if (local_content_find(&g_local_content, name, &index) == EXIT_SUCCESS
|| data_cache_find(&g_data_cache, name, &index) == EXIT_SUCCESS) {
printf("content found\n\n");
return EXIT_SUCCESS;
}
/* Add the request to the outstanding request collection. */
request_add(&g_requests, name);
strcpy(msg.name, name);
request_msg_write(&buf, &msg);
printf("Searching for %s...\n\n", name);
/* Check if the item is found in the content directory. */
counter = 0;
while (content_dir_find(&g_content_dir, name, &index) == EXIT_SUCCESS) {
send_request_to_peer(
&g_content_dir.items[index].peer,
buf.buffer,
buf.size);
index++;
counter++;
}
/* Otherwise, send it to all known peers. */
if (counter == 0) {
peers_t combined_peers;
memset(&combined_peers, 0, sizeof(combined_peers));
peers_combine(&combined_peers, &g_content_dir, &g_peers);
for (counter = 0; counter < combined_peers.count; counter++)
send_request_to_peer(
&combined_peers.peers[counter],
buf.buffer,
buf.size);
}
return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/**
* Sends a request message to a single peer and prints an error message if
* there is a problem.
*
* @param to The peer.
* @param buffer The message buffer.
* @param size The size of the message buffer in bytes.
*/
static void send_request_to_peer(
const peer_t * to,
const void * buffer,
size_t size)
{
if (udp_send(to, buffer, size) != EXIT_SUCCESS) {
printf("Failed to send request to ");
fprint_peer(stdout, to);
printf(".\n");
}
}
/* ------------------------------------------------------------------------ */
#ifdef TESTS
static void test() {
if (EXIT_SUCCESS != udp_start(port)) {
fprintf(stderr, "Cannot bind to port %hu.", port);
RETURN_FAILURE();
}
for (i = 0; i < p_index; i++) {
int j;
for (j = 0; j < f_index; j++) {
if (EXIT_SUCCESS != udp_send(&peers[i], files[j], strlen(files[j]) + 1)) {
RETURN_FAILURE();
}
}
}
printf("Parsed %d peers and %d files.\n", p_index, f_index);
for (;;) {
char buffer[1200];
char ip[16];
size_t actual;
peer_t peer;
printf("Listening...\n");
if (EXIT_SUCCESS != udp_recv(buffer, sizeof(buffer), &actual, &peer)) {
RETURN_FAILURE();
}
inet_ntop(AF_INET, &peer.addr, ip, sizeof(struct sockaddr_in));
printf("%d bytes arrived from %s on port %hu.\n", actual, ip, peer.port);
dump_buffer(buffer, actual);
printf("\n");
}
udp_stop();
}
#endif /* TESTS */