package edu.hawaii.ics.yucheng;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.regex.Pattern;
/**
 * Executes a batch of SQL statements against the distributed database. Only
 * SELECT and JOIN statements are supported. A SELECT is sent unchanged to every
 * node that the catalog lists for its table. A JOIN is evaluated on the local
 * node: the relevant columns and rows of each pair of table pieces are copied
 * into temporary tables there, and the join is run against those copies.
 *
 * All SELECT statements are executed before all JOIN statements, regardless
 * of their order in the input list.
 *
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 4
 * @date Mar 22, 2010
 */
public class SelectOrJoin implements Runnable {
    /** The configuration providing the catalog and the local node. */
    public final SelectOrJoinConfiguration configuration;

    /** The catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /** The SQL statements to be executed. */
    private final ArrayList<String> sqls;

    /**
     * Initializes a new instance of this object.
     *
     * @param configuration
     *            The configuration providing the catalog and local node; must
     *            not be null.
     * @param sqls
     *            The SQL statements executed when {@link #run()} is called.
     *
     * @throws NullPointerException
     *             Thrown if configuration is null.
     */
    public SelectOrJoin(final SelectOrJoinConfiguration configuration, final ArrayList<String> sqls) {
        if (null == configuration)
            throw new NullPointerException("configuration");
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;
        // Stored as given. The former cast to SQLList threw ClassCastException
        // for a plain ArrayList, and only iteration is needed here.
        this.sqls = sqls;
    }

    /**
     * Classifies every statement, then runs all SELECT statements followed by
     * all JOIN statements, printing results to standard output.
     *
     * If any statement is neither a SELECT nor a JOIN, nothing is executed. The
     * first ProgramException stops processing, and its message is printed to
     * standard error.
     */
    public void run() {
        final ArrayList<SQLStructure> selectSqls = new ArrayList<SQLStructure>();
        final ArrayList<SQLStructure> joinSqls = new ArrayList<SQLStructure>();
        try {
            // Validate and classify everything before executing anything.
            for (final String sql : this.sqls) {
                final SQLStructure sqlStructure = new SQLStructure(sql);
                if (sqlStructure.type == SQLType.SELECT)
                    selectSqls.add(sqlStructure);
                else if (sqlStructure.type == SQLType.JOIN)
                    joinSqls.add(sqlStructure);
                else
                    throw new ProgramException("unsupported sql: " + sql);
            }
            for (final SQLStructure sql : selectSqls)
                this.runSelect(sql);
            for (final SQLStructure sql : joinSqls)
                this.runJoin(sql);
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
        }
    }

