package edu.hawaii.ics.yucheng;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Locale;
import org.antlr.runtime.ANTLRStringStream;
import org.antlr.runtime.CharStream;
import org.antlr.runtime.CommonTokenStream;
import org.antlr.runtime.ParserRuleReturnScope;
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.TokenStream;
import org.antlr.runtime.tree.BaseTree;
import org.antlr.runtime.tree.Tree;
/**
* A class that executes a given SQL SELECT statement on a cluster of computers,
* each running an instance of a DBMS. This program looks up in the catalog which
* nodes hold the queried table, then executes the same statement on the database
* instance of each of those nodes concurrently using threads.
*
* @author Cheng Jade
* @assignment ICS 421 Assignment 2-1
* @date Feb 29, 2010
* @bugs None
*/
public class RunSQL {
    /** Message reported whenever the table name cannot be extracted. */
    private static final String PARSE_ERROR = "Cannot parse SQL statement.";

    /**
     * Leading part of the catalog lookup; the table name and a closing quote
     * are appended to it.
     */
    private static final String CATALOG_QUERY =
            "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '";

    /**
     * Parses an SQL command and returns the table name for a SELECT statement.
     * The name is taken from the node that immediately follows the FROM node
     * in the parse tree; a surrounding pair of single quotes is removed.
     *
     * @param command
     *            The command to parse.
     *
     * @return The table name.
     *
     * @throws ProgramException
     *             Thrown if the table name cannot be determined.
     */
    private static String selectTableName(String command)
            throws ProgramException {
        assert null != command;

        // Get ready for ANTLR!
        final CharStream charStream = new ANTLRStringStream(command);
        final SQLLexer lexer = new SQLLexer(charStream);
        final TokenStream tokenStream = new CommonTokenStream(lexer);
        final SQLParser parser = new SQLParser(tokenStream);
        try {
            final ParserRuleReturnScope scope = parser.selectStatement();
            final BaseTree tree = (BaseTree) scope.getTree();
            if (tree == null) {
                throw new ProgramException(PARSE_ERROR);
            }
            final Tree from = tree.getFirstChildWithType(SQLParser.SQLFROM);
            if (from == null) {
                throw new ProgramException(PARSE_ERROR);
            }

            // The table name is the sibling right after the FROM node.
            final int nameIndex = from.getChildIndex() + 1;
            if (nameIndex >= tree.getChildCount()) {
                throw new ProgramException(PARSE_ERROR);
            }
            final String result = tree.getChild(nameIndex).getText();
            if (result == null || result.length() == 0) {
                throw new ProgramException(PARSE_ERROR);
            }

            // Note: ANTLR could do this.
            // Only strip when there really is a matching pair of quotes;
            // otherwise substring() would fail on a lone quote or silently
            // drop the last character of the name.
            if (result.length() >= 2 && result.startsWith("'")
                    && result.endsWith("'")) {
                return result.substring(1, result.length() - 1);
            }
            return result;
        } catch (final RecognitionException e) {
            throw new ProgramException(e);
        }
    }

    /**
     * Runs one catalog query and appends a configuration node to
     * <code>nodes</code> for every row it returns. The result set is always
     * closed before this method returns.
     *
     * @param statement
     *            The statement used to run the query.
     *
     * @param sql
     *            The catalog query to execute.
     *
     * @param nodes
     *            The list that receives the nodes that were found.
     *
     * @return The number of nodes appended by this call.
     *
     * @throws ProgramException
     *             Thrown if a node cannot be created.
     *
     * @throws SQLException
     *             Thrown if the query or reading its rows fails.
     */
    private static int collectNodes(final Statement statement,
            final String sql, final ArrayList<ConfigurationNode> nodes)
            throws ProgramException, SQLException {
        final ResultSet set = statement.executeQuery(sql);
        try {
            int found = 0;
            while (set.next()) {
                final String driver = set.getString("NODEDRIVER").trim();
                final String url = set.getString("NODEURL").trim();
                final String user = set.getString("NODEUSER").trim();
                final String password = set.getString("NODEPASSWD").trim();
                final String name = "node " + set.getString("NODEID").trim();
                nodes.add(new ConfigurationNode(name, driver, url, user, password));
                found++;
            }
            return found;
        } finally {
            set.close();
        }
    }

