package edu.hawaii.ics.yucheng;
import java.io.StringReader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
/**
* A class that implements Runnable. The run method takes the list of CSV
* entries supplied at construction and uploads them into the corresponding
* tables on the distributed database system.
*
* @author Cheng Jade
* @assignment ICS 421 Assignment 4
* @date Mar 22, 2010
* @bugs None
*/
public class LoadCSV implements Runnable {

    /** The load-CSV configuration object. */
    public final LoadCSVConfiguration configuration;

    /** The catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /** The parsed CSV entries; each element holds the fields of one row. */
    private final ArrayList<String[]> csvs = new ArrayList<String[]>();

    /**
     * Records whether all CSV rows were loaded successfully: null until a
     * worker thread reports an outcome, then true or false. Declared volatile
     * because worker threads write it while the coordinating thread reads it;
     * all writes go through the synchronized report helpers below so a
     * concurrent success can never overwrite a failure.
     */
    private volatile Boolean success = null;

    /**
     * Initializes a new instance of this object.
     *
     * @param configuration
     *            The load-CSV configuration.
     *
     * @param rawCSV
     *            The raw CSV lines to parse and later upload.
     *
     * @throws ProgramException
     *             Thrown if the CSV text cannot be parsed.
     *
     * @throws NullPointerException
     *             Thrown if either argument is null.
     */
    public LoadCSV(
            final LoadCSVConfiguration configuration,
            final ArrayList<String> rawCSV) throws ProgramException {

        if (null == configuration)
            throw new NullPointerException("configuration");
        if (null == rawCSV)
            throw new NullPointerException("rawCSV");

        // Initialize the configuration and the catalog node.
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;

        // Stitch the raw lines back into one newline-delimited string so the
        // parser sees the whole document (quoted CSV fields may span lines).
        final StringBuilder builder = new StringBuilder();
        for (final String item : rawCSV)
            builder.append(item).append('\n');
        final StringReader reader = new StringReader(builder.toString());

        // Parse the string and collect the fields of every CSV entry.
        String[] fields;
        while (null != (fields = CSVParser.parse(reader)))
            this.csvs.add(fields);
    }