    /**
     * Executes one join statement and prints the results.
     *
     * For every pair (node i holding table 1, node j holding table 2), the
     * relevant columns and rows of both pieces are copied into the temporary
     * tables SESSION.TEMP_&lt;table&gt; on the local node. The join is then
     * evaluated against those temporary tables. Rows from all pairs are
     * collected and printed at the end.
     *
     * @param sql
     *            The parsed join statement.
     *
     * @throws ProgramException
     *             Propagated from the local node, or thrown when the schema of
     *             a table piece cannot be determined.
     */
    private void runJoin(final SQLStructure sql) throws ProgramException {
        final String tableName1 = sql.tableName1;
        final String tableName2 = sql.tableName2;

        // Projection push down: only the columns the statement references,
        // without their "table." prefixes.
        final ArrayList<String> table1RelevantColumns = stripTableName(sql.table1RelevantColumns);
        final ArrayList<String> table2RelevantColumns = stripTableName(sql.table2RelevantColumns);

        // The node on which the temporary tables are created and the join is
        // evaluated (how it is chosen is up to SelectOrJoinConfiguration).
        final ConfigurationNode localNode = this.configuration.localNode;

        // Ask the catalog which nodes hold pieces of each table.
        final ArrayList<ConfigurationNode> table1RelevantNodes = new ArrayList<ConfigurationNode>();
        final ArrayList<ConfigurationNode> table2RelevantNodes = new ArrayList<ConfigurationNode>();
        this.fetchRelevantNodes(tableName1, tableName2, table1RelevantNodes, table2RelevantNodes);

        // The rewritten join depends only on the statement, not on the node pair.
        final String joinStatement = joinStatement(sql.sql, tableName1, tableName2);

        // Build the table 2 statements once per node rather than once per
        // (i, j) pair. Each one re-reads the whole piece from node j.
        final int table2NodeCount = table2RelevantNodes.size();
        final String[] createTempTableStatements2 = new String[table2NodeCount];
        final String[] insertTempTableStatements2 = new String[table2NodeCount];
        if (!table1RelevantNodes.isEmpty()) {
            for (int j = 0; j < table2NodeCount; j++) {
                final ConfigurationNode nodej = table2RelevantNodes.get(j);
                createTempTableStatements2[j] = createTempTableStatement(nodej, tableName2, table2RelevantColumns);
                insertTempTableStatements2[j] = insertTempTableStatement(nodej, tableName2,
                        table2RelevantColumns, sql.table2QualificationColumns);
            }
        }

        // Collects the join rows from every node pair.
        final ArrayList<String> result = new ArrayList<String>();
        for (int i = 0; i < table1RelevantNodes.size(); i++) {
            final ConfigurationNode nodei = table1RelevantNodes.get(i);
            final String createTempTableStatement1 = createTempTableStatement(nodei, tableName1, table1RelevantColumns);
            final String insertTempTableStatement1 = insertTempTableStatement(nodei, tableName1,
                    table1RelevantColumns, sql.table1QualificationColumns);
            for (int j = 0; j < table2NodeCount; j++) {
                final String createTempTableStatement2 = createTempTableStatements2[j];
                final String insertTempTableStatement2 = insertTempTableStatements2[j];
                localNode.runStatement(new StatementRunner() {
                    public void run(final Statement statement) throws ProgramException, SQLException {
                        // Temporary table holding the relevant columns of table 1.
                        localNode.log(System.out, "Executing " + createTempTableStatement1);
                        statement.executeUpdate(createTempTableStatement1);
                        // The insert is null when node i produced no rows to copy.
                        if (insertTempTableStatement1 != null) {
                            localNode.log(System.out, "Executing " + insertTempTableStatement1);
                            statement.executeUpdate(insertTempTableStatement1);
                        }
                        // Temporary table holding the relevant columns of table 2.
                        localNode.log(System.out, "Executing " + createTempTableStatement2);
                        statement.executeUpdate(createTempTableStatement2);
                        if (insertTempTableStatement2 != null) {
                            localNode.log(System.out, "Executing " + insertTempTableStatement2);
                            statement.executeUpdate(insertTempTableStatement2);
                        }
                        // Join the two temporary tables.
                        localNode.log(System.out, "Executing " + joinStatement);
                        final ResultSet set = statement.executeQuery(joinStatement);
                        try {
                            final int columnCount = set.getMetaData().getColumnCount();
                            while (set.next()) {
                                final StringBuilder row = new StringBuilder();
                                for (int column = 1; column <= columnCount; column++)
                                    row.append(displayValue(set.getString(column))).append(' ');
                                result.add(row.toString());
                            }
                        } finally {
                            set.close();
                        }
                    }
                });
            }
        }

        System.out.println("\nStatement, " + sql.sql + ", executing result: ");
        for (final String item : result)
            System.out.println(item.trim());
        System.out.println();
    }

    /**
     * Executes one select statement on every node that the catalog lists for
     * its table, one thread per node, and prints the combined rows.
     *
     * @param sql
     *            The parsed select statement.
     *
     * @throws ProgramException
     *             Propagated from the catalog connection or while waiting for
     *             a node's thread.
     */
    private void runSelect(final SQLStructure sql) throws ProgramException {
        final String tableName = sql.tableName1;

        // Look up the nodes holding the table in the catalog.
        final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>();
        this.catalog.runStatement(new StatementRunner() {
            public void run(final Statement statement) throws ProgramException, SQLException {
                final String query = dtablesQuery("SELECT DISTINCT *", tableName);
                SelectOrJoin.this.catalog.log(System.out, "Executing " + query);
                readNodes(statement.executeQuery(query), nodes);
            }
        });
        final int count = nodes.size();
        this.catalog.log(System.out, count + " nodes with table name '" + tableName + "' were found.");

        // Start one thread per node.
        final Thread[] threads = new Thread[count];
        final SelectRunner[] runners = new SelectRunner[count];
        for (int i = 0; i < count; i++) {
            runners[i] = new SelectRunner(nodes.get(i), sql.sql);
            threads[i] = new Thread(runners[i]);
            threads[i].start();
        }

        // Wait for each thread, then collect its rows in node order.
        final StringBuilder resultBuilder = new StringBuilder();
        for (int i = 0; i < count; i++) {
            DistributedDB.join(threads[i], nodes.get(i));
            for (final String row : runners[i].getResults())
                resultBuilder.append(row).append('\n');
        }

        if (count != 0) {
            System.out.println("\nResults of executing " + sql.sql + " are: ");
            System.out.println(resultBuilder.toString());
        }
    }

