/** @file relrecv.c
* @date May 4, 2009
*/
#ifndef CONFIG_RELRECV
#error This is used for relrecv only.
#endif /* CONFIG_RELRECV */
#include "common.h"
#include "config.h"
#include "crc.h"
#include "protocol.h"
#include "udp.h"
/* ------------------------------------------------------------------------ */
/** A general purpose buffer for sending and receiving. Incoming datagrams
 * are read into it and outgoing messages are serialized into it, so its
 * contents are overwritten by every send (send_buffer() also zeroes it). */
static msg_buffer_t g_buffer;
/** The index of the next DATA packet expected to arrive in sequence. */
static u_long g_expected;
/** The file pointer of the file currently being received, or NULL. */
static FILE * g_file;
/** The INIT message that opened the current transfer. */
static msg_init_t g_init;
/** An array that records the number of times each DATA message arrived.
 * It is allocated and released by receive_all_data(), and is indexed by
 * packet number. */
static size_t * g_packets;
/** The client information, filled in when the INIT message is received. */
static peer_t g_peer;
/** The number of DATA packets retried; reset at the start of each transfer. */
static size_t g_retries;
/** The START message; its id is incremented for every new transfer. */
static msg_start_t g_start;
/* ------------------------------------------------------------------------ */
/* Forward declarations of the file-local helpers defined below. Every one of
 * them returns EXIT_SUCCESS or EXIT_FAILURE. */
