package edu.hawaii.ics.yucheng;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A class that supports select-from-where queries involving joins between
 * exactly two tables. The program takes two command line arguments, clustercfg
 * and sqlfile. The clustercfg file contains access information for the catalog
 * and the local DB. The sqlfile contains the join statement to be executed.
 *
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 3
 * @date Mar 19, 2010
 * @bugs None
 */
public class JoinSQLProcessor {

    /** Prefix prepended to a table name to form its temporary copy's name. */
    private static final String TEMP_PREFIX = "TEMP_";

    /**
     * A main method to test the implementation. It parses the SQL file and the
     * configuration file, copies each pair of table fragments into temporary
     * tables on the local node, joins them there, and prints the collected
     * rows.
     *
     * @param args
     *            The command line arguments: the configuration file path and
     *            the SQL file path.
     */
    public static void main(final String[] args) {
        assert null != args;

        // Check for usage errors.
        if (args.length != 2) {
            final String name = JoinSQLProcessor.class.getSimpleName();
            System.err.println("Usage: java " + name + " <cfg> <ddl>");
            System.err.println(" <cfg> path to a configuration file");
            System.err.println(" <ddl> path to a SQL file");
            System.exit(0);
            return;
        }

        try {
            // Parse sql statement. Obtain the table names, projection push
            // down column names for each table, selection push down
            // qualifications for each table and the overall where clause.
            final ParseSQL sql = new ParseSQL(args[1]);
            final String tableName1 = sql.tableName1;
            final String tableName2 = sql.tableName2;
            final String tempTableName1 = TEMP_PREFIX + tableName1;
            final String tempTableName2 = TEMP_PREFIX + tableName2;
            final ArrayList<String> table1RelevantColumns = stripTableName(sql.table1RelevantColumns);
            final ArrayList<String> table2RelevantColumns = stripTableName(sql.table2RelevantColumns);

            // Both names are rewritten in ONE pass. Two chained replacements
            // would corrupt the clause whenever the second table name occurs
            // inside the text inserted for the first one (e.g. "EMP" inside
            // "TEMP_DEPT").
            final String whereClause = useTempTableNames(sql.whereClause, tableName1, tableName2);

            // Parse the configuration file.
            final Configuration configuration = new Configuration(args[0]);
            final ConfigurationNode catalog = configuration.catalog;
            final ConfigurationNode localNode = configuration.localNode;

            // Connect to the catalog and retrieve the nodes that contain
            // pieces of each target table.
            final ArrayList<ConfigurationNode> table1RelevantNodes = new ArrayList<ConfigurationNode>();
            final ArrayList<ConfigurationNode> table2RelevantNodes = new ArrayList<ConfigurationNode>();
            fetchRelevantNodes(catalog, tableName1, tableName2, table1RelevantNodes, table2RelevantNodes);

            // Make a target list with "TEMP_" prepended to the column names.
            final String[] targetColumns = sql.targetColumns;
            final ArrayList<String> tempTargetColumns = new ArrayList<String>();
            for (String item : targetColumns) {
                tempTargetColumns.add(TEMP_PREFIX + item);
            }

            // The join statement does not depend on which fragments are
            // currently loaded, so it is built once.
            final String joinStatement = joinStatement(
                    tempTargetColumns, whereClause, tempTableName1, tempTableName2);

            // An array list collecting final join results.
            final ArrayList<String> result = new ArrayList<String>();

            for (int i = 0; i < table1RelevantNodes.size(); i++) {
                final ConfigurationNode nodei = table1RelevantNodes.get(i);
                prepareTempTable(nodei, localNode, tableName1, table1RelevantColumns);
                copyTable(nodei, localNode, tableName1, table1RelevantColumns,
                        sql.table1QualificationColumns);

                for (int j = 0; j < table2RelevantNodes.size(); j++) {
                    final ConfigurationNode nodej = table2RelevantNodes.get(j);
                    prepareTempTable(nodej, localNode, tableName2, table2RelevantColumns);
                    copyTable(nodej, localNode, tableName2, table2RelevantColumns,
                            sql.table2QualificationColumns);

                    // Execute the join statement on the local node.
                    localNode.runStatement(new StatementRunner() {
                        public void run(Statement statement) throws ProgramException, SQLException {
                            localNode.log(System.out, "Executing " + joinStatement);
                            final ResultSet set = statement.executeQuery(joinStatement);
                            try {
                                final int count = set.getMetaData().getColumnCount();
                                while (set.next()) {
                                    final StringBuilder builder = new StringBuilder();
                                    for (int column = 1; column <= count; column++) {
                                        // getString returns null for SQL NULL.
                                        final String value = set.getString(column);
                                        builder.append(value == null ? "null" : value.trim());
                                        builder.append(" ");
                                    }
                                    result.add(builder.toString());
                                }
                            } finally {
                                set.close();
                            }
                        }
                    });
                }
            }

            // Print the final result.
            System.out.println("\nStatement, " + sql.sql + ", executing result: ");
            for (String item : result) {
                System.out.println(item.trim());
            }
            System.out.println();

            // Drop the temp tables when the program is done.
            dropTempTable(localNode, tempTableName1);
            dropTempTable(localNode, tempTableName2);

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
    }

    /**
     * Rewrites every whole-identifier occurrence of either table name in a
     * clause to its "TEMP_"-prefixed form. The names are matched literally
     * (not as regular expressions), case-sensitively, and in a single pass so
     * that text inserted for one table is never rewritten again for the other.
     *
     * @param clause
     *            The clause to rewrite.
     * @param tableName1
     *            The first table name.
     * @param tableName2
     *            The second table name.
     *
     * @return The clause with both table names replaced by their temp names.
     */
    private static String useTempTableNames(
            final String clause,
            final String tableName1,
            final String tableName2) {

        // The lookarounds keep a table name from matching inside a longer
        // identifier (e.g. "EMP" inside "EMP_ID").
        final Pattern pattern = Pattern.compile(
                "(?<![A-Za-z0-9_])(?:"
                        + Pattern.quote(tableName1) + "|" + Pattern.quote(tableName2)
                        + ")(?![A-Za-z0-9_])");
        final Matcher matcher = pattern.matcher(clause);
        final StringBuffer buffer = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(buffer,
                    Matcher.quoteReplacement(TEMP_PREFIX + matcher.group()));
        }
        matcher.appendTail(buffer);
        return buffer.toString();
    }