    /**
     * Returns a bulk insert statement that copies the relevant columns and rows
     * of one table piece into SESSION.TEMP_&lt;tableName&gt;. This applies both
     * projection and selection push down. The qualifications are combined with
     * OR in the WHERE clause of the source query.
     *
     * @param srcNode
     *            The node that's to be copied from.
     * @param tableName
     *            The table whose contents are to be copied.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     * @param relevantQualifications
     *            The list of qualifications that're related to this table.
     *
     * @return The INSERT statement, or null if no rows were read. Null is also
     *         returned when the source query failed; that error is logged to
     *         standard error.
     */
    private static String insertTempTableStatement(
            final ConfigurationNode srcNode,
            final String tableName,
            final ArrayList<String> relevantColumns,
            final ArrayList<String> relevantQualifications) {
        final int columnCount = relevantColumns.size();
        // Each record holds one value per relevant column; null means SQL NULL.
        final ArrayList<String[]> records = new ArrayList<String[]>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final StringBuilder query = new StringBuilder();
                    query.append("SELECT ").append(commaSeparatedColumnNames(relevantColumns));
                    query.append(" FROM ").append(tableName);
                    for (int i = 0; i < relevantQualifications.size(); i++)
                        query.append(i == 0 ? " WHERE " : " OR ").append(relevantQualifications.get(i));
                    final ResultSet set = statement.executeQuery(query.toString());
                    try {
                        while (set.next()) {
                            final String[] record = new String[columnCount];
                            for (int column = 1; column <= columnCount; column++) {
                                final String value = set.getString(column);
                                record[column - 1] = value == null ? null : value.trim();
                            }
                            records.add(record);
                        }
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final Exception e) {
            srcNode.log(System.err, e.toString());
        }
        if (records.isEmpty())
            return null;

        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO SESSION.TEMP_" + tableName + " VALUES \n");
        for (int i = 0; i < records.size(); i++) {
            if (i > 0)
                builder.append(", \n");
            final String[] record = records.get(i);
            builder.append("\t\t\t\t(");
            for (int j = 0; j < record.length; j++) {
                if (j > 0)
                    builder.append(", ");
                // Emit the NULL keyword, not the quoted text 'null'.
                builder.append(record[j] == null ? "NULL" : DistributedDB.quote(record[j]));
            }
            builder.append(")");
        }
        return builder.toString();
    }

    /**
     * Returns a DECLARE GLOBAL TEMPORARY TABLE statement for TEMP_&lt;tableName&gt;.
     * The table contains only the relevant columns. Column names and types come
     * from the result metadata of a query against the source table.
     *
     * @param srcNode
     *            The node that the original table is located.
     * @param tableName
     *            The table whose schema is used for the temporary table.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     *
     * @throws ProgramException
     *             Thrown if no column information could be obtained (the
     *             underlying error, if any, is logged to standard error).
     */
    private static String createTempTableStatement(
            final ConfigurationNode srcNode,
            final String tableName,
            final ArrayList<String> relevantColumns) throws ProgramException {
        final ArrayList<String> columnNames = new ArrayList<String>();
        final ArrayList<String> columnTypes = new ArrayList<String>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final String query = "SELECT " + commaSeparatedColumnNames(relevantColumns)
                            + " FROM " + tableName;
                    final ResultSet set = statement.executeQuery(query);
                    try {
                        srcNode.log(System.out, "Excecuting " + query);
                        final ResultSetMetaData meta = set.getMetaData();
                        if (!set.next())
                            srcNode.log(System.out, String.format("Table '%s' is empty", tableName));
                        for (int column = 1; column <= meta.getColumnCount(); column++) {
                            columnNames.add(meta.getColumnName(column));
                            columnTypes.add(temporaryColumnType(meta, column));
                        }
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final Exception e) {
            srcNode.log(System.err, e.toString());
        }
        // Without columns the DDL cannot be built. Report it as a program error
        // instead of failing later with an index exception.
        if (columnNames.isEmpty())
            throw new ProgramException("unable to determine the columns of table " + tableName);

        final StringBuilder builder = new StringBuilder();
        builder.append("DECLARE GLOBAL TEMPORARY TABLE TEMP_").append(tableName).append(" (");
        for (int i = 0; i < columnNames.size(); i++) {
            if (i > 0)
                builder.append(", ");
            builder.append(columnNames.get(i)).append(' ').append(columnTypes.get(i));
        }
        builder.append(")");
        builder.append(" ON COMMIT PRESERVE ROWS NOT LOGGED");
        return builder.toString();
    }