    /**
     * The main entry point of the application. Reads the catalog
     * configuration and the SQL file, asks the catalog which nodes hold the
     * queried table, runs the statement on each of those nodes in its own
     * thread and prints the collected rows. The process exits with status 0
     * on success and 1 on a usage error or a program error.
     *
     * @param args
     *            The command line arguments: the configuration file path and
     *            the SQL file path.
     */
    public static void main(final String[] args) {
        assert null != args;

        // Check for usage errors.
        if (args.length != 2) {
            final String name = RunSQL.class.getSimpleName();
            System.err.println("Usage: java " + name + " <cfg> <ddl>");
            System.err.println(" <cfg> path to a configuration file");
            System.err.println(" <ddl> path to a SQL file");
            // A usage error is a failure; report it through the exit status.
            System.exit(1);
            return;
        }

        // The nodes read from the catalog.
        final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>();

        // Read the configuration and SQL command files.
        try {
            assert null != args[0];
            final ConfigurationNode catalog = new ConfigurationNode(args[0], "catalog");
            assert null != args[1];
            final CommandList commands = new CommandList(args[1]);
            if (commands.size() != 1) {
                throw new ProgramException(
                        "This program supports only one SQL file statement.");
            }
            final String command = commands.get(0);
            final String tableName = selectTableName(command);

            // Connect to the catalog and collect every node that has the table.
            catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    // First try the table name exactly as written.
                    final String exactSql = CATALOG_QUERY + tableName + "'";
                    catalog.log(System.out, "Executing " + exactSql);
                    if (collectNodes(statement, exactSql, nodes) > 0) {
                        return;
                    }

                    // Nothing found: retry with the upper-cased name. A fixed
                    // locale keeps the conversion independent of the
                    // platform default (e.g. the Turkish dotted I).
                    final String upperSql = CATALOG_QUERY
                            + tableName.toUpperCase(Locale.ENGLISH) + "'";
                    catalog.log(System.out, "Executing " + upperSql);
                    collectNodes(statement, upperSql, nodes);
                }
            });

            if (nodes.isEmpty()) {
                throw new ProgramException(
                        "No cluster node with table name '" + tableName + "' was found.");
            }
            catalog.log(System.out, nodes.size() + " nodes with table name '" + tableName + "' were found.");

            // Start threads for each configuration node.
            final ArrayList<Runner> runners = new ArrayList<Runner>();
            for (final ConfigurationNode node : nodes) {
                final Runner runner = new Runner(node, command);
                runner.start();
                runners.add(runner);
            }

            // Wait for the threads to complete.
            for (final Runner runner : runners) {
                runner.join();
            }

            // Print the results.
            for (final Runner runner : runners) {
                for (final String result : runner.getResults()) {
                    System.out.println(result);
                }
            }
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }

        // Exit cleanly while debugging from Eclipse.
        System.exit(0);
    }
}
/**
 * Runs one SQL query against one cluster node on a dedicated thread and
 * keeps the returned rows, formatted as tab-separated text, until they are
 * fetched with {@link #getResults()}.
 */
class Runner implements Runnable {
    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The SQL statement to be executed. */
    private final String command;

    /** The thread that executes the query statement. */
    private final Thread thread = new Thread(this);

    /** The results from the query; guarded by this instance's monitor. */
    private final ArrayList<String> results = new ArrayList<String>();

    /**
     * Initializes a new instance of the RunSQL Runner.
     *
     * @param node
     *            The cluster node associated with this instance.
     *
     * @param command
     *            The command to execute.
     */
    public Runner(final ConfigurationNode node, final String command) {
        assert null != node;
        assert null != command;
        this.node = node;
        this.command = command;
    }

    /**
     * Gets the results.
     *
     * @return A copy of the records collected from the query so far, one
     *         string per row.
     */
    public String[] getResults() {
        synchronized (this) {
            return this.results.toArray(new String[this.results.size()]);
        }
    }

    /**
     * Joins the thread. If the wait is interrupted, a warning is displayed
     * and the method returns successfully; the calling thread's interrupt
     * flag is set again so that the caller can still observe the
     * interruption.
     */
    public void join() {
        try {
            this.thread.join();
        } catch (final InterruptedException e) {
            this.node.log(System.err, "Thread did not respond.");
            // Do not swallow the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the thread.
     */
    public void start() {
        // Clear any previous results.
        synchronized (this) {
            this.results.clear();
        }
        this.thread.start();
    }

    /**
     * Executes the query for the node associated with this instance and
     * stores every returned row as one tab-separated line. Column values are
     * trimmed; an SQL NULL is rendered as the text <code>NULL</code>. A
     * ProgramException is logged to the node's error output and not
     * rethrown.
     */
    public void run() {
        // Connect to the specific node and execute the select statement.
        try {
            this.node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    // Execute the select statement.
                    Runner.this.node.log(System.out, "Executing " + Runner.this.command);
                    final ResultSet set = statement.executeQuery(Runner.this.command);
                    try {
                        // Read the column count once instead of once per row.
                        final int count = set.getMetaData().getColumnCount();
                        synchronized (Runner.this) {
                            while (set.next()) {
                                final StringBuilder builder = new StringBuilder();
                                for (int i = 1; i <= count; i++) {
                                    // getString() yields null for SQL NULL;
                                    // calling trim() on it would throw.
                                    final String value = set.getString(i);
                                    builder.append(value == null ? "NULL" : value.trim());
                                    if (i < count) {
                                        builder.append("\t");
                                    }
                                }
                                Runner.this.results.add(builder.toString());
                            }
                        }
                        Runner.this.node.log(System.out, "Success");
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final ProgramException e) {
            this.node.log(System.err, e.getMessage());
        }
    }
}