    /**
     * Connects to the specified catalog node and retrieves the node
     * information. It fills two ArrayLists with nodes that contain pieces of
     * each of the two specified input tables. Failures are logged on the
     * catalog node and not rethrown, so the lists may be left empty or
     * partially filled.
     *
     * @param catalog
     *            The catalog DB, whose DTABLES table is queried.
     * @param tableName1
     *            The first table name that it's looking for.
     * @param tableName2
     *            The second table name that it's looking for.
     * @param table1RelevantNodes
     *            The ArrayList receiving the nodes relevant to tableName1.
     * @param table2RelevantNodes
     *            The ArrayList receiving the nodes relevant to tableName2.
     */
    private static void fetchRelevantNodes(
            final ConfigurationNode catalog,
            final String tableName1,
            final String tableName2,
            final ArrayList<ConfigurationNode> table1RelevantNodes,
            final ArrayList<ConfigurationNode> table2RelevantNodes) {

        // Connect to the catalog.
        try {
            catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    // fetch the relevant nodes in table1
                    String sql = fetchRelevantNodesHelper(tableName1, statement, table1RelevantNodes);
                    catalog.log(System.out, "Eexcuted " + sql);

                    // fetch the relevant nodes in table2
                    sql = fetchRelevantNodesHelper(tableName2, statement, table2RelevantNodes);
                    catalog.log(System.out, "Eexcuted " + sql);
                }
            });
        } catch (final Exception e) {
            catalog.log(System.err, "failed in fetchRelevantNodes " + e.toString());
        }
    }

    /**
     * This is a helper method for fetchRelevantNodes. It queries DTABLES for
     * the rows matching a table name (as given, or upper-cased by the
     * database) and appends one node per row to an ArrayList.
     *
     * @param tableName
     *            The table name that it's looking for.
     * @param statement
     *            The statement created by the catalog connection.
     * @param relevantNodes
     *            The list that receives the nodes found.
     *
     * @return The SQL text that was executed.
     *
     * @throws SQLException
     *             Thrown if the query fails or an expected column is missing.
     * @throws ProgramException
     *             Declared for callers; see ConfigurationNode.
     */
    private static String fetchRelevantNodesHelper(
            String tableName,
            Statement statement,
            ArrayList<ConfigurationNode> relevantNodes) throws SQLException, ProgramException {

        // The name is embedded in a SQL string literal, so embedded quotes
        // must be doubled.
        final String literal = escapeSqlLiteral(tableName);
        final String sql = "SELECT * FROM DTABLES WHERE TNAME = '" + literal
                + "' OR TNAME = UCASE ('" + literal + "')";

        final ResultSet set = statement.executeQuery(sql);
        try {
            while (set.next()) {
                final String driver = set.getString("NODEDRIVER").trim();
                final String url = set.getString("NODEURL").trim();
                final String user = set.getString("NODEUSER").trim();
                final String password = set.getString("NODEPASSWD").trim();
                final String name = "node " + set.getString("NODEID").trim();
                relevantNodes.add(new ConfigurationNode(name, driver, url, user, password));
            }
        } finally {
            set.close();
        }
        return sql;
    }

    /**
     * Creates and returns a join statement according to the target columns,
     * where clause, and target table names.
     *
     * @param targetColumns
     *            The list of target columns that are in the select clause.
     * @param whereClause
     *            The where clause to be put in this join statement; it is
     *            appended verbatim after the FROM list.
     * @param tableName1
     *            The first table to be joined.
     * @param tableName2
     *            The second table to be joined.
     *
     * @return The text of the join statement.
     */
    private static String joinStatement(
            final ArrayList<String> targetColumns,
            final String whereClause,
            final String tableName1,
            final String tableName2) {

        final StringBuilder builder = new StringBuilder();
        builder.append("SELECT ");
        builder.append(commaSeparatedColumnNames(targetColumns));
        builder.append(" FROM ");
        builder.append(tableName1).append(", ");
        builder.append(tableName2).append(" ");
        builder.append(whereClause);
        return builder.toString();
    }

    /**
     * Copies the relevant contents of a specified table from a source node to
     * the "TEMP_"-prefixed table on a destination node. To do so, the method
     * selects only the relevant columns, restricted by the relevant
     * qualifications OR-ed together (projection and selection push down), and
     * executes one bulk insert statement on the destination. Failures are
     * logged on the corresponding node and not rethrown. When no record was
     * retrieved, no insert is executed.
     *
     * @param srcNode
     *            The node that's to be copied from.
     * @param destNode
     *            The node that's to be copied to.
     * @param tableName
     *            The table whose contents are to be copied.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     * @param relevantQualifications
     *            The list of qualifications that're related to this table.
     */
    private static void copyTable(
            final ConfigurationNode srcNode,
            final ConfigurationNode destNode,
            final String tableName,
            final ArrayList<String> relevantColumns,
            final ArrayList<String> relevantQualifications) {

        final int columnCount = relevantColumns.size();
        final String tempTableName = TEMP_PREFIX + tableName;

        // Retrieve all matching records from the target table on srcNode.
        // A SQL NULL is kept as a Java null so that it cannot be confused
        // with the text value "null".
        final ArrayList<String[]> records = new ArrayList<String[]>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final StringBuilder select = new StringBuilder();
                    select.append("SELECT ");
                    select.append(commaSeparatedColumnNames(relevantColumns));
                    select.append(" FROM ").append(tableName);

                    final int size = relevantQualifications.size();
                    if (size != 0) {
                        select.append(" WHERE ");
                        for (int i = 0; i < size - 1; i++) {
                            select.append(relevantQualifications.get(i)).append(" OR ");
                        }
                        select.append(relevantQualifications.get(size - 1));
                    }

                    final ResultSet set = statement.executeQuery(select.toString());
                    try {
                        while (set.next()) {
                            final String[] record = new String[columnCount];
                            for (int i = 1; i <= columnCount; i++) {
                                final String value = set.getString(i);
                                record[i - 1] = value == null ? null : value.trim();
                            }
                            records.add(record);
                        }
                    } finally {
                        set.close();
                    }
                }
            });
            srcNode.log(System.out, "Successfully retrieved records from the target table.");
        } catch (final Exception e) {
            srcNode.log(System.err, e.toString());
        }

        // "INSERT ... VALUES" with no tuples is not valid SQL; the temp table
        // simply stays empty.
        if (records.isEmpty()) {
            destNode.log(System.out,
                    "No records to copy into " + tempTableName + "; skipping the bulk insert.");
            return;
        }

        // Construct the bulk insert statement.
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO " + tempTableName + " VALUES \n");
        for (int i = 0; i < records.size(); i++) {
            final String[] record = records.get(i);
            builder.append("\t\t\t\t(");
            for (int j = 0; j < columnCount; j++) {
                if (j > 0) {
                    builder.append(", ");
                }
                builder.append(quoteString(record[j]));
            }
            builder.append(")");
            if (i != records.size() - 1) {
                builder.append(", \n");
            }
        }
        final String bulkInsertStatement = builder.toString();

        // Execute the bulk insert statement on destNode.
        try {
            destNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    destNode.log(System.out, "Executing " + bulkInsertStatement);
                    statement.executeUpdate(bulkInsertStatement);
                }
            });
            destNode.log(System.out, "Successfully executed the bulk insert statement.");
        } catch (final Exception e) {
            destNode.log(System.err, e.toString());
        }
    }

    /**
     * Renders a value as a SQL literal for the bulk insert statement.
     *
     * @param entry
     *            The value, or null for SQL NULL.
     *
     * @return The keyword "null" for a null entry; otherwise the entry in
     *         single quotes with embedded single quotes doubled.
     */
    private static String quoteString(String entry) {
        if (entry == null) {
            return "null";
        }
        return "'" + escapeSqlLiteral(entry) + "'";
    }

    /**
     * Doubles every single quote so the text can be placed inside a
     * single-quoted SQL string literal.
     *
     * @param text
     *            The raw text.
     *
     * @return The text with each single quote doubled.
     */
    private static String escapeSqlLiteral(final String text) {
        return text.replace("'", "''");
    }

    /**
     * Detects if a specified table exists on a specified node, using the
     * connection's database metadata. The name is passed to the metadata
     * lookup as given, so callers that need a case-insensitive check must
     * probe each spelling.
     *
     * @param statement
     *            The statement created by the node's connection.
     * @param tableName
     *            The table it's looking for.
     *
     * @return True if the table exists, false otherwise.
     *
     * @throws SQLException
     *             Thrown if the metadata cannot be read.
     * @throws ProgramException
     *             Declared for callers; not thrown by this method itself.
     */
    private static boolean isTableExist(Statement statement, String tableName)
            throws SQLException, ProgramException {
        final Connection connection = statement.getConnection();
        final DatabaseMetaData md = connection.getMetaData();
        final ResultSet tables = md.getTables(null, null, tableName, null);
        try {
            return tables.next();
        } finally {
            tables.close();
        }
    }

    /**
     * A helper method that takes a list of column names and appends them into
     * a comma separated string.
     *
     * @param columns
     *            The list of column names.
     *
     * @return The comma separated column names; an empty string when the list
     *         is empty.
     */
    private static String commaSeparatedColumnNames(
            final ArrayList<String> columns) {
        final StringBuilder builder = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                builder.append(", ");
            }
            builder.append(columns.get(i));
        }
        return builder.toString();
    }

    /**
     * Drops a specified table from a specified node if it exists there (under
     * the given name or its upper-cased form). Failures are logged on the
     * node and not rethrown.
     *
     * @param node
     *            The node where the table is located.
     * @param tempTableName
     *            The table that is going to be dropped.
     */
    private static void dropTempTable(
            final ConfigurationNode node,
            final String tempTableName) {
        try {
            node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    if (isTableExist(statement, tempTableName)
                            || isTableExist(statement, tempTableName.toUpperCase())) {
                        final String dropStatement = "DROP TABLE " + tempTableName;
                        node.log(System.out, "Executing " + dropStatement);
                        statement.executeUpdate(dropStatement);
                        node.log(System.out, "Successfully droped the temp table: " + tempTableName);
                    }
                }
            });
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
        }
    }

    /**
     * Prepares the temporary table on the destination node based on the
     * schema of a specified table on a source node. The method deletes the
     * contents of the temporary table if it already exists; otherwise it
     * creates the temporary table from the result set metadata of the
     * relevant columns, so the table contains only the necessary columns.
     * Columns whose type name is CHAR are created as CHAR(128). Failures are
     * logged on the corresponding node and not rethrown.
     *
     * @param srcNode
     *            The node where the original table is located.
     * @param destNode
     *            The node where the temporary table is created.
     * @param tableName
     *            The table whose schema is used for the temporary table.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     */
    private static void prepareTempTable(
            final ConfigurationNode srcNode,
            final ConfigurationNode destNode,
            final String tableName,
            final ArrayList<String> relevantColumns) {

        final String tempTableName = TEMP_PREFIX + tableName;

        // Erase the contents of the temp table if it already exists on
        // destNode. A one-element array lets the anonymous class report back.
        final boolean[] emptied = new boolean[] { false };
        try {
            destNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    if (isTableExist(statement, tempTableName)
                            || isTableExist(statement, tempTableName.toUpperCase())) {
                        final String deleteStatement = "DELETE FROM " + tempTableName;
                        destNode.log(System.out, "Executing " + deleteStatement);
                        statement.executeUpdate(deleteStatement);
                        destNode.log(System.out, "Successfully emptied the temp table: " + tempTableName);
                        emptied[0] = true;
                    }
                }
            });
        } catch (final Exception e) {
            destNode.log(System.err, e.getMessage());
        }
        if (emptied[0]) {
            return;
        }

        // Retrieve the column names and column types from the source node.
        final ArrayList<String> columnNames = new ArrayList<String>();
        final ArrayList<String> columnTypes = new ArrayList<String>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final ResultSet set = statement.executeQuery("SELECT "
                            + commaSeparatedColumnNames(relevantColumns)
                            + " FROM " + tableName);
                    try {
                        // The metadata describes the columns whether or not
                        // the fragment holds any rows, so no row is fetched.
                        final ResultSetMetaData meta = set.getMetaData();
                        for (int i = 1; i <= meta.getColumnCount(); i++) {
                            columnNames.add(meta.getColumnName(i));
                            String columnType = meta.getColumnTypeName(i);

                            // if column type is char, make its size to be 128
                            // long in the temp table.
                            if (columnType.equalsIgnoreCase("CHAR")) {
                                columnType = columnType + "(128)";
                            }
                            columnTypes.add(columnType);
                        }
                    } finally {
                        set.close();
                    }
                }
            });
            srcNode.log(System.out,
                    "Successfully connected and retrieved column names and types from a node db.");
        } catch (final Exception e) {
            srcNode.log(System.err, e.getMessage());
        }

        // Without any column there is nothing to create.
        final int columnNumber = columnNames.size();
        if (columnNumber == 0) {
            destNode.log(System.err, "Unable to determine the schema for "
                    + tempTableName + "; the temp table was not created.");
            return;
        }

        // Construct the create table statement.
        final StringBuilder builder = new StringBuilder();
        builder.append("CREATE TABLE ");
        builder.append(tempTableName + " (");
        for (int i = 0; i < columnNumber; i++) {
            if (i > 0) {
                builder.append(", ");
            }
            builder.append(columnNames.get(i)).append(" ").append(columnTypes.get(i));
        }
        builder.append(")");
        final String createTableStatement = builder.toString();

        // Execute the create table statement on destNode.
        try {
            destNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    destNode.log(System.out, "Executing " + createTableStatement);
                    statement.executeUpdate(createTableStatement);
                    destNode.log(System.out, "Successfully created a temp table: " + tempTableName);
                }
            });
        } catch (final Exception e) {
            destNode.log(System.err, e.getMessage());
        }
    }

    /**
     * A helper method that takes a list of column names in the format of
     * "tableName.columnName", and returns a list of columns without the
     * leading table names. Both '.' and ':' are treated as the separator.
     * Entries that split into more than two parts are not expected (asserted)
     * and are left out of the result.
     *
     * @param columns
     *            The list of columns in the format of "tableName.columnName".
     *
     * @return A list of column names in the format of "columnName".
     */
    private static ArrayList<String> stripTableName(final ArrayList<String> columns) {
        final ArrayList<String> strippedColumns = new ArrayList<String>();
        for (String item : columns) {
            final String[] parts = item.replace('.', ':').split(":");
            final int size = parts.length;
            assert size <= 2;
            if (size == 1) {
                strippedColumns.add(parts[0].trim());
            } else if (size == 2) {
                strippedColumns.add(parts[1].trim());
            }
        }
        return strippedColumns;
    }
}