    /**
     * Returns the column type to declare in a temporary table. The metadata
     * type name alone omits lengths:
     * <ul>
     * <li>CHAR widens to at least 128 characters.</li>
     * <li>VARCHAR keeps its reported precision, or 128 when none is reported.</li>
     * <li>DECIMAL/NUMERIC keep their precision and scale when a precision is
     * reported.</li>
     * </ul>
     * Any other type name is used unchanged.
     */
    private static String temporaryColumnType(final ResultSetMetaData meta, final int column)
            throws SQLException {
        final String type = meta.getColumnTypeName(column);
        if (type.equalsIgnoreCase("CHAR"))
            return type + "(" + Math.max(128, meta.getPrecision(column)) + ")";
        if (type.equalsIgnoreCase("VARCHAR")) {
            final int precision = meta.getPrecision(column);
            return type + "(" + (precision > 0 ? precision : 128) + ")";
        }
        if (type.equalsIgnoreCase("DECIMAL") || type.equalsIgnoreCase("NUMERIC")) {
            final int precision = meta.getPrecision(column);
            if (precision > 0)
                return type + "(" + precision + ", " + meta.getScale(column) + ")";
        }
        return type;
    }

    /**
     * A helper method that joins a list of column names into a comma separated
     * string.
     *
     * @param columns
     *            The list of columns names
     *
     * @return The comma separated column names, or "" for an empty list.
     */
    private static String commaSeparatedColumnNames(final ArrayList<String> columns) {
        final StringBuilder builder = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0)
                builder.append(", ");
            builder.append(columns.get(i));
        }
        return builder.toString();
    }

    /**
     * A helper method that takes column names such as "tableName.columnName"
     * and returns them without the leading qualifiers. Everything after the
     * last '.' is kept and trimmed. Names without a '.' are only trimmed.
     *
     * @param columns
     *            The list of columns, optionally qualified.
     *
     * @return A list of column names in the format of "columnName"
     */
    private static ArrayList<String> stripTableName(final ArrayList<String> columns) {
        final ArrayList<String> strippedColumns = new ArrayList<String>(columns.size());
        for (final String item : columns)
            strippedColumns.add(item.substring(item.lastIndexOf('.') + 1).trim());
        return strippedColumns;
    }

    /**
     * Connects to the catalog and fills two lists with the nodes that hold
     * pieces of each of the two tables. Failures are logged to standard error,
     * and the lists may then be partially filled or empty.
     *
     * @param tableName1
     *            The first table name that it's looking for.
     * @param tableName2
     *            The second table name that it's looking for.
     * @param table1RelevantNodes
     *            Receives the nodes relevant to tableName1.
     * @param table2RelevantNodes
     *            Receives the nodes relevant to tableName2.
     */
    private void fetchRelevantNodes(
            final String tableName1,
            final String tableName2,
            final ArrayList<ConfigurationNode> table1RelevantNodes,
            final ArrayList<ConfigurationNode> table2RelevantNodes) {
        try {
            this.catalog.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    String query = fetchRelevantNodesHelper(tableName1, statement, table1RelevantNodes);
                    SelectOrJoin.this.catalog.log(System.out, "Eexcuted " + query);
                    query = fetchRelevantNodesHelper(tableName2, statement, table2RelevantNodes);
                    SelectOrJoin.this.catalog.log(System.out, "Eexcuted " + query);
                }
            });
        } catch (final Exception e) {
            this.catalog.log(System.err, "failed in fetchRelevantNodes " + e.toString());
        }
    }

    /**
     * A helper for fetchRelevantNodes. It appends the catalog's nodes for one
     * table to a list.
     *
     * @param tableName
     *            The table name that it's looking for.
     * @param statement
     *            The statement created by the catalog connection.
     * @param relevantNodes
     *            Receives one node per matching DTABLES row.
     *
     * @return The catalog query that was executed.
     *
     * @throws SQLException
     *             Thrown if the query fails or an expected DTABLES column is
     *             missing.
     */
    private static String fetchRelevantNodesHelper(
            final String tableName,
            final Statement statement,
            final ArrayList<ConfigurationNode> relevantNodes)
            throws SQLException, ProgramException {
        final String query = dtablesQuery("SELECT *", tableName);
        readNodes(statement.executeQuery(query), relevantNodes);
        return query;
    }

    /**
     * Builds a DTABLES query for one table. The name is matched either as given
     * or upper-cased. Single quotes in the name are doubled so they cannot end
     * the SQL string literal early.
     *
     * @param selectClause
     *            The leading select clause, e.g. "SELECT *".
     * @param tableName
     *            The table name to look up.
     */
    private static String dtablesQuery(final String selectClause, final String tableName) {
        final String literal = String.valueOf(tableName).replace("'", "''");
        return selectClause + " FROM DTABLES WHERE TNAME = '" + literal
                + "' OR TNAME = UCASE('" + literal + "')";
    }

    /**
     * Reads every row of a DTABLES result into a ConfigurationNode and appends
     * it to the list. The result set is always closed.
     */
    private static void readNodes(final ResultSet set, final ArrayList<ConfigurationNode> nodes)
            throws SQLException, ProgramException {
        try {
            while (set.next()) {
                final String driver = set.getString("NODEDRIVER").trim();
                final String url = set.getString("NODEURL").trim();
                final String user = set.getString("NODEUSER").trim();
                final String password = set.getString("NODEPASSWD").trim();
                final String name = "node " + set.getString("NODEID").trim();
                nodes.add(new ConfigurationNode(name, driver, url, user, password));
            }
        } finally {
            set.close();
        }
    }

    /**
     * Formats one column value for printing: trimmed text, or "NULL" for SQL NULL.
     */
    private static String displayValue(final String value) {
        return value == null ? "NULL" : value.trim();
    }

    /**
     * Rewrites a join statement so that both table names refer to their
     * temporary copies, SESSION.TEMP_&lt;table&gt;.
     *
     * Only whole identifiers are replaced, not ones preceded by '.' (which are
     * column names or already qualified). Both names are replaced in one pass,
     * so a replacement is never rewritten again. Matching is case-sensitive.
     * Names inside string literals are not detected and would also be replaced.
     *
     * @param sql
     *            The original join statement.
     * @param tableName1
     *            The first table to be joined
     * @param tableName2
     *            The second table to be joined
     *
     * @return The rewritten statement.
     */
    private static String joinStatement(
            final String sql,
            final String tableName1,
            final String tableName2) {
        final Pattern tableNames = Pattern.compile("(?<![\\w$#@.])(?:"
                + Pattern.quote(tableName1) + "|" + Pattern.quote(tableName2)
                + ")(?![\\w$#@])");
        // $0 is the matched table name.
        return tableNames.matcher(sql).replaceAll("SESSION.TEMP_$0");
    }
}
/**
 * A runner that executes one SELECT statement on a single node and collects
 * the returned rows. Each row is stored as its trimmed column values joined by
 * tabs, with SQL NULL shown as "NULL". The results list is not synchronized:
 * read it with {@link #getResults()} only after the thread running this
 * object has finished.
 */
