package edu.hawaii.ics.yucheng;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
/**
* A class that supports select-from-where queries involving joins between
* exactly two tables. The program takes two command-line arguments, clustercfg
* and sqlfile. The clustercfg file contains access information for the catalog
* and the local DB. The sqlfile contains the join statement to be executed.
*
* @author Cheng Jade
* @assignment ICS 421 Assignment 3
* @date Mar 19, 2010
* @bugs None
*/
public class JoinSQLProcessor {
/**
 * Program entry point. Parses the SQL file and the configuration file,
 * asks the catalog which nodes hold pieces of each of the two tables, and
 * then, for every pair of such nodes, copies the relevant rows of both
 * tables into temporary tables on the local node and runs the join there.
 * The collected rows are printed to standard output and the temporary
 * tables are dropped before the program returns, even when the join fails.
 *
 * @param args
 *            The command line arguments: the configuration file path
 *            followed by the SQL file path.
 */
public static void main(final String[] args) {
    assert null != args;

    // Check for usage errors.
    if (args.length != 2) {
        final String name = JoinSQLProcessor.class.getSimpleName();
        System.err.println("Usage: java " + name + " <cfg> <ddl>");
        System.err.println(" <cfg> path to a configuration file");
        System.err.println(" <ddl> path to a SQL file");
        // A usage error is a failure, so report it with a non-zero status.
        System.exit(1);
        return;
    }

    try {
        // Parse sql statement. Obtain the table names, projection push down
        // column names for each table, selection push down qualifications
        // for each table and the overall where clause.
        final ParseSQL sql = new ParseSQL(args[1]);
        final String tableName1 = sql.tableName1;
        final String tableName2 = sql.tableName2;
        final String tempTableName1 = "TEMP_" + tableName1;
        final String tempTableName2 = "TEMP_" + tableName2;
        final ArrayList<String> table1RelevantColumns = stripTableName(sql.table1RelevantColumns);
        final ArrayList<String> table2RelevantColumns = stripTableName(sql.table2RelevantColumns);

        // Point the where clause at the temp tables. Only standalone
        // occurrences of a table name are rewritten, so a column such as
        // DEPTNO is not damaged when the table is called DEPT.
        final String whereClause = retargetTableName(
                retargetTableName(sql.whereClause, tableName1, tempTableName1),
                tableName2,
                tempTableName2);

        // Parse the configuration file.
        final Configuration configuration = new Configuration(args[0]);
        final ConfigurationNode catalog = configuration.catalog;
        final ConfigurationNode localNode = configuration.localNode;

        // Connect to the catalog and retrieve the nodes that contain pieces
        // of each target table.
        final ArrayList<ConfigurationNode> table1RelevantNodes = new ArrayList<ConfigurationNode>();
        final ArrayList<ConfigurationNode> table2RelevantNodes = new ArrayList<ConfigurationNode>();
        fetchRelevantNodes(catalog, tableName1, tableName2,
                table1RelevantNodes, table2RelevantNodes);

        // Make a target list with "TEMP_" prepended to the column names.
        final ArrayList<String> tempTargetColumns = new ArrayList<String>();
        for (final String item : sql.targetColumns)
            tempTargetColumns.add("TEMP_" + item);

        // an array list collecting final join results.
        final ArrayList<String> result = new ArrayList<String>();
        try {
            for (final ConfigurationNode node1 : table1RelevantNodes) {
                prepareTempTable(node1, localNode, tableName1, table1RelevantColumns);
                copyTable(node1, localNode, tableName1, table1RelevantColumns, sql.table1QualificationColumns);

                for (final ConfigurationNode node2 : table2RelevantNodes) {
                    prepareTempTable(node2, localNode, tableName2, table2RelevantColumns);
                    copyTable(node2, localNode, tableName2, table2RelevantColumns, sql.table2QualificationColumns);

                    // create the join statement.
                    final String joinStatement = joinStatement(
                            tempTargetColumns,
                            whereClause,
                            tempTableName1,
                            tempTableName2);

                    // execute the join statement.
                    localNode.runStatement(new StatementRunner() {
                        public void run(Statement statement) throws ProgramException, SQLException {
                            localNode.log(System.out, "Executing " + joinStatement);
                            final ResultSet set = statement.executeQuery(joinStatement);
                            try {
                                final int count = set.getMetaData().getColumnCount();
                                while (set.next()) {
                                    final StringBuilder builder = new StringBuilder();
                                    for (int column = 1; column <= count; column++) {
                                        // getString returns null for SQL NULL.
                                        final String value = set.getString(column);
                                        builder.append(value == null ? "null" : value.trim());
                                        builder.append(" ");
                                    }
                                    result.add(builder.toString());
                                }
                            } finally {
                                set.close();
                            }
                        }
                    });
                }
            }

            // Print the final result.
            System.out.println("\nStatement, " + sql.sql + ", executing result: ");
            for (final String item : result)
                System.out.println(item.trim());
            System.out.println();
        } finally {
            // Drop the temp tables when the program is done, including when
            // the join failed part-way, so they do not leak into later runs.
            dropTempTable(localNode, tempTableName1);
            dropTempTable(localNode, tempTableName2);
        }
    } catch (final ProgramException e) {
        System.err.println(e.getMessage());
        System.exit(1);
    }
}

/**
 * Replaces every standalone occurrence of a table name in a clause with
 * the matching temporary table name. An occurrence is standalone when it
 * is not directly preceded or followed by a letter, digit, underscore or
 * dollar sign. The table name is matched literally and case-sensitively.
 *
 * @param clause
 *            The SQL clause to rewrite.
 * @param tableName
 *            The table name to look for.
 * @param tempTableName
 *            The name that replaces each standalone occurrence.
 *
 * @return The rewritten clause.
 */
private static String retargetTableName(
        final String clause,
        final String tableName,
        final String tempTableName) {
    final String standalone = "(?<![A-Za-z0-9_$])"
            + java.util.regex.Pattern.quote(tableName)
            + "(?![A-Za-z0-9_$])";
    return clause.replaceAll(
            standalone,
            java.util.regex.Matcher.quoteReplacement(tempTableName));
}
/**
 * Runs two lookups against the catalog, one per table name, and appends
 * the nodes found for each table to the matching list. Each executed
 * query is written to the catalog's log. Any exception raised while doing
 * so is logged to the error stream and not rethrown.
 *
 * @param catalog
 *            The catalog node that is queried.
 * @param tableName1
 *            The first table name to look up.
 * @param tableName2
 *            The second table name to look up.
 * @param table1RelevantNodes
 *            Receives the nodes found for tableName1.
 * @param table2RelevantNodes
 *            Receives the nodes found for tableName2.
 *
 */
private static void fetchRelevantNodes(
        final ConfigurationNode catalog,
        final String tableName1,
        final String tableName2,
        final ArrayList<ConfigurationNode> table1RelevantNodes,
        final ArrayList<ConfigurationNode> table2RelevantNodes) {
    // Both lookups share the single statement handed to the runner.
    final StatementRunner lookup = new StatementRunner() {
        public void run(Statement statement) throws ProgramException, SQLException {
            final String firstQuery =
                    fetchRelevantNodesHelper(tableName1, statement, table1RelevantNodes);
            catalog.log(System.out, "Eexcuted " + firstQuery);

            final String secondQuery =
                    fetchRelevantNodesHelper(tableName2, statement, table2RelevantNodes);
            catalog.log(System.out, "Eexcuted " + secondQuery);
        }
    };

    try {
        catalog.runStatement(lookup);
    } catch (final Exception failure) {
        catalog.log(System.err, "failed in fetchRelevantNodes " + failure.toString());
    }
}
/**
 * Helper for fetchRelevantNodes. Queries DTABLES for the rows whose TNAME
 * equals the given table name, either as written or upper-cased by the
 * database, and appends one ConfigurationNode per row to the given list.
 *
 * @param tableName
 *            The table name that it's looking for.
 * @param statement
 *            The statement used to run the query.
 * @param relevantNodes
 *            Receives one node per matching row.
 *
 * @return The SQL text that was executed, so the caller can log it.
 *
 * @throws SQLException
 *             Thrown if the query fails or an expected column cannot be
 *             read from the result.
 * @throws ProgramException
 *             Declared so it can propagate to the caller; presumably
 *             raised while constructing a ConfigurationNode - TODO
 *             confirm.
 *
 */
private static String fetchRelevantNodesHelper(
        String tableName,
        Statement statement,
        ArrayList<ConfigurationNode> relevantNodes)
        throws SQLException, ProgramException {
    // The name is embedded in a SQL string literal, so double any single
    // quote to keep the literal well formed.
    final String literal = tableName.replace("'", "''");
    final String sql = "SELECT * FROM DTABLES WHERE TNAME = '"
            + literal + "' OR TNAME = UCASE ('" + literal + "')";

    final ResultSet set = statement.executeQuery(sql);
    try {
        while (set.next()) {
            final String driver = set.getString("NODEDRIVER").trim();
            final String url = set.getString("NODEURL").trim();
            final String user = set.getString("NODEUSER").trim();
            final String password = set.getString("NODEPASSWD").trim();
            final String name = "node " + set.getString("NODEID").trim();
            relevantNodes.add(new ConfigurationNode(name, driver, url, user, password));
        }
    } finally {
        set.close();
    }
    return sql;
}
/**
 * Assembles the text of the join query: the keyword SELECT, the comma
 * separated target columns, the keyword FROM, the two table names
 * separated by a comma, and finally the where clause.
 *
 * @param targetColumns
 *            The columns that go into the select list.
 * @param whereClause
 *            The where clause, appended as given after the table names.
 * @param tableName1
 *            The first table to be joined.
 * @param tableName2
 *            The second table to be joined.
 *
 * @return The assembled statement text.
 *
 */
private static String joinStatement(
        final ArrayList<String> targetColumns,
        final String whereClause,
        final String tableName1,
        final String tableName2) {
    final String selectList = commaSeparatedColumnNames(targetColumns);
    final String fromList = tableName1 + ", " + tableName2;
    return "SELECT " + selectList + " FROM " + fromList + " " + whereClause;
}
/**
 * Copies the relevant contents of a table from a source node into the
 * table "TEMP_" + tableName on a destination node. Only the relevant
 * columns are selected (projection push down) and, when qualifications
 * are given, only rows matching at least one of them (selection push
 * down). The rows are written with a single multi-row INSERT statement.
 * Nothing is inserted when no rows were read. Failures on either node
 * are logged and not rethrown.
 *
 * NOTE(review): values are carried as text and SQL NULL is represented by
 * the string "null", so a text value spelling "null" is also written as
 * NULL.
 *
 * @param srcNode
 *            The node that's to be copied from.
 * @param destNode
 *            The node that's to be copied to.
 * @param tableName
 *            The table whose contents are to be copied.
 * @param relevantColumns
 *            The list of columns that're related to this table.
 * @param relevantQualifications
 *            The list of qualifications that're related to this table.
 *
 */
private static void copyTable(
        final ConfigurationNode srcNode,
        final ConfigurationNode destNode,
        final String tableName,
        final ArrayList<String> relevantColumns,
        final ArrayList<String> relevantQualifications) {
    final int relevantColumnsCount = relevantColumns.size();
    final String tempTableName = ("TEMP_" + tableName);

    // retrieve the qualifying records from the target table on srcNode.
    final ArrayList<String[]> records = new ArrayList<String[]>();
    try {
        srcNode.runStatement(new StatementRunner() {
            public void run(Statement statement) throws ProgramException, SQLException {
                final StringBuilder select = new StringBuilder();
                select.append("SELECT ");
                select.append(commaSeparatedColumnNames(relevantColumns));
                select.append(" FROM ").append(tableName);
                for (int i = 0; i < relevantQualifications.size(); i++) {
                    select.append(i == 0 ? " WHERE " : " OR ");
                    select.append(relevantQualifications.get(i));
                }

                final ResultSet set = statement.executeQuery(select.toString());
                try {
                    while (set.next()) {
                        final String[] record = new String[relevantColumnsCount];
                        for (int i = 1; i <= relevantColumnsCount; i++) {
                            final String value = set.getString(i);
                            record[i - 1] = value == null ? "null" : value.trim();
                        }
                        records.add(record);
                    }
                } finally {
                    set.close();
                }
            }
        });
        srcNode.log(System.out, "Successfully retrieved records from the target table.");
    } catch (final Exception e) {
        srcNode.log(System.err, e.toString());
    }

    // An INSERT with an empty VALUES list is not valid SQL, so there is
    // nothing to execute when no rows were read.
    if (records.isEmpty()) {
        destNode.log(System.out, "No records to copy into " + tempTableName + ".");
        return;
    }

    // construct the bulk insert statement.
    final StringBuilder builder = new StringBuilder();
    builder.append("INSERT INTO " + tempTableName + " VALUES \n");
    for (int i = 0; i < records.size(); i++) {
        final String[] record = records.get(i);
        builder.append("\t\t\t\t(");
        for (int j = 0; j < relevantColumnsCount - 1; j++)
            builder.append(quoteString(record[j]) + ", ");
        builder.append(quoteString(record[relevantColumnsCount - 1]) + ")");
        if (i != records.size() - 1)
            builder.append(", \n");
    }
    final String buildInsertStatement = builder.toString();

    // execute the bulk insert statement on destNode.
    try {
        destNode.runStatement(new StatementRunner() {
            public void run(Statement statement) throws ProgramException, SQLException {
                destNode.log(System.out, "Executing " + buildInsertStatement);
                statement.executeUpdate(buildInsertStatement);
            }
        });
        destNode.log(System.out, "Successfully executed the bulk insert statement.");
    } catch (final Exception e) {
        destNode.log(System.err, e.toString());
    }
}
/**
 * Turns a value into the text used for it inside a SQL VALUES list. The
 * word "null" in any letter case is returned unchanged so it is written
 * as SQL NULL, and a Java null is returned as "null". Every other value
 * is wrapped in single quotes, with each embedded single quote doubled so
 * the literal stays well formed.
 *
 * @param entry
 *            The value to render; may be null.
 *
 * @return The SQL text for the value.
 */
private static String quoteString(String entry) {
    if (entry == null)
        return "null";
    if (entry.equalsIgnoreCase("null"))
        return entry;
    return "'" + entry.replace("'", "''") + "'";
}
/**
 * Detects whether a table with the given name is reported by the metadata
 * of the database that the statement's connection belongs to. The lookup
 * covers every catalog, schema and table type.
 *
 * The name is handed to the metadata lookup as a pattern, in which "_"
 * and "%" act as wildcards, so each reported name is compared with the
 * requested one (ignoring letter case) before it counts as a match.
 *
 * @param statement
 *            The statement created by the node's connection
 * @param tableName
 *            The table it's looking for.
 *
 * @return True if the table exists, false otherwise.
 *
 * @throws SQLException
 *             Thrown if the metadata cannot be read.
 * @throws ProgramException
 *             Declared in the signature; not raised by this method
 *             itself.
 */
private static boolean isTableExist(Statement statement, String tableName)
        throws SQLException, ProgramException {
    final Connection connection = statement.getConnection();
    final DatabaseMetaData md = connection.getMetaData();
    final ResultSet tables = md.getTables(null, null, tableName, null);
    try {
        while (tables.next()) {
            if (tableName.equalsIgnoreCase(tables.getString("TABLE_NAME")))
                return true;
        }
        return false;
    } finally {
        // Release the metadata cursor on every path.
        tables.close();
    }
}
/**
 * A helper method that joins a list of column names into one string, with
 * ", " between consecutive names. An empty list yields an empty string.
 *
 * @param columns
 *            The list of columns names
 *
 * @return The comma separated column names
 *
 */
private static String commaSeparatedColumnNames(
        final ArrayList<String> columns) {
    final StringBuilder builder = new StringBuilder();
    for (int i = 0; i < columns.size(); i++) {
        // The separator goes before every name except the first.
        if (i > 0)
            builder.append(", ");
        builder.append(columns.get(i));
    }
    return builder.toString();
}
/**
 * Drops a table from a node if the node reports that it exists, either
 * under the given name or under its upper-case form. The drop statement
 * and its outcome are written to the node's log. Any exception is logged
 * to the error stream and not rethrown.
 *
 * @param node
 *            The node where the table is located
 * @param tempTableName
 *            The table that is going to be dropped.
 *
 */
private static void dropTempTable(
        final ConfigurationNode node,
        final String tempTableName) {
    final StatementRunner dropper = new StatementRunner() {
        public void run(Statement statement) throws ProgramException, SQLException {
            final boolean present = isTableExist(statement, tempTableName)
                    || isTableExist(statement, tempTableName.toUpperCase());
            if (!present)
                return;

            final String sql = "DROP TABLE " + tempTableName;
            node.log(System.out, "Executing " + sql);
            statement.executeUpdate(sql);
            node.log(System.out, "Successfully droped the temp table: " + tempTableName);
        }
    };

    try {
        node.runStatement(dropper);
    } catch (final Exception failure) {
        node.log(System.err, failure.getMessage());
    }
}
/**
 * Prepares the temporary table "TEMP_" + tableName on the destination
 * node. If the table already exists there its contents are deleted and
 * the method returns. Otherwise the names and types of the relevant
 * columns are read from the source node's table and a new temporary table
 * containing only those columns is created. The column information comes
 * from the result set metadata, so it is available even when the source
 * table holds no rows. Failures are logged and not rethrown; when no
 * column information could be read, no table is created.
 *
 * NOTE(review): only the CHAR type is given an explicit length (128);
 * other type names are used exactly as the source driver reports them.
 *
 * @param srcNode
 *            The node that the original table is located.
 * @param destNode
 *            The node where the temporary table is created.
 * @param tableName
 *            The table whose schema is used for the temporary table.
 * @param relevantColumns
 *            The list of columns that're related to this table.
 *
 */
private static void prepareTempTable(
        final ConfigurationNode srcNode,
        final ConfigurationNode destNode,
        final String tableName,
        final ArrayList<String> relevantColumns) {
    final String tempTableName = ("TEMP_" + tableName);

    // erase the contents of the temp table if it already exist on destNode.
    // A one-element array lets the anonymous class report back.
    final boolean[] result = new boolean[] { false };
    try {
        destNode.runStatement(new StatementRunner() {
            public void run(Statement statement) throws ProgramException, SQLException {
                if (isTableExist(statement, tempTableName) || isTableExist(statement, tempTableName.toUpperCase())) {
                    String deleteStatement = "DELETE FROM " + tempTableName;
                    destNode.log(System.out, "Executing " + deleteStatement);
                    statement.executeUpdate(deleteStatement);
                    destNode.log(System.out, "Successfully emptied the temp table: " + tempTableName);
                    result[0] = true;
                }
            }
        });
    } catch (final Exception e) {
        destNode.log(System.err, e.getMessage());
    }
    if (result[0])
        return;

    // retrieve the column names and column types from the source table.
    final ArrayList<String> columnNames = new ArrayList<String>();
    final ArrayList<String> columnTypes = new ArrayList<String>();
    try {
        srcNode.runStatement(new StatementRunner() {
            public void run(Statement statement) throws ProgramException, SQLException {
                final ResultSet set = statement.executeQuery("SELECT " +
                        commaSeparatedColumnNames(relevantColumns)
                        + " FROM " + tableName);
                try {
                    // The metadata describes the columns whether or not
                    // the query returned any rows, so no row is fetched.
                    final ResultSetMetaData meta = set.getMetaData();
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        columnNames.add(meta.getColumnName(i));
                        String columnType = meta.getColumnTypeName(i);
                        // if column type is char, make its size to be 128
                        // long in the temp table.
                        if (columnType.equalsIgnoreCase("CHAR"))
                            columnType = columnType + "(128)";
                        columnTypes.add(columnType);
                    }
                } finally {
                    set.close();
                }
            }
        });
        srcNode.log(System.out, "Successfully connected and retrieved column names and types from a node db.");
    } catch (final Exception e) {
        srcNode.log(System.err, e.getMessage());
    }

