// JoinSQLProcessor.java

package edu.hawaii.ics.yucheng;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;

/**
 * Supports select-from-where queries involving a join between exactly two
 * tables. The program takes two command line arguments, clustercfg and
 * sqlfile. The clustercfg file contains access information for the catalog and
 * the local DB. The sqlfile contains the join statement to be executed.
 * <p>
 * The join is evaluated as a nested loop over the nodes that hold pieces of
 * each table: every piece is copied (projected and filtered) into a temporary
 * table on the local node, and the join itself runs on the local node against
 * the two temporary tables.
 * 
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 3
 * @date Mar 19, 2010
 * @bugs None
 */
public class JoinSQLProcessor {

    /** Prefix prepended to a table name to form its temporary table name. */
    private static final String TEMP_PREFIX = "TEMP_";

    /**
     * Program entry point. Parses the SQL file and the configuration file,
     * runs the distributed join and prints the result rows.
     * 
     * @param args
     *            The command line arguments: the configuration file path
     *            followed by the SQL file path.
     */
    public static void main(final String[] args) {
        assert null != args;

        // Check for usage errors.
        if (args.length != 2) {
            final String name = JoinSQLProcessor.class.getSimpleName();
            System.err.println("Usage: java " + name + " <cfg> <ddl>");
            System.err.println("       <cfg> path to a configuration file");
            System.err.println("       <ddl> path to a SQL file");
            System.exit(0);
            return;
        }
        try {
            // The SQL file is parsed before the configuration file.
            final ParseSQL sql = new ParseSQL(args[1]);
            final Configuration configuration = new Configuration(args[0]);
            executeJoin(sql, configuration);
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
    }

    /**
     * Runs the join described by a parsed statement and prints the result.
     * The temporary tables are dropped from the local node when the method
     * finishes, whether it completes normally or fails.
     * 
     * @param sql
     *            The parsed join statement.
     * @param configuration
     *            The parsed configuration holding the catalog and local nodes.
     * 
     * @throws ProgramException
     *             Propagated from running the join statement on the local
     *             node.
     */
    private static void executeJoin(final ParseSQL sql, final Configuration configuration)
            throws ProgramException {

        // Table names, pushed-down projection columns and the where clause
        // rewritten to refer to the temporary tables.
        final String tableName1 = sql.tableName1;
        final String tableName2 = sql.tableName2;
        final String tempTableName1 = TEMP_PREFIX + tableName1;
        final String tempTableName2 = TEMP_PREFIX + tableName2;
        final ArrayList<String> table1RelevantColumns = stripTableName(sql.table1RelevantColumns);
        final ArrayList<String> table2RelevantColumns = stripTableName(sql.table2RelevantColumns);
        final String whereClause = replaceTableName(
                replaceTableName(sql.whereClause, tableName1, tempTableName1),
                tableName2, tempTableName2);

        final ConfigurationNode catalog = configuration.catalog;
        final ConfigurationNode localNode = configuration.localNode;

        // Connect to the catalog and retrieve the nodes that contain pieces
        // of each target table.
        final ArrayList<ConfigurationNode> table1RelevantNodes = new ArrayList<ConfigurationNode>();
        final ArrayList<ConfigurationNode> table2RelevantNodes = new ArrayList<ConfigurationNode>();
        fetchRelevantNodes(catalog, tableName1, tableName2,
                table1RelevantNodes, table2RelevantNodes);

        // Make a target list with "TEMP_" prepended to each column name.
        final String[] targetColumns = sql.targetColumns;
        final ArrayList<String> tempTargetColumns = new ArrayList<String>();
        for (final String item : targetColumns) {
            tempTargetColumns.add(TEMP_PREFIX + item);
        }

        // The join statement does not depend on which nodes are being
        // joined, so it is built once.
        final String joinStatement = joinStatement(
                tempTargetColumns,
                whereClause,
                tempTableName1,
                tempTableName2);

        // Collects the final join results.
        final ArrayList<String> result = new ArrayList<String>();
        try {
            for (final ConfigurationNode node1 : table1RelevantNodes) {
                prepareTempTable(node1, localNode, tableName1, table1RelevantColumns);
                copyTable(node1, localNode, tableName1, table1RelevantColumns,
                        sql.table1QualificationColumns);

                for (final ConfigurationNode node2 : table2RelevantNodes) {
                    prepareTempTable(node2, localNode, tableName2, table2RelevantColumns);
                    copyTable(node2, localNode, tableName2, table2RelevantColumns,
                            sql.table2QualificationColumns);

                    // Execute the join on the local node and collect rows.
                    localNode.runStatement(new StatementRunner() {
                        public void run(Statement statement) throws ProgramException, SQLException {
                            localNode.log(System.out, "Executing " + joinStatement);
                            final ResultSet set = statement.executeQuery(joinStatement);
                            try {
                                final int count = set.getMetaData().getColumnCount();
                                while (set.next()) {
                                    final StringBuilder builder = new StringBuilder();
                                    for (int column = 1; column <= count; column++) {
                                        // getString returns null for SQL NULL.
                                        final String value = set.getString(column);
                                        builder.append(value == null ? "null" : value.trim());
                                        builder.append("  ");
                                    }
                                    result.add(builder.toString());
                                }
                            } finally {
                                set.close();
                            }
                        }
                    });
                }
            }

            // Print the final result.
            System.out.println("\nStatement, " + sql.sql + ", executing result: ");
            for (final String item : result) {
                System.out.println(item.trim());
            }
            System.out.println();
        } finally {
            // Drop the temp tables when the program is done, even on failure.
            dropTempTable(localNode, tempTableName1);
            dropTempTable(localNode, tempTableName2);
        }
    }

    /**
     * Replaces every occurrence of a table name in a clause with its
     * temporary table name. The name is matched literally and only where it
     * is not part of a longer identifier, so keywords or column names that
     * merely contain the table name are left untouched.
     * 
     * @param clause
     *            The clause to rewrite.
     * @param tableName
     *            The table name to look for.
     * @param tempTableName
     *            The name to substitute.
     * 
     * @return The rewritten clause.
     */
    private static String replaceTableName(
            final String clause,
            final String tableName,
            final String tempTableName) {
        final String pattern = "(?<![A-Za-z0-9_$])"
                + java.util.regex.Pattern.quote(tableName)
                + "(?![A-Za-z0-9_$])";
        return clause.replaceAll(pattern,
                java.util.regex.Matcher.quoteReplacement(tempTableName));
    }

    /**
     * Connects to the specified catalog node and retrieves the node
     * information. It fills two ArrayLists with nodes that contain pieces of
     * each of the two specified input tables. Failures are logged on the
     * catalog node and not propagated; the lists keep whatever was added
     * before the failure.
     * 
     * @param catalog
     *            The catalog DB, whose DTABLES table is queried.
     * @param tableName1
     *            The first table name that it's looking for.
     * @param tableName2
     *            The second table name that it's looking for.
     * @param table1RelevantNodes
     *            The ArrayList receiving the nodes relevant to tableName1.
     * @param table2RelevantNodes
     *            The ArrayList receiving the nodes relevant to tableName2.
     * 
     */
    private static void fetchRelevantNodes(
            final ConfigurationNode catalog,
            final String tableName1,
            final String tableName2,
            final ArrayList<ConfigurationNode> table1RelevantNodes,
            final ArrayList<ConfigurationNode> table2RelevantNodes) {

        try {
            catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    // fetch the relevant nodes of table1
                    final String sql1 = fetchRelevantNodesHelper(tableName1, statement, table1RelevantNodes);
                    catalog.log(System.out, "Eexcuted " + sql1);

                    // fetch the relevant nodes of table2
                    final String sql2 = fetchRelevantNodesHelper(tableName2, statement, table2RelevantNodes);
                    catalog.log(System.out, "Eexcuted " + sql2);
                }
            });
        } catch (final Exception e) {
            catalog.log(System.err, "failed in fetchRelevantNodes " + e.toString());
        }
    }

    /**
     * This is a helper method for fetchRelevantNodes. It queries DTABLES for
     * the rows of a specified table name and appends one node per row to an
     * ArrayList.
     * 
     * @param tableName
     *            The table name that it's looking for.
     * @param statement
     *            The statement created by the catalog connection.
     * @param relevantNodes
     *            The list receiving a node for every matching catalog row.
     * 
     * @return The SQL statement that was executed.
     * 
     * @throws SQLException
     *             Thrown if the query fails or an expected column is missing.
     * @throws ProgramException
     *             Declared for callers; see ConfigurationNode.
     * 
     */
    private static String fetchRelevantNodesHelper(
            String tableName,
            Statement statement,
            ArrayList<ConfigurationNode> relevantNodes)
            throws SQLException, ProgramException {
        // Double embedded single quotes so the name stays one SQL literal.
        final String literal = tableName.replace("'", "''");
        final String sql = "SELECT * FROM DTABLES WHERE TNAME = '"
                + literal + "' OR TNAME = UCASE ('" + literal + "')";
        final ResultSet set = statement.executeQuery(sql);
        try {
            while (set.next()) {
                final String driver = set.getString("NODEDRIVER").trim();
                final String url = set.getString("NODEURL").trim();
                final String user = set.getString("NODEUSER").trim();
                final String password = set.getString("NODEPASSWD").trim();
                final String name = "node " + set.getString("NODEID").trim();

                relevantNodes.add(new ConfigurationNode(name, driver, url, user, password));
            }
        } finally {
            set.close();
        }
        return sql;
    }

    /**
     * Creates and returns a join statement according to the target columns,
     * where clause, and target table names.
     * 
     * @param targetColumns
     *            The list of target columns that are in the select clause.
     * @param whereClause
     *            The where clause appended verbatim after the table names.
     * @param tableName1
     *            The first table to be joined
     * @param tableName2
     *            The second table to be joined
     * 
     * @return The complete join statement.
     * 
     */
    private static String joinStatement(
            final ArrayList<String> targetColumns,
            final String whereClause,
            final String tableName1,
            final String tableName2) {
        return "SELECT " + commaSeparatedColumnNames(targetColumns)
                + " FROM " + tableName1 + ", " + tableName2 + " " + whereClause;
    }

    /**
     * Copies the relevant contents of a specified table from a source node to
     * a destination node. The rows are read with a select that applies both
     * projection (only the relevant columns) and selection (the relevant
     * qualifications, combined with OR) push down, and are written to the
     * temporary table with one bulk insert statement. Nothing is inserted
     * when no rows were retrieved. Failures on either node are logged and not
     * propagated.
     * 
     * @param srcNode
     *            The node that's to be copied from.
     * @param destNode
     *            The node that's to be copied to.
     * @param tableName
     *            The table whose contents are to be copied.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     * @param relevantQualifications
     *            The list of qualifications that're related to this table.
     * 
     */
    private static void copyTable(
            final ConfigurationNode srcNode,
            final ConfigurationNode destNode,
            final String tableName,
            final ArrayList<String> relevantColumns,
            final ArrayList<String> relevantQualifications) {

        final int columnCount = relevantColumns.size();
        final String tempTableName = TEMP_PREFIX + tableName;

        // retrieve the relevant records from the target table on srcNode.
        final ArrayList<String[]> records = new ArrayList<String[]>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final StringBuilder select = new StringBuilder("SELECT ");
                    select.append(commaSeparatedColumnNames(relevantColumns));
                    select.append(" FROM ").append(tableName);
                    for (int i = 0; i < relevantQualifications.size(); i++) {
                        select.append(i == 0 ? " WHERE " : " OR ");
                        select.append(relevantQualifications.get(i));
                    }

                    final ResultSet set = statement.executeQuery(select.toString());
                    try {
                        while (set.next()) {
                            final String[] record = new String[columnCount];
                            for (int i = 0; i < columnCount; i++) {
                                // A SQL NULL is kept as a Java null so it can
                                // be told apart from the string "null".
                                final String value = set.getString(i + 1);
                                record[i] = value == null ? null : value.trim();
                            }
                            records.add(record);
                        }
                    } finally {
                        set.close();
                    }
                }
            });
            srcNode.log(System.out, "Successfully retrieved records from the target table.");
        } catch (final Exception e) {
            srcNode.log(System.err, e.toString());
        }

        // An INSERT ... VALUES without any tuple is not valid SQL.
        if (records.isEmpty()) {
            destNode.log(System.out, "No records to copy into " + tempTableName
                    + "; skipping the bulk insert.");
            return;
        }

        // construct the bulk insert statement.
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO " + tempTableName + " VALUES \n");
        for (int i = 0; i < records.size(); i++) {
            final String[] record = records.get(i);
            builder.append("\t\t\t\t(");
            for (int j = 0; j < columnCount; j++) {
                if (j > 0) {
                    builder.append(", ");
                }
                builder.append(quoteString(record[j]));
            }
            builder.append(")");
            if (i != records.size() - 1) {
                builder.append(", \n");
            }
        }
        final String bulkInsertStatement = builder.toString();

        // execute the bulk insert statement on destNode.
        try {
            destNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    destNode.log(System.out, "Executing " + bulkInsertStatement);
                    statement.executeUpdate(bulkInsertStatement);
                }
            });
            destNode.log(System.out, "Successfully executed the bulk insert statement.");
        } catch (final Exception e) {
            destNode.log(System.err, e.toString());
        }
    }

    /**
     * Renders a value as a SQL literal for the bulk insert statement.
     * 
     * @param entry
     *            The value, or null for a SQL NULL.
     * 
     * @return The keyword null for a null entry; otherwise the entry in
     *         single quotes with embedded single quotes doubled.
     */
    private static String quoteString(String entry) {
        if (entry == null) {
            return "null";
        }
        return "'" + entry.replace("'", "''") + "'";
    }

    /**
     * Detects if a specified table exists on a specified node.
     * 
     * @param statement
     *            The statement created by the node's connection
     * @param tableName
     *            The table it's looking for. NOTE(review): JDBC treats this
     *            argument as a name pattern, so the underscore in a "TEMP_"
     *            name may act as a single-character wildcard; confirm this
     *            is acceptable for the target databases.
     * 
     * @return True if the table exists, false otherwise.
     */
    private static boolean isTableExist(Statement statement, String tableName)
            throws SQLException, ProgramException {
        final Connection connection = statement.getConnection();
        final DatabaseMetaData md = connection.getMetaData();
        final ResultSet tables = md.getTables(null, null, tableName, null);
        try {
            return tables.next();
        } finally {
            tables.close();
        }
    }

    /**
     * A helper method that takes a list of column names and appends them
     * into a comma separated string.
     * 
     * @param columns
     *            The list of column names
     * 
     * @return The comma separated column names; the empty string for an empty
     *         list.
     * 
     */
    private static String commaSeparatedColumnNames(
            final ArrayList<String> columns) {
        final StringBuilder builder = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                builder.append(", ");
            }
            builder.append(columns.get(i));
        }
        return builder.toString();
    }

    /**
     * Drops a specified table from a specified node if it exists there under
     * the given name or its upper-case form. Failures are logged on the node
     * and not propagated.
     * 
     * @param node
     *            The node where the table is located
     * @param tempTableName
     *            The table that is going to be dropped.
     * 
     */
    private static void dropTempTable(
            final ConfigurationNode node,
            final String tempTableName) {
        try {
            node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final boolean exists = isTableExist(statement, tempTableName)
                            || isTableExist(statement, tempTableName.toUpperCase());
                    if (!exists) {
                        return;
                    }
                    final String dropStatement = "DROP TABLE " + tempTableName;
                    node.log(System.out, "Executing " + dropStatement);
                    statement.executeUpdate(dropStatement);
                    node.log(System.out, "Successfully droped the temp table: " + tempTableName);
                }
            });
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
        }
    }

    /**
     * Prepares the temporary table on the destination node based on the
     * schema of a specified table on a source node. If the temporary table
     * already exists on the destination node its contents are deleted;
     * otherwise it is created with only the relevant columns, using the
     * column names and type names reported by the source node. CHAR columns
     * are created as CHAR(128). Failures are logged and not propagated.
     * 
     * @param srcNode
     *            The node that the original table is located.
     * @param destNode
     *            The node where the temporary table is created.
     * @param tableName
     *            The table whose schema is used for the temporary table.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     * 
     */
    private static void prepareTempTable(
            final ConfigurationNode srcNode,
            final ConfigurationNode destNode,
            final String tableName,
            final ArrayList<String> relevantColumns) {

        final String tempTableName = TEMP_PREFIX + tableName;

        // erase the contents of the temp table if it already exists on
        // destNode. A one-element array lets the runner report back.
        final boolean[] emptied = new boolean[] { false };
        try {
            destNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    if (isTableExist(statement, tempTableName)
                            || isTableExist(statement, tempTableName.toUpperCase())) {
                        final String deleteStatement = "DELETE FROM " + tempTableName;
                        destNode.log(System.out, "Executing " + deleteStatement);
                        statement.executeUpdate(deleteStatement);
                        destNode.log(System.out, "Successfully emptied the temp table: " + tempTableName);
                        emptied[0] = true;
                    }
                }
            });
        } catch (final Exception e) {
            destNode.log(System.err, e.getMessage());
        }
        if (emptied[0]) {
            return;
        }

        // retrieve the column names and column types from srcNode.
        final ArrayList<String> columnNames = new ArrayList<String>();
        final ArrayList<String> columnTypes = new ArrayList<String>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final ResultSet set = statement.executeQuery("SELECT "
                            + commaSeparatedColumnNames(relevantColumns)
                            + " FROM " + tableName);
                    try {
                        // The metadata is read without fetching a row, so an
                        // empty source table still yields its schema.
                        final ResultSetMetaData meta = set.getMetaData();
                        final int count = meta.getColumnCount();
                        for (int i = 1; i <= count; i++) {
                            columnNames.add(meta.getColumnName(i));
                            final String columnType = meta.getColumnTypeName(i);

                            // if column type is char, make its size to be
                            // 128 long in the temp table.
                            if (columnType.equalsIgnoreCase("CHAR")) {
                                columnTypes.add(columnType + "(128)");
                            } else {
                                columnTypes.add(columnType);
                            }
                        }
                    } finally {
                        set.close();
                    }
                }
            });
            srcNode.log(System.out, "Successfully connected and retrieved column names and types from a node db.");
        } catch (final Exception e) {
            srcNode.log(System.err, e.getMessage());
        }

        // Without column information no valid CREATE TABLE can be built.
        if (columnNames.isEmpty()) {
            destNode.log(System.err, "Cannot create the temp table " + tempTableName
                    + ": no column information was retrieved.");
            return;
        }

        // construct the create table statement.
        final StringBuilder builder = new StringBuilder();
        builder.append("CREATE TABLE ");
        builder.append(tempTableName + " (");
        for (int i = 0; i < columnNames.size(); i++) {
            if (i > 0) {
                builder.append(", ");
            }
            builder.append(columnNames.get(i) + " ");
            builder.append(columnTypes.get(i));
        }
        builder.append(")");
        final String createTableStatement = builder.toString();

        // execute the create table statement on destNode.
        try {
            destNode.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    destNode.log(System.out, "Executing " + createTableStatement);
                    statement.executeUpdate(createTableStatement);
                    destNode.log(System.out, "Successfully created a temp table: " + tempTableName);
                }
            });
        } catch (final Exception e) {
            destNode.log(System.err, e.getMessage());
        }
    }

    /**
     * A helper method that takes a list of column names in the format of
     * "tableName.columnName", and returns a list of columns without the
     * leading qualifiers. The last segment is used, so a name with more than
     * one qualifier (for example "schema.table.column") is also reduced to
     * its column name. Both '.' and ':' are treated as separators.
     * 
     * @param columns
     *            The list of columns in the format of "tableName.columnName"
     * 
     * @return A list of column names in the format of "columnName"
     * 
     */
    private static ArrayList<String> stripTableName(final ArrayList<String> columns) {
        final ArrayList<String> strippedColumns = new ArrayList<String>();
        for (final String item : columns) {
            final String[] segments = item.replace('.', ':').split(":");
            // split yields no segments for an item made only of separators.
            if (segments.length > 0) {
                strippedColumns.add(segments[segments.length - 1].trim());
            }
        }
        return strippedColumns;
    }

}
// Valid HTML 4.01 Valid CSS