class SelectRunner implements Runnable {
    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The SQL statement to be executed. */
    private final String sql;

    /** The rows returned by the query, one tab separated string per row. */
    private final ArrayList<String> results = new ArrayList<String>();

    /**
     * Initializes a new instance of the select runner.
     *
     * @param node
     *            The cluster node associated with this instance.
     *
     * @param sql
     *            The select SQL statement to execute.
     */
    public SelectRunner(final ConfigurationNode node, final String sql) {
        assert null != node;
        assert null != sql;
        this.node = node;
        this.sql = sql;
    }

    /**
     * Returns the rows collected on this node as a new array.
     */
    public String[] getResults() {
        return this.results.toArray(new String[this.results.size()]);
    }

    /**
     * Executes the select statement on this node and records every row.
     * A ProgramException from the node is logged to standard error.
     */
    public void run() {
        try {
            this.node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    SelectRunner.this.node.log(System.out, "Executing " + SelectRunner.this.sql);
                    final ResultSet set = statement.executeQuery(SelectRunner.this.sql);
                    try {
                        // The column count is the same for every row.
                        final int count = set.getMetaData().getColumnCount();
                        while (set.next()) {
                            final StringBuilder builder = new StringBuilder();
                            for (int i = 1; i <= count; i++) {
                                if (i > 1)
                                    builder.append('\t');
                                final String value = set.getString(i);
                                // getString returns null for SQL NULL; trimming it would throw.
                                builder.append(value == null ? "NULL" : value.trim());
                            }
                            SelectRunner.this.results.add(builder.toString());
                        }
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final ProgramException e) {
            this.node.log(System.err, "SelectRunner run " + e.getMessage());
        }
    }
}