    // Without column information there is no table definition to build.
    if (columnNames.isEmpty()) {
        destNode.log(System.err, "Cannot create the temp table " + tempTableName
                + ": no column information was retrieved for " + tableName);
        return;
    }

    // construct the create table statement.
    final StringBuilder builder = new StringBuilder();
    builder.append("CREATE TABLE ");
    builder.append(tempTableName + " (");
    for (int i = 0; i < columnNames.size(); i++) {
        if (i > 0)
            builder.append(", ");
        builder.append(columnNames.get(i) + " ");
        builder.append(columnTypes.get(i));
    }
    builder.append(")");
    final String createTableStatement = builder.toString();

    // execute the create table statement to create a temp table on destNode.
    try {
        destNode.runStatement(new StatementRunner() {
            public void run(Statement statement) throws ProgramException, SQLException {
                destNode.log(System.out, "Executing " + createTableStatement);
                statement.executeUpdate(createTableStatement);
                destNode.log(System.out, "Successfully created a temp table: " + tempTableName);
            }
        });
    } catch (final Exception e) {
        destNode.log(System.err, e.getMessage());
    }
}
/**
 * A helper method that takes a list of column names, each optionally
 * qualified as "tableName.columnName" (or with further leading qualifiers
 * such as a schema), and returns the bare column names. For every entry
 * the text after the last '.' is kept and trimmed; an entry without a '.'
 * is kept as is, trimmed. The result has one entry per input entry, in
 * the same order.
 *
 * @param columns
 *            The list of columns in the format of "tableName.columnName"
 *
 * @return A list of column names in the format of "columnName"
 *
 */
private static ArrayList<String> stripTableName(final ArrayList<String> columns) {
    final ArrayList<String> strippedColumns = new ArrayList<String>(columns.size());
    for (final String item : columns) {
        // lastIndexOf yields -1 when there is no qualifier, which makes
        // the substring start at 0 and keep the whole entry.
        final int lastDot = item.lastIndexOf('.');
        strippedColumns.add(item.substring(lastDot + 1).trim());
    }
    return strippedColumns;
}
}