static int parse_args(int argc, const char * argv[], u_short * port);
static int poll_buffer(int * timeout_flag);
static int process_file(void);
static int process_socket(void);
static int receive_all_data(void);
static int receive_all_data_helper(void);
static int receive_data(u_long packets, int * received_flag);
static int send_abort(const peer_t * peer, u_long id, const char * reason);
static int send_buffer(const peer_t * peer, const char * msg_type);
static int send_retries(void);
/* ------------------------------------------------------------------------ */
/** Program entry point: opens the UDP server socket and receives files.
 *
 * @param argc The number of command line arguments.
 * @param argv The command line arguments; argv[1] is the port to listen on.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
int main(int argc, const char * argv[])
{
    u_short port;
    int status;

    /* The listening port comes from the command line. */
    RETURN_IF_FAILURE(parse_args(argc, argv, &port));

    /* Open the server socket on that port. */
    status = udp_start_server(port);
    if (EXIT_SUCCESS != status) {
        fprintf(stderr, "Cannot bind to port %hu.\n", port);
        RETURN_FAILURE();
    }

    /* Serve transfers; on failure the socket is still closed, but the
     * result of closing it is deliberately not inspected. */
    status = process_socket();
    if (EXIT_SUCCESS != status) {
        udp_stop();
        RETURN_FAILURE();
    }

    /* Orderly shutdown of the socket. */
    status = udp_stop();
    if (EXIT_SUCCESS != status) {
        fprintf(stderr, "Error closing socket.\n");
        RETURN_FAILURE();
    }

    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Parses the command line arguments.
 *
 * Exactly one argument is expected: a decimal port number in the range
 * 0..65535. The whole argument must be numeric; input such as "80abc",
 * "-1" or "70000" is rejected instead of being silently truncated or
 * wrapped.
 *
 * @param argc The number of arguments.
 * @param argv The arguments.
 * @param port The returned port number; only written on success.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int parse_args(int argc, const char * argv[], u_short * port)
{
    const char * text;
    char * end;
    unsigned long value;
    int valid;

    assert(NULL != argv);
    assert(NULL != port);

    /* Check the number of arguments. */
    if (argc != 2) {
        fprintf(stderr, "usage: relrecv port\n");
        RETURN_FAILURE();
    }

    text = argv[1];
    value = 0;

    /* Require a leading digit: strtoul() alone would accept leading white
     * space and a sign (a negative value wraps to a large positive one). */
    valid = (text[0] >= '0' && text[0] <= '9');
    if (valid) {
        value = strtoul(text, &end, 10);
        /* The entire string must be consumed and the value must fit a
         * 16-bit port. On overflow strtoul() returns ULONG_MAX, which the
         * range check rejects as well. */
        valid = ('\0' == *end && value <= 65535UL);
    }

    if (!valid) {
        fprintf(stderr, "Invalid port number '%s'.\n", argv[1]);
        RETURN_FAILURE();
    }

    *port = (u_short)value;
    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Waits for a message from the current peer or a timeout.
 *
 * On a successful return either *timeout_flag is 1, or g_buffer holds a
 * datagram whose sender matches g_peer. Datagrams from any other sender are
 * answered with an ABORT ("Receiver is busy.") and the wait continues
 * against the same deadline.
 *
 * @param timeout_flag A flag set to one if there is a timeout.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int poll_buffer(int * timeout_flag)
{
    double deadline;

    assert(NULL != timeout_flag);

    /* Calculate the absolute time for the timeout in units of clock_t.
     * NOTE(review): ISO C defines clock() as processor time; on platforms
     * where it does not advance while the process is blocked the deadline
     * may stretch - confirm this matches the intended targets. */
    deadline = (double)clock() + ((CLOCKS_PER_SEC / 1000.0) * RELRECV_TIMEOUT);

    /* Loop until failure, timeout, or received buffer. */
    for (;;) {
        peer_t peer;
        double remaining;
        size_t ms;

        /* Calculate the remaining time in milliseconds. Once the deadline
         * has passed the difference is negative, and converting a negative
         * double to size_t is undefined, so clamp it to zero. */
        remaining = (deadline - (double)clock()) * 1000.0 / CLOCKS_PER_SEC;
        ms = (remaining > 0.0) ? (size_t)remaining : 0;

        if (EXIT_SUCCESS != udp_wait(ms, timeout_flag)) {
            fprintf(stderr, "Socket failure.\n");
            RETURN_FAILURE();
        }

        /* Return successfully if there is a timeout. */
        if (*timeout_flag == 1)
            break;

        /* Put the received data into g_buffer. */
        if (EXIT_SUCCESS != udp_recv(
                g_buffer.data,
                sizeof(g_buffer.data),
                &g_buffer.size,
                &peer)) {
            fprintf(stderr, "Socket failure.\n");
            RETURN_FAILURE();
        }

        /* Return successfully only if it's the right peer. */
        if (peer.address == g_peer.address && peer.port == g_peer.port)
            break;

        /* Send an ABORT to the unexpected peer. This reuses g_buffer, which
         * is fine because the foreign datagram is discarded anyway. */
        RETURN_IF_FAILURE(send_abort(&peer, 0, "Receiver is busy."));
    }

    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Runs one complete transfer while g_file is open.
 *
 * Sends a START message carrying a fresh id, receives every DATA message,
 * verifies the checksum of the written file against the one announced in
 * the INIT message, and finally sends DONE and reports statistics.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int process_file(void)
{
    const char * abort_reason = NULL;
    u_long checksum;
    msg_done_t done;

    /* Announce the session on the console. */
    printf(
        "Transferring file 'r%s' (%lu bytes).\n",
        g_init.name,
        g_init.filesize);

    /* Start timing and reset the retry statistics. */
    stopwatch_start();
    g_retries = 0;

    /* Every transfer gets a new id, announced through a START message. */
    g_start.type = MSG_TYPE_START;
    g_start.id++;
    msg_start_write(&g_buffer, &g_start);
    RETURN_IF_FAILURE(send_buffer(&g_peer, "START"));

    /* Collect all DATA messages into the file. */
    RETURN_IF_FAILURE(receive_all_data());

    /* Compute the checksum, then compare it with the announced one; either
     * problem aborts the transfer with its own reason. */
    if (EXIT_SUCCESS != crc_file(g_file, &checksum))
        abort_reason = "Error computing checksum.";
    else if (checksum != g_init.checksum)
        abort_reason = "Checksum failure.";

    if (NULL != abort_reason) {
        RETURN_IF_FAILURE(send_abort(&g_peer, g_start.id, abort_reason));
        RETURN_FAILURE();
    }

    /* Confirm success to the sender with a DONE message. */
    done.type = MSG_TYPE_DONE;
    done.id = g_start.id;
    msg_done_write(&g_buffer, &done);
    RETURN_IF_FAILURE(send_buffer(&g_peer, "DONE"));

    /* Report completion and the transfer statistics. */
    printf("Transfer complete: 'r%s'.\n\n", g_init.name);
    stopwatch_stop(
        g_init.filesize,
        g_retries,
        protocol_packet_count(&g_init));

    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Performs all functionality while the socket is open.
 *
 * Loops forever: waits for an INIT message, creates the local file named
 * "r" followed by the announced name, and runs the transfer. Protocol-level
 * problems (bad INIT, existing file, failed transfer) are reported and the
 * loop continues with the next request; the function only returns, with a
 * failure, when receiving from the socket or sending an ABORT fails.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int process_socket(void)
{
    /* The main loop waits for new connections. */
    for (;;) {
        char path[NAME_SIZE + 1];
        FILE * existing;

        /* Wait for a message to arrive; its sender becomes g_peer. */
        if (EXIT_SUCCESS != udp_recv(
                g_buffer.data,
                sizeof(g_buffer.data),
                &g_buffer.size,
                &g_peer)) {
            fprintf(stderr, "Socket failure.\n");
            RETURN_FAILURE();
        }

        /* This should be a INIT message. */
        if (EXIT_SUCCESS != msg_init_read(&g_init, &g_buffer)) {
            RETURN_IF_FAILURE(send_abort(
                &g_peer,
                0,
                "Invalid INIT message."));
            continue;
        }

        /* The path is 'r' + name + terminator; refuse names that would not
         * fit instead of letting sprintf() write past the buffer. */
        if (strlen(g_init.name) + 2 > sizeof(path)) {
            RETURN_IF_FAILURE(send_abort(&g_peer, 0, "File name too long."));
            continue;
        }
        sprintf(path, "r%s", g_init.name);

        /* Check if the file already exists. */
        existing = fopen(path, "rb");
        if (NULL != existing) {
            fclose(existing);
            RETURN_IF_FAILURE(send_abort(&g_peer, 0, "File already exists."));
            continue;
        }

        /* Create the file for writing and reading. There is nothing to
         * close when fopen() fails: passing NULL to fclose() is undefined. */
        g_file = fopen(path, "w+b");
        if (NULL == g_file) {
            RETURN_IF_FAILURE(send_abort(&g_peer, 0, "Cannot create file."));
            continue;
        }

        /* Receive the file now that the file is open. */
        if (EXIT_SUCCESS != process_file()) {
            fclose(g_file);
            g_file = NULL;
            /* Delete the file if the transfer failed. */
            remove(path);
            continue;
        }

        /* Close the file and wait for another connection. */
        if (0 != fclose(g_file))
            fprintf(stderr, "Warning: Cannot close file.\n");
        g_file = NULL;
    }
}
/* ------------------------------------------------------------------------ */
/** Receives all DATA messages.
 *
 * Owns the lifetime of g_packets: the per-packet arrival counters are
 * allocated (zeroed) here, used by the helper, and released before
 * returning on every path.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int receive_all_data(void)
{
    size_t count;
    int status;

    /* Allocate and initialize the array of packet counts. A zero-sized
     * request may legally yield NULL, which would be misreported as an
     * out-of-memory condition, so always ask for at least one element. */
    count = (size_t)protocol_packet_count(&g_init);
    if (0 == count)
        count = 1;
    g_packets = (size_t *)calloc(count, sizeof(*g_packets));

    /* Check for memory failures. */
    if (NULL == g_packets) {
        RETURN_IF_FAILURE(send_abort(&g_peer, g_start.id, "Out of memory."));
        RETURN_FAILURE();
    }

    /* Receive all the DATA messages. */
    status = receive_all_data_helper();

    /* Free the array of packet counts before returning, whatever the
     * outcome was. */
    free(g_packets);
    g_packets = NULL;

    if (EXIT_SUCCESS != status)
        RETURN_FAILURE();
    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Receives all DATA messages; the worker behind receive_all_data().
 *
 * Polls for messages until every packet has been stored once. A timeout
 * triggers RETRY messages for all packets still missing; too many timeouts,
 * an INIT message, or an unknown message type abort the transfer.
 * Expects g_packets to have been allocated by the caller.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int receive_all_data_helper(void)
{
    u_long packets;
    u_long remaining;
    size_t timeouts;

    /* Determine the total number of packets that will be sent. */
    packets = protocol_packet_count(&g_init);
    remaining = packets;

    /* The first expected packet. */
    g_expected = 0;

    /* Nothing to receive. Returning here also keeps log10(0), which is
     * minus infinity, from being converted to size_t (undefined). */
    if (0 == remaining)
        return EXIT_SUCCESS;

    /* Assuming no more than 10% of the DATA messages are lost, and assuming
     * no more than 10% of all retried DATA messages are lost, then we can
     * assume there are no more than the following number of timeouts. Of
     * course, some of the RETRY messages are lost as well, but to keep it
     * simple, we round up a little.
     */
    timeouts = (size_t)log10((double)remaining) + 1;

    /* Loop while there are more packets. */
    while (remaining > 0) {
        int timeout_flag;
        int received_flag;

        /* Get a message or timeout. */
        RETURN_IF_FAILURE(poll_buffer(&timeout_flag));

        if (timeout_flag == 1) {
            /* Do not allow too many timeouts. */
            if (timeouts-- == 0) {
                RETURN_IF_FAILURE(send_abort(
                    &g_peer,
                    g_start.id,
                    "Too many timeouts."));
                RETURN_FAILURE();
            }
            /* Send all the RETRY messages. Moving g_expected past the last
             * packet keeps receive_data() from issuing further "skipped"
             * RETRY messages for the same gaps. */
            g_expected = packets;
            RETURN_IF_FAILURE(send_retries());
            continue;
        }

        /* Check what type of message this is. */
        switch (g_buffer.data[0]) {
        case MSG_TYPE_DATA:
            break;
        /* Abort if this is an INIT message. */
        case MSG_TYPE_INIT:
            RETURN_IF_FAILURE(send_abort(
                &g_peer,
                g_start.id,
                "Unexpected INIT message."));
            RETURN_FAILURE();
        /* Abort if this is an unknown message. */
        default:
            RETURN_IF_FAILURE(send_abort(
                &g_peer,
                g_start.id,
                "Received invalid message."));
            RETURN_FAILURE();
        }

        /* Use a helper function to process the DATA message. */
        RETURN_IF_FAILURE(receive_data(packets, &received_flag));

        /* Keep track of the remaining number of packets. */
        if (received_flag == 1)
            remaining--;
    }

    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Processes a DATA message in g_buffer.
 *
 * Validates the message (format, transfer id, packet index, size), ignores
 * duplicates, writes the payload at the packet's offset in g_file, and
 * sends a RETRY for any packets that were skipped before this one.
 *
 * @param packets The total number of packets.
 * @param received_flag Set to 1 for new valid DATA packets, 0 otherwise.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int receive_data(u_long packets, int * received_flag)
{
    msg_data_t data;
    size_t data_size;

    assert(NULL != received_flag);

    /* Assume this is not a new DATA packet. */
    *received_flag = 0;

    /* Parse the message; abort if bad. */
    if (EXIT_SUCCESS != msg_data_read(&data, &g_buffer)) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Received invalid DATA message."));
        RETURN_FAILURE();
    }

    /* Check the id; ignore if bad. */
    if (data.id != g_start.id) {
        fprintf(stderr, "Warning: Ignoring packet with bad id.\n");
        return EXIT_SUCCESS;
    }

    /* Check the DATA message index; abort if bad. This also bounds the
     * index used on g_packets below. */
    if (data.packet >= packets) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Received invalid DATA message index."));
        RETURN_FAILURE();
    }

    /* Check the data size; abort if bad. */
    data_size = protocol_data_size(&g_init, data.packet);
    if ((size_t)g_buffer.size != data_size + DATA_HEADER_SIZE) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Received invalid DATA message size."));
        RETURN_FAILURE();
    }

    /* Ignore packets received more than once. */
    if (++g_packets[data.packet] != 1)
        return EXIT_SUCCESS;

    /* Store the payload BEFORE anything else is sent: sending a RETRY
     * serializes into g_buffer and send_buffer() then zeroes it.
     * NOTE(review): this matters if msg_data_t.data refers to g_buffer
     * rather than holding its own copy - confirm in protocol.h. */

    /* Seek within the file; abort if failed. */
    if (0 != fseek(
            g_file,
            protocol_packet_offset(&g_init, data.packet),
            SEEK_SET)) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Error writing to file."));
        RETURN_FAILURE();
    }

    /* Write to the file; abort if failed. */
    if (data_size != fwrite(data.data, 1, data_size, g_file)) {
        RETURN_IF_FAILURE(send_abort(
            &g_peer,
            g_start.id,
            "Error writing to file."));
        RETURN_FAILURE();
    }

    /* Send RETRY message if DATA was skipped. */
    if (data.packet > g_expected) {
        msg_retry_t retry;

        retry.type = MSG_TYPE_RETRY;
        retry.id = g_start.id;
        retry.first = g_expected;
        retry.last = data.packet - 1;
        fprintf(
            stderr,
            "DATA skipped; sending RETRY message.\n"
            "Sending RETRY message for packets %lu to %lu.\n",
            retry.first,
            retry.last);
        g_retries += (retry.last - retry.first) + 1;
        msg_retry_write(&g_buffer, &retry);
        RETURN_IF_FAILURE(send_buffer(&g_peer, "RETRY"));
    }

    /* Save the new expected packet to receive. */
    if (data.packet >= g_expected)
        g_expected = data.packet + 1;

    /* Set the flag to indicate a new valid DATA message. */
    *received_flag = 1;
    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Sends an ABORT message.
 *
 * The reason is also printed to stderr as a warning. The message is built
 * in g_buffer, so any data previously held there is lost.
 *
 * @param peer The peer.
 * @param id The sequence id or 0.
 * @param reason The reason for the aborting; it must be shorter than the
 * reason field of the ABORT message (checked by assert, and truncated in
 * builds where asserts are disabled).
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int send_abort(const peer_t * peer, u_long id, const char * reason)
{
    /* Not named 'abort': that would shadow the C library function. */
    msg_abort_t message;
    size_t length;

    assert(NULL != peer);
    assert(NULL != reason);
    assert(strlen(reason) < sizeof(message.reason));

    /* First display a warning to the console. */
    fprintf(stderr, "Warning: Sending ABORT message: %s\n", reason);

    /* Start from a zeroed message so that no uninitialized bytes are left
     * in the part of the reason field that follows the terminator. */
    memset(&message, 0, sizeof(message));

    /* Build and send the ABORT message. */
    message.type = MSG_TYPE_ABORT;
    message.id = id;

    /* Bounded copy: with NDEBUG the assert above vanishes, so clamp the
     * length rather than rely on it. */
    length = strlen(reason);
    if (length >= sizeof(message.reason))
        length = sizeof(message.reason) - 1;
    memcpy(message.reason, reason, length);
    message.reason[length] = '\0';

    msg_abort_write(&g_buffer, &message);
    RETURN_IF_FAILURE(send_buffer(peer, "ABORT"));
    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Transmits the current contents of g_buffer to a peer.
 *
 * After a successful send the whole buffer is zeroed; after a failed send
 * it is left untouched and an error naming the message type is printed.
 *
 * @param peer The peer.
 * @param msg_type The type of message, used only in the error text.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int send_buffer(const peer_t * peer, const char * msg_type)
{
    int status;

    assert(NULL != peer);

    /* Hand the serialized message to the UDP layer. */
    status = udp_send(peer, g_buffer.data, g_buffer.size);
    if (EXIT_SUCCESS != status) {
        fprintf(stderr, "Error sending %s message.\n", msg_type);
        RETURN_FAILURE();
    }

    /* Wipe the global buffer so stale data is easy to spot in a debugger. */
    memset(&g_buffer, 0, sizeof g_buffer);
    return EXIT_SUCCESS;
}
/* ------------------------------------------------------------------------ */
/** Sends one RETRY message for every run of consecutive missing packets.
 *
 * A packet counts as missing while its entry in g_packets is zero. Each
 * RETRY covers an inclusive range [first, last]; the number of packets
 * requested is added to g_retries after the message has been sent.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
 */
static int send_retries(void)
{
    msg_retry_t retry;
    u_long total;
    u_long first;

    /* Fields shared by every RETRY message of this round. */
    total = protocol_packet_count(&g_init);
    retry.type = MSG_TYPE_RETRY;
    retry.id = g_start.id;

    fprintf(stderr, "Timeout occurred; sending RETRY messages.\n");

    /* Walk the packet table, handling one missing run at a time. */
    first = 0;
    while (first < total) {
        u_long end;

        /* Packets that already arrived need no retry. */
        if (g_packets[first] != 0) {
            first++;
            continue;
        }

        /* Extend the run until a received packet or the end of the table;
         * 'end' is one past the last missing packet. */
        end = first + 1;
        while (end < total && g_packets[end] == 0)
            end++;

        retry.first = first;
        retry.last = end - 1;

        /* Request the whole run with a single RETRY message. */
        msg_retry_write(&g_buffer, &retry);
        RETURN_IF_FAILURE(send_buffer(&g_peer, "RETRY"));

        /* Update the statistics and report the range. */
        g_retries += (retry.last - retry.first) + 1;
        fprintf(
            stderr,
            "Sending RETRY message for packets %lu to %lu.\n",
            retry.first,
            retry.last);

        /* Continue scanning right after this run. */
        first = end;
    }

    return EXIT_SUCCESS;
}