package edu.hawaii.ics.yucheng;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
/**
* A class that executes a given set of DDL statements on a cluster of
* computers, each running an instance of a DBMS. This program executes the same
* DDL statements on the database instance of each of the computers on the
* cluster concurrently using threads.
*
* @author Cheng Jade
* @assignment ICS 421 Assignment 1
* @date Feb 10, 2010
* @bugs None
*/
class RunDDL implements Runnable {
/** The catalog DBMS, which is common to all threads. */
private static ConfigurationNode catalog;
/** The list of commands to execute, which is common to all threads. */
private static CommandList commands;
/** The configuration node associated with a single thread. */
private final ConfigurationNode node;
/** The thread executing the commands on a single node. */
private final Thread thread;
/** A flag indicating if all commands executed successfully by the thread. */
private boolean success = false;
/**
* The main entry point of the application.
*
* @param args
* The command line arguments.
*/
public static void main(final String[] args) {
assert null != args;
// Check for usage errors.
if (args.length != 2) {
final String name = RunDDL.class.getSimpleName();
System.out.println("Usage: java " + name + " <cfg> <ddl>");
System.out.println(" <cfg> path to a configuration file");
System.out.println(" <ddl> path to a DDL file");
System.exit(0);
return;
}
// Read the configuration and DDL command files.
try {
assert null != args[0];
assert null != args[1];
final Configuration configuration = new Configuration(args[0]);
commands = new CommandList(args[1]);
catalog = configuration.getCatalog();
// Prepare the 'DTABLES' in the catalog database. If the table does
// not exist, a new one is created.
prepareCatalog();
// Start threads for each configuration node.
final ArrayList<RunDDL> instances = new ArrayList<RunDDL>();
for (final ConfigurationNode node : configuration.getNodes())
instances.add(new RunDDL(node));
// Wait for all threads to terminate.
boolean success = true;
for (final RunDDL instance : instances) {
instance.join();
// Check that all commands completed successfully.
success &= instance.success;
}
// Print message indicating if all commands completed successfully.
if (success)
System.out.println("All DDL commands executed successfully.");
else
System.out.println("Not all DDL commands executed successfully.");
} catch (final ProgramException e) {
System.err.println(e.getMessage());
System.exit(1);
return;
}
// Exit cleanly while debugging from Eclipse.
System.exit(0);
}
/**
* Initializes a new instance of the RunDLL class.
*
* @param node
* The cluster node associated with this instance.
*/
private RunDDL(final ConfigurationNode node) {
assert null != node;
// Assign the class field to the parameter values.
this.node = node;
// Create a new thread for this instance.
this.thread = new Thread(this);
// Start the thread.
this.thread.start();
}
/**
* Executes the DDL commands for the node associated with this instance.
*/
public void run() {
try {
StatementFactory.newStatement(this.node, new StatementRunner() {
public void run(Statement statement) throws ProgramException, SQLException {
for (final String command : commands) {
log(System.out, "Executing statement: " + command);
statement.executeUpdate(command);
updateCatalog(command);
log(System.out, "Statement executed successfully.");
}
}
});
// Arriving here indicates all commands executed successfully.
this.success = true;
} catch (final Exception e) {
this.log(System.err, e.getMessage());
}
}
/**
* Ensures the catalog 'DTABLES' table exists.
*
* @throws ProgramException
* Thrown if there is a problem verifying the table exists or if
* the table cannot be created.
*/
private static void prepareCatalog() throws ProgramException {
assert null != catalog;
// Open the connection, check if the table exists, and create the table
// if it does not exist.
try {
StatementFactory.newStatement(catalog, new StatementRunner() {
public void run(Statement statement) throws ProgramException, SQLException {
if (!tableExists(statement.getConnection(), "DTABLES")) {
System.out.println("Creating table CATALOG.DTABLES.");
statement.executeUpdate(newCreateTableCommand());
}
}
});
} catch (final Exception e) {
throw new ProgramException(e);
}
}
/**
* Returns true if a specified command is of a specified type.
*
* @param command
* The DDL command, e.g. "CREATE TABLE ...".
* @param type
* The command type, e.g. "CREATE".
*/
private static boolean isCommandType(
final String command,
final String type) {
assert null != command;
assert null != type;
return command.toUpperCase().startsWith(type.toUpperCase());
}
/**
* Creates and returns a new DDL command to create the 'DTABLES' catalog
* table.
*
* @return The new DDL command to create the 'DTABLES' catalog table.
*/
private static String newCreateTableCommand() {
final StringBuilder builder = new StringBuilder();
builder.append("CREATE TABLE DTABLES (");
builder.append("TNAME CHAR(32), ");
builder.append("NODEDRIVER CHAR(64), ");
builder.append("NODEURL CHAR(128), ");
builder.append("NODEUSER CHAR(16), ");
builder.append("NODEPASSWD CHAR(16), ");
builder.append("PARTMTD INT, ");
builder.append("NODEID INT, ");
builder.append("PARTCOL CHAR(32), ");
builder.append("PARTPARAM1 CHAR(32), ");
builder.append("PARTPARAM2 CHAR(32))");
return builder.toString();
}
/**
* Surrounds a specified string with single quotes.
*
* @param text
* Some text to quote.
*
* @return The quoted text.
*/
private static String quote(final String text) {
assert null != text;
return "'" + text + "'";
}
/**
* Returns true if a specified table exists in a database.
*
* @param connection
* The DBMS connection.
*
* @param tableName
* The name of the table.
*
* @return True indicates the table exists.
*
* @throws ProgramException
* Thrown if there is an error checking the meta-data of the
* DBMS connection.
*/
private static boolean tableExists(
final Connection connection,
final String tableName) throws ProgramException {
assert null != connection;
assert null != tableName;
try {
// Get the tables matching the specified name. Upper-case seems to
// work best!
final DatabaseMetaData meta = connection.getMetaData();
final ResultSet set = meta.getTables(
null, null, tableName.toUpperCase(), null);
try {
// Return true if there are any records found, i.e. the table
// exists.
return set.next();
} finally {
set.close();
}
} catch (final SQLException e) {
throw new ProgramException(e);
}
}
/**
* Parses a command and returns the table name for the command, or null if
* there is none.
*
* @param command
* The DDL command.
*
* @return The table name within the command.
*/
private static String tableName(final String command) {
assert null != command;
// Work with the command in upper-case letters.
String result = command.toUpperCase();
// Search for the word TABLE.
int index;
if (-1 == (index = result.indexOf("TABLE") + 5))
return null;
// Remove whitespace around the remaining portion of the command.
result = result.substring(index).trim();
// Look for the first occurrence of a special character. If a '.' is
// encountered, remove this portion of the text, which is most likely
// the schema.
index = 0;
while (index < result.length()) {
final char ch = result.charAt(index);
if (ch == '.') {
result = result.substring(index + 1);
index = 0;
continue;
}
if (!Character.isLetterOrDigit(result.charAt(index)))
break;
index++;
}
// The remaining portion is the table name.
result = result.substring(0, index);
return result.length() == 0 ? null : result;
}
/**
* Logs a message to a specified output stream.
*
* @param stream
* The output stream, e.g. System.out.
*
* @param message
* The message to log.
*/
private void log(final PrintStream stream, final String message) {
assert null != stream;
assert null != message;
stream.println("[" + node.hostname + "] " + message);
}
/**
* Joins the thread and checks for errors. In the case of an error, a
* warning is displayed, but no exception is thrown.
*/
private void join() {
try {
this.thread.join();
} catch (final InterruptedException e) {
System.err.println(
"WARNING: Thread [" +
this.node.name +
"] failed to respond.");
}
}
/**
* Creates and returns a new DDL command to delete a row from the 'DTABLES'
* catalog table.
*
* @param tableName
* The name of the table to delete.
*
* @return A new DDL command to delete a row from the 'DTABLES' catalog
* table.
*/
private String newDeleteRowCommand(final String tableName) {
assert null != tableName;
final StringBuilder builder = new StringBuilder();
builder.append("DELETE FROM DTABLES WHERE TNAME = ");
builder.append(quote(tableName));
builder.append(" AND NODEURL = ");
builder.append(quote(this.node.hostname));
return builder.toString();
}
/**
* Creates and returns a new DDL command to insert a row from the 'DTABLES'
* catalog table.
*
* @param tableName
* The name of the table to insert.
*
* @return A new DDL command to insert a row from the 'DTABLES' catalog
* table.
*/
private String newInsertRowCommand(final String tableName) {
assert null != tableName;
final StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO DTABLES VALUES (");
builder.append(quote(tableName));
builder.append(", ");
builder.append(quote(this.node.driver));
builder.append(", ");
builder.append(quote(this.node.hostname));
builder.append(", ");
builder.append(quote(this.node.username));
builder.append(", ");
builder.append(quote(this.node.password));
builder.append(", NULL, ");
builder.append(this.node.name.substring(4));
builder.append(", NULL");
builder.append(", NULL");
builder.append(", NULL)");
return builder.toString();
}
/**
* Updates the catalog database based on a successfully executed DDL command
* by one of the threads.
*
* @param command
* The DDL command that just completed successfully.
*
* @throws ProgramException
* Thrown if there is an error updating the catalog database.
*/
private void updateCatalog(final String command) throws ProgramException {
assert null != command;
// Get the table name. If there is none, return immediately.
final String tableName;
if (null == (tableName = tableName(command)))
return;
// Open a connection to the catalog database.
try {
StatementFactory.newStatement(catalog, new StatementRunner() {
public void run(Statement statement) throws ProgramException, SQLException {
// Check for CREATE and DROP commands; in these cases,
// add or remove a row from the DTABLES table, respectively.
final String catalogCommand;
if (isCommandType(command, "CREATE"))
catalogCommand = newInsertRowCommand(tableName);
else if (isCommandType(command, "DROP"))
catalogCommand = newDeleteRowCommand(tableName);
else {
log(System.err, "Unsupported command: " + command);
return;
}
log(System.out, "Updating catalog: " + catalogCommand);
statement.executeUpdate(catalogCommand);
}
});
// Arriving here indicates all commands executed successfully.
this.success = true;
} catch (final Exception e) {
this.log(System.err, "Failed to update catalog dtables. " + e.getMessage());
}
}
}