    /**
     * The main routine: routes each CSV row to its target node, bulk-inserts
     * the rows on every node in parallel, and updates the catalog dtables.
     * Prints a summary line; on a ProgramException prints the message to
     * stderr and exits the process with status 1.
     */
    @Override
    public void run() {
        try {
            // For each node, build the list of CSV rows that will be
            // partitioned to that node.
            final ArrayList<CSVsNodePair> csvsToNodes = new ArrayList<CSVsNodePair>();
            for (int i = 0; i < this.configuration.nodes.size(); i++)
                csvsToNodes.add(new CSVsNodePair(this.configuration.nodes.get(i).node));

            // Examine every CSV row and decide which node list it belongs to.
            // NOTE(review): routing assumes node names carry a numeric suffix
            // starting at character index 4 (e.g. "nodeN") -- confirm against
            // the configuration file format.
            final ArrayList<String> columnNames = columnNames();
            for (final String[] item : this.csvs) {
                final int toID = partitionTo(columnNames, item);
                for (final CSVsNodePair pair : csvsToNodes) {
                    if (Integer.parseInt(pair.node.name.substring(4)) == toID)
                        pair.nodeCSVs.add(item);
                }
            }

            // Start one thread per node; each thread executes that node's
            // bulk insert statement and updates the catalog dtables.
            final int count = csvsToNodes.size();
            final Thread[] threads = new Thread[count];
            for (int i = 0; i < count; i++) {
                final CSVsNodePair pair = csvsToNodes.get(i);
                threads[i] = new Thread(new Runner(pair.node, bulkInsertStatement(pair)));
                threads[i].start();
            }

            // Wait for all worker threads to finish.
            for (int i = 0; i < count; i++)
                DistributedDB.join(threads[i], csvsToNodes.get(i).node);

            // Report the overall outcome. Boolean.TRUE.equals also covers the
            // null case (no worker ever reported), which previously caused a
            // NullPointerException when the Boolean was unboxed.
            if (Boolean.TRUE.equals(this.success))
                System.out.println("All CSV were loaded successfully.");
            else
                System.out.println("Not all CSV were loaded successfully.");

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
    }

    /** Records a successful load unless an outcome was already reported. */
    private synchronized void reportSuccess() {
        if (null == this.success)
            this.success = true;
    }

    /** Records a failed load; a failure always overrides a success. */
    private synchronized void reportFailure() {
        this.success = false;
    }

    /**
     * Connects to the first node and retrieves the column names of the
     * target table.
     *
     * @return The list of column names in the target table.
     *
     * @throws ProgramException
     *             Thrown if the connection or the query fails.
     */
    private ArrayList<String> columnNames() throws ProgramException {
        final ConfigurationNode node = this.configuration.nodes.get(0).node;
        final ArrayList<String> columnNames = new ArrayList<String>();
        try {
            node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    // Only the metadata is read; the rows are never iterated.
                    final ResultSet set = statement.executeQuery("SELECT * FROM "
                            + configuration.tableName);
                    final ResultSetMetaData meta = set.getMetaData();
                    for (int i = 1; i <= meta.getColumnCount(); i++)
                        columnNames.add(meta.getColumnName(i));
                }
            });
            node.log(System.out, "Successfully connected and retrieved column names from a node db.");
            return columnNames;
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
            throw new ProgramException("columnNames " + e);
        }
    }

    /**
     * Calculates the node ID that a specified CSV entry will be partitioned
     * to.
     *
     * @param columnNames
     *            The list of column names retrieved from the target table.
     *
     * @param csv
     *            The specified CSV entry to be calculated.
     *
     * @return The node ID that this CSV entry will be partitioned to.
     *
     * @throws ProgramException
     *             Thrown if the partition column is missing, the partition
     *             value is non-numeric or out of range, or the partition
     *             method is unsupported.
     *
     * @throws NullPointerException
     *             Thrown if either argument is null.
     */
    private int partitionTo(
            final ArrayList<String> columnNames,
            final String[] csv) throws ProgramException {

        if (null == columnNames)
            throw new NullPointerException("columnNames");
        if (null == csv)
            throw new NullPointerException("csv");

        // Locate the partition column among the table's columns.
        int indexOfPartitionColumn = -1;
        final String partitionColumn = this.configuration.partitionColumn;
        for (int i = 0; i < columnNames.size(); i++) {
            if (columnNames.get(i).equalsIgnoreCase(partitionColumn)) {
                indexOfPartitionColumn = i;
                break;
            }
        }
        if (-1 == indexOfPartitionColumn)
            throw new ProgramException("Mismatch partition table name and the dtables");

        final String partitionValue = csv[indexOfPartitionColumn];
        try {
            final int intPartitionValue = Integer.parseInt(partitionValue);

            // Range partitioning: node i+1 owns the half-open interval
            // (param1, param2].
            if (this.configuration.partitionMethod.equalsIgnoreCase("range")) {
                for (int i = 0; i < this.configuration.nodes.size(); i++) {
                    final int param1 = Integer.parseInt(this.configuration.nodes.get(i).param1);
                    final int param2 = Integer.parseInt(this.configuration.nodes.get(i).param2);
                    if (intPartitionValue > param1 && intPartitionValue <= param2)
                        return i + 1;
                }
                throw new ProgramException("Partition value out of range");
            }

            // Hash partitioning: param1 of the first node is the modulus.
            if (this.configuration.partitionMethod.equalsIgnoreCase("hash")) {
                final int param1 = Integer.parseInt(this.configuration.nodes.get(0).param1);
                return (intPartitionValue % param1) + 1;
            }

            throw new ProgramException("Support only range and hash partition methods.");
        } catch (final NumberFormatException e) {
            throw new ProgramException("Support only numeric partition values");
        }
    }

    /**
     * A private class that groups a node with the list of CSV rows routed to
     * it. Declared static because it never touches the enclosing instance.
     */
    private static class CSVsNodePair {

        /** The cluster node. */
        public final ConfigurationNode node;

        /** The CSV rows that will be inserted on this node. */
        public final ArrayList<String[]> nodeCSVs = new ArrayList<String[]>();

        public CSVsNodePair(final ConfigurationNode node) {
            this.node = node;
        }
    }

    /**
     * Creates and returns a bulk insert statement for one node.
     *
     * @param pair
     *            The node and its list of CSV rows.
     *
     * @return The bulk insert statement, or an empty string when no rows were
     *         routed to this node (the previous code produced an invalid
     *         statement for that case, and indexed csv[-1] for an empty row).
     */
    private String bulkInsertStatement(final CSVsNodePair pair) {
        if (pair.nodeCSVs.isEmpty())
            return "";

        // NOTE(review): the statement is built by string concatenation and
        // relies on DistributedDB.quote for escaping; a PreparedStatement
        // with ? placeholders would be safer against SQL injection.
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO " + this.configuration.tableName + " VALUES \n");
        final int rowCount = pair.nodeCSVs.size();
        for (int i = 0; i < rowCount; i++) {
            final String[] csv = pair.nodeCSVs.get(i);
            builder.append("\t\t\t\t(");
            for (int j = 0; j < csv.length; j++) {
                if (j > 0)
                    builder.append(", ");
                builder.append(DistributedDB.quote(csv[j]));
            }
            builder.append(")");
            if (i != rowCount - 1)
                builder.append(", \n");
        }
        return builder.toString();
    }

    /**
     * Creates and returns an update statement based on the partition
     * information. This SQL statement is used to modify the catalog dtables.
     *
     * @param partmtd
     *            A dtables entry: "1" for range, "2" for hash.
     *
     * @param partparam1
     *            A dtables entry: partition parameter 1.
     *
     * @param partparam2
     *            A dtables entry: partition parameter 2; may be null, in
     *            which case an empty string is stored.
     *
     * @param node
     *            The node whose dtables row is updated, or null to update the
     *            rows of every node that holds this table.
     *
     * @return The update statement.
     */
    private String updateStatement(
            final String partmtd,
            final String partparam1,
            String partparam2,
            final ConfigurationNode node) {

        assert null != partmtd;
        assert null != partparam1;
        partparam2 = partparam2 == null ? "" : partparam2;

        final StringBuilder builder = new StringBuilder();
        builder.append("UPDATE DTABLES");
        builder.append(" SET PARTMTD = ");
        builder.append(DistributedDB.quote(partmtd));
        builder.append(", PARTCOL = ");
        builder.append(DistributedDB.quote(this.configuration.partitionColumn));
        builder.append(", PARTPARAM1 = ");
        builder.append(DistributedDB.quote(partparam1));
        builder.append(", PARTPARAM2 = ");
        builder.append(DistributedDB.quote(partparam2));

        // Match the table name either as written or upper-cased.
        builder.append(" WHERE (TNAME = ");
        builder.append(DistributedDB.quote(this.configuration.tableName));
        builder.append(" OR TNAME = UCASE(");
        builder.append(DistributedDB.quote(this.configuration.tableName) + "))");

        if (null == node)
            return builder.toString();

        builder.append(" AND NODEID = ");
        builder.append(DistributedDB.quote(node.name.substring(4)));
        return builder.toString();
    }

    /**
     * A private runner that loads one node's rows and then updates the
     * catalog dtables.
     */
    private class Runner implements Runnable {

        /** The configuration node associated with this thread. */
        private final ConfigurationNode node;

        /** The bulk insert statement to execute; empty means no rows. */
        private final String bulkInsertStatement;

        /**
         * Initializes a new instance of the Runner.
         *
         * @param node
         *            The cluster node associated with this instance.
         *
         * @param bulkInsertStatement
         *            The bulk insert statement to execute; an empty string
         *            means no rows were routed to this node.
         */
        public Runner(final ConfigurationNode node, final String bulkInsertStatement) {
            assert null != node;
            assert null != bulkInsertStatement;
            this.node = node;
            this.bulkInsertStatement = bulkInsertStatement;
        }

        /**
         * Executes the insert command for the node associated with this
         * instance, then updates the catalog dtables when it finishes.
         */
        @Override
        public void run() {
            try {
                // Connect to the node and execute the bulk insert statement,
                // unless this node received no rows (executing the previous
                // empty-list statement was invalid SQL and reported a
                // spurious failure).
                if (0 != this.bulkInsertStatement.length()) {
                    this.node.runStatement(new StatementRunner() {
                        public void run(Statement statement) throws ProgramException, SQLException {
                            node.log(System.out, "Executing: " + bulkInsertStatement);
                            statement.execute(bulkInsertStatement);
                            node.log(System.out, "Statement executed successfully.");
                        }
                    });
                }

                // Connect to the catalog and update dtables.
                catalog.runStatement(new StatementRunner() {
                    public void run(Statement statement) throws ProgramException, SQLException {
                        // Hash partitioning: a single update that covers all
                        // nodes (NODEID is not constrained).
                        if (configuration.partitionMethod.equalsIgnoreCase("hash")) {
                            final String updateStatement = updateStatement("2",
                                    configuration.nodes.get(0).param1,
                                    configuration.nodes.get(0).param2, null);
                            catalog.log(System.out, "Executing a update statement " + updateStatement);
                            statement.execute(updateStatement);
                            catalog.log(System.out, "Updated for '" + node.hostname + "'");
                            reportSuccess();
                            return;
                        }

                        // Range partitioning: one update per node carrying
                        // that node's own range parameters.
                        if (configuration.partitionMethod.equalsIgnoreCase("range")) {
                            for (int i = 0; i < configuration.nodes.size(); i++) {
                                final String updateStatement = updateStatement("1",
                                        configuration.nodes.get(i).param1,
                                        configuration.nodes.get(i).param2,
                                        configuration.nodes.get(i).node);
                                catalog.log(System.out, "Executing a update statement " + updateStatement);
                                statement.execute(updateStatement);
                            }
                            catalog.log(System.out, "Updated for '" + node.hostname + "'");
                            reportSuccess();
                            return;
                        }

                        // No other partition method is supported.
                        throw new ProgramException("Support only range and hash two partition methods.");
                    }
                });
            } catch (final ProgramException e) {
                reportFailure();
                this.node.log(System.err, e.getMessage());
            }
        }
    }

    /** Returns the string form of the underlying configuration. */
    @Override
    public String toString() {
        return this.configuration.toString();
    }
}