// SelectOrJoin.java

package edu.hawaii.ics.yucheng;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.regex.Pattern;

/**
 * A {@link Runnable} that executes a list of SQL statements on the
 * distributed database system. Only select and join statements are
 * supported. A select statement is run on every node the catalog lists for
 * its table. A join statement is evaluated for every pair of nodes holding
 * the two tables, by copying the relevant rows into session temporary tables
 * on the local node and joining them there.
 * 
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 4
 * @date Mar 22, 2010
 * @bugs None
 */
public class SelectOrJoin implements Runnable {
    /* the select or join configuration object. */
    public final SelectOrJoinConfiguration configuration;

    /* the catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /* a list of SQL commands to be executed. */
    private final ArrayList<String> sqls;

    /* the smallest length declared for character columns of a temp table. */
    private static final int MIN_CHARACTER_COLUMN_LENGTH = 128;

    /* the text used in place of an SQL NULL when rows are printed or copied. */
    private static final String NULL_TEXT = "null";

    /**
     * Initialize a new instance of this object
     * 
     * @param configuration
     *            The select or join configuration; its catalog node is used
     *            to look up where tables are stored.
     * @param sqls
     *            The SQL statements to execute when {@link #run()} is called.
     * 
     * @throws NullPointerException
     *             Thrown if either argument is null.
     */
    public SelectOrJoin(final SelectOrJoinConfiguration configuration, final ArrayList<String> sqls) {
        if (null == configuration)
            throw new NullPointerException("configuration");
        if (null == sqls)
            throw new NullPointerException("sqls");
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;

        // The list is only iterated, so it is kept as the type the caller
        // passed in; no down cast is needed.
        this.sqls = sqls;
    }

    /**
     * The main routine that executes the list of SQL statements. All
     * statements are classified first, so an unsupported statement aborts the
     * run before anything is executed. Select statements are then executed
     * before join statements. A ProgramException is reported on System.err
     * and ends the run.
     */
    @Override
    public void run() {
        try {
            final ArrayList<SQLStructure> selectSqls = new ArrayList<SQLStructure>();
            final ArrayList<SQLStructure> joinSqls = new ArrayList<SQLStructure>();
            for (final String sql : this.sqls) {
                final SQLStructure sqlStructure = new SQLStructure(sql);
                final SQLType sqlType = sqlStructure.type;
                if (sqlType == SQLType.SELECT)
                    selectSqls.add(sqlStructure);
                else if (sqlType == SQLType.JOIN)
                    joinSqls.add(sqlStructure);
                else
                    throw new ProgramException("unsupported sql: " + sql);
            }

            for (final SQLStructure sql : selectSqls)
                this.runSelect(sql);
            for (final SQLStructure sql : joinSqls)
                this.runJoin(sql);

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
        }
    }

    /**
     * Execute one join statement and print the results.
     * 
     * @param sql
     *            The parsed join statement.
     * 
     * @throws ProgramException
     *             Thrown if the schema of a table cannot be read or if
     *             running the statements on the local node fails.
     */
    private void runJoin(final SQLStructure sql) throws ProgramException {

        // The table names and, for each table, the columns that have to be
        // copied (projection push down).
        final String tableName1 = sql.tableName1;
        final String tableName2 = sql.tableName2;
        final ArrayList<String> table1RelevantColumns = stripTableName(sql.table1RelevantColumns);
        final ArrayList<String> table2RelevantColumns = stripTableName(sql.table2RelevantColumns);

        // Use the catalog as the local node if the configuration does not
        // provide one.
        final ConfigurationNode configuredLocalNode = this.configuration.localNode;
        final ConfigurationNode localNode = null != configuredLocalNode ? configuredLocalNode : this.catalog;

        // Connect to the catalog and retrieve the nodes that contain pieces
        // of each target table.
        final ArrayList<ConfigurationNode> table1RelevantNodes = new ArrayList<ConfigurationNode>();
        final ArrayList<ConfigurationNode> table2RelevantNodes = new ArrayList<ConfigurationNode>();
        this.fetchRelevantNodes(tableName1, tableName2,
                table1RelevantNodes, table2RelevantNodes);

        // The join statement is the same for every pair of nodes.
        final String joinStatement = joinStatement(sql.sql, tableName1, tableName2);

        // an array list collecting final join results.
        final ArrayList<String> result = new ArrayList<String>();
        for (final ConfigurationNode node1 : table1RelevantNodes) {
            final String createTempTableStatement1 = createTempTableStatement(node1, tableName1, table1RelevantColumns);
            final String insertTempTableStatement1 = insertTempTableStatement(node1, tableName1, table1RelevantColumns, sql.table1QualificationColumns);
            for (final ConfigurationNode node2 : table2RelevantNodes) {
                final String createTempTableStatement2 = createTempTableStatement(node2, tableName2, table2RelevantColumns);
                final String insertTempTableStatement2 = insertTempTableStatement(node2, tableName2, table2RelevantColumns, sql.table2QualificationColumns);

                localNode.runStatement(new StatementRunner() {
                    public void run(final Statement statement) throws ProgramException, SQLException {

                        // Declare each temporary table and fill it with the
                        // relevant rows. An insert statement is null when
                        // there are no rows to copy.
                        executeUpdateLogged(localNode, statement, createTempTableStatement1);
                        if (insertTempTableStatement1 != null)
                            executeUpdateLogged(localNode, statement, insertTempTableStatement1);
                        executeUpdateLogged(localNode, statement, createTempTableStatement2);
                        if (insertTempTableStatement2 != null)
                            executeUpdateLogged(localNode, statement, insertTempTableStatement2);

                        // execute the join on the two temporary tables.
                        localNode.log(System.out, "Executing " + joinStatement);
                        final ResultSet set = statement.executeQuery(joinStatement);
                        try {
                            final int count = set.getMetaData().getColumnCount();
                            while (set.next()) {
                                final StringBuilder builder = new StringBuilder();
                                for (int i = 1; i <= count; i++)
                                    builder.append(printable(set.getString(i))).append("  ");
                                result.add(builder.toString());
                            }
                        } finally {
                            set.close();
                        }
                    }
                });
            }
        }

        // Print the final result.
        System.out.println("\nStatement, " + sql.sql + ", executing result: ");
        for (final String item : result)
            System.out.println(item.trim());
        System.out.println();
    }

    /**
     * Execute one select statement and print the results.
     * 
     * @param sql
     *            The parsed select statement.
     * 
     * @throws ProgramException
     *             Thrown if the catalog cannot be queried.
     */
    private void runSelect(final SQLStructure sql) throws ProgramException {

        // the nodes read from the catalog.
        final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>();
        final String tableName = sql.tableName1;

        // Connect to the catalog and collect every node that has this table.
        this.catalog.runStatement(new StatementRunner() {
            public void run(final Statement statement) throws ProgramException, SQLException {
                final String query = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '"
                        + tableName + "' OR TNAME = UCASE('" + tableName + "')";
                SelectOrJoin.this.catalog.log(System.out, "Executing " + query);
                readNodes(statement.executeQuery(query), nodes);
            }
        });
        final int count = nodes.size();
        this.catalog.log(System.out, count + " nodes with table name '" + tableName + "' were found.");

        // start threads on each node
        final Thread[] threads = new Thread[count];
        final ArrayList<SelectRunner> runners = new ArrayList<SelectRunner>();
        for (int i = 0; i < count; i++) {
            final SelectRunner runner = new SelectRunner(nodes.get(i), sql.sql);
            runners.add(runner);
            threads[i] = new Thread(runner);
            threads[i].start();
        }

        // wait for the threads to complete and collect the results.
        final StringBuilder resultBuilder = new StringBuilder();
        for (int i = 0; i < count; i++) {
            DistributedDB.join(threads[i], nodes.get(i));
            for (final String result : runners.get(i).getResults())
                resultBuilder.append(result).append("\n");
        }

        // print the collected result
        if (count != 0) {
            System.out.println("\nResults of executing " + sql.sql + " are: ");
            System.out.println(resultBuilder.toString());
        }
    }

    /**
     * Logs an update statement on a node's log and then executes it.
     * 
     * @param node
     *            The node whose log receives the message.
     * @param statement
     *            The statement used to execute the SQL.
     * @param sql
     *            The SQL text to execute.
     */
    private static void executeUpdateLogged(
            final ConfigurationNode node,
            final Statement statement,
            final String sql) throws SQLException {
        node.log(System.out, "Executing " + sql);
        statement.executeUpdate(sql);
    }

    /**
     * Returns the trimmed text of a column value, or "null" for an SQL NULL.
     */
    private static String printable(final String value) {
        return null == value ? NULL_TEXT : value.trim();
    }

    /**
     * Method returns a bulk insert statement based on the relevant columns and
     * relevant qualifications. This method applies both projection and
     * selection push down: only the relevant columns are read, and only rows
     * matching at least one of the qualifications are read.
     * 
     * Reading the rows is best effort. A failure is logged on the source node
     * and whatever rows were read so far are used.
     * 
     * @param srcNode
     *            The node that's to be copied from.
     * @param tableName
     *            The table whose contents are to be copied.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     * @param relevantQualifications
     *            The list of qualifications that're related to this table.
     * 
     * @return The insert statement for the temporary table, or null if no
     *         rows were read.
     */
    private static String insertTempTableStatement(
            final ConfigurationNode srcNode,
            final String tableName,
            final ArrayList<String> relevantColumns,
            final ArrayList<String> relevantQualifications) {

        final int columnCount = relevantColumns.size();

        // retrieve the relevant records from the target table in the source
        // node.
        final ArrayList<String[]> records = new ArrayList<String[]>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final String selectStatement = "SELECT "
                            + commaSeparatedColumnNames(relevantColumns)
                            + " FROM " + tableName + whereClause(relevantQualifications);
                    final ResultSet set = statement.executeQuery(selectStatement);
                    try {
                        while (set.next()) {
                            final String[] record = new String[columnCount];
                            for (int i = 1; i <= columnCount; i++)
                                record[i - 1] = printable(set.getString(i));
                            records.add(record);
                        }
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final Exception e) {
            srcNode.log(System.err, e.toString());
        }

        final int recordsCount = records.size();
        if (recordsCount == 0)
            return null;

        // construct the bulk insert statement, one parenthesized row per
        // record.
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO SESSION.TEMP_" + tableName + " VALUES \n");
        for (int i = 0; i < recordsCount; i++) {
            final String[] record = records.get(i);
            builder.append("\t\t\t\t(");
            for (int j = 0; j < columnCount; j++) {
                if (j > 0)
                    builder.append(", ");
                final String quoted = DistributedDB.quote(record[j]);
                builder.append(quoted);
            }
            builder.append(")");
            if (i != recordsCount - 1)
                builder.append(", \n");
        }
        return builder.toString();
    }

    /**
     * Builds the where clause used for selection push down. The
     * qualifications are combined with OR.
     * 
     * @param qualifications
     *            The qualifications that are related to one table.
     * 
     * @return The where clause with a leading space, or an empty string if
     *         there are no qualifications.
     */
    private static String whereClause(final ArrayList<String> qualifications) {
        if (qualifications.isEmpty())
            return "";
        final StringBuilder builder = new StringBuilder(" WHERE ");
        for (int i = 0; i < qualifications.size(); i++) {
            if (i > 0)
                builder.append(" OR ");
            builder.append(qualifications.get(i));
        }
        return builder.toString();
    }

    /**
     * Method returns a statement that declares a temporary table. The
     * temporary table contains only the necessary columns, with the column
     * names and types read from the original table.
     * 
     * @param srcNode
     *            The node that the original table is located.
     * @param tableName
     *            The table whose schema is used for the temporary table.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     * 
     * @return The statement declaring the temporary table.
     * 
     * @throws ProgramException
     *             Thrown if there are no relevant columns or if the schema
     *             cannot be read from the source node.
     */
    private static String createTempTableStatement(
            final ConfigurationNode srcNode,
            final String tableName,
            final ArrayList<String> relevantColumns) throws ProgramException {

        if (relevantColumns.isEmpty())
            throw new ProgramException("no relevant columns for table '" + tableName + "'");

        // one "name type" entry per column of the temporary table. A failure
        // to read the schema is propagated, because no statement can be
        // built without it.
        final ArrayList<String> columnDefinitions = new ArrayList<String>();
        srcNode.runStatement(new StatementRunner() {
            public void run(final Statement statement) throws ProgramException, SQLException {

                final String query = "SELECT "
                        + commaSeparatedColumnNames(relevantColumns)
                        + " FROM " + tableName;
                srcNode.log(System.out, "Excecuting " + query);
                final ResultSet set = statement.executeQuery(query);
                try {
                    // retrieve the column names and column types.
                    final ResultSetMetaData meta = set.getMetaData();
                    if (!set.next())
                        srcNode.log(System.out, String.format("Table '%s' is empty", tableName));

                    for (int i = 1; i <= meta.getColumnCount(); i++)
                        columnDefinitions.add(meta.getColumnName(i) + " " + columnTypeDeclaration(meta, i));
                } finally {
                    set.close();
                }
            }
        });
        if (columnDefinitions.isEmpty())
            throw new ProgramException("unable to read the schema of table '" + tableName + "'");

        // construct the declare statement.
        final StringBuilder builder = new StringBuilder();
        builder.append("DECLARE GLOBAL TEMPORARY TABLE ");
        builder.append("TEMP_" + tableName + " (");
        builder.append(commaSeparatedColumnNames(columnDefinitions));
        builder.append(")");
        builder.append(" ON COMMIT PRESERVE ROWS NOT LOGGED");
        return builder.toString();
    }

    /**
     * Returns the type used to declare one column of a temporary table.
     * CHAR and VARCHAR columns are declared with an explicit length that is
     * at least 128 and at least the length reported for the original column.
     * Every other type is declared by its type name alone.
     * 
     * @param meta
     *            The meta data of the query on the original table.
     * @param column
     *            The 1-based index of the column.
     * 
     * @return The type declaration of the column.
     */
    private static String columnTypeDeclaration(
            final ResultSetMetaData meta,
            final int column) throws SQLException {
        final String typeName = meta.getColumnTypeName(column);
        final boolean isCharacter = typeName.equalsIgnoreCase("CHAR")
                || typeName.equalsIgnoreCase("VARCHAR");
        if (!isCharacter)
            return typeName;

        // For character data the precision is the length in characters.
        final int length = Math.max(MIN_CHARACTER_COLUMN_LENGTH, meta.getPrecision(column));
        return typeName + "(" + length + ")";
    }

    /**
     * A helper method that takes a list of columns names and append them into
     * to a comma separated string.
     * 
     * @param columns
     *            The list of columns names
     * 
     * @return The comma separated column names, or an empty string if the
     *         list is empty.
     * 
     */
    private static String commaSeparatedColumnNames(
            final ArrayList<String> columns) {
        final StringBuilder builder = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0)
                builder.append(", ");
            builder.append(columns.get(i));
        }
        return builder.toString();
    }

    /**
     * A helper method that takes a list of columns names in the format of
     * "tableName.columnName", and returns a list of columns without the
     * leading qualifiers. A name with several qualifiers, such as
     * "schema.tableName.columnName", is reduced to its last part. An item
     * that consists of separators only is skipped.
     * 
     * @param columns
     *            The list of columns in the format of "tableName.columnName"
     * 
     * @return A list of column names in the format of "columnName"
     * 
     */
    private static ArrayList<String> stripTableName(final ArrayList<String> columns) {
        final ArrayList<String> strippedColumns = new ArrayList<String>();
        for (final String item : columns) {
            // '.' and ':' are both treated as separators.
            final String[] parts = item.split("[.:]");
            if (parts.length == 0)
                continue;
            strippedColumns.add(parts[parts.length - 1].trim());
        }
        return strippedColumns;
    }

    /**
     * Connect to the catalog node and retrieves the node information. It
     * fills two ArrayLists with nodes that contain pieces of each of the two
     * specified input tables. A failure is logged on the catalog node and
     * leaves the lists with whatever was added before the failure.
     * 
     * @param tableName1
     *            The first table name that it's looking for.
     * @param tableName2
     *            The second table name that it's looking for.
     * @param table1RelevantNodes
     *            The ArrayList receiving the nodes that are relevant to
     *            tableName1.
     * @param table2RelevantNodes
     *            The ArrayList receiving the nodes that are relevant to
     *            tableName2.
     * 
     */
    private void fetchRelevantNodes(
            final String tableName1,
            final String tableName2,
            final ArrayList<ConfigurationNode> table1RelevantNodes,
            final ArrayList<ConfigurationNode> table2RelevantNodes) {

        // Connect to the catalog.
        try {
            this.catalog.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {

                    // fetch the relevant nodes in table1
                    final String sql1 = SelectOrJoin.this.fetchRelevantNodesHelper(tableName1, statement, table1RelevantNodes);
                    SelectOrJoin.this.catalog.log(System.out, "Eexcuted " + sql1);

                    // fetch the relevant nodes in table2
                    final String sql2 = SelectOrJoin.this.fetchRelevantNodesHelper(tableName2, statement, table2RelevantNodes);
                    SelectOrJoin.this.catalog.log(System.out, "Eexcuted " + sql2);
                }
            });
        } catch (final Exception e) {
            this.catalog.log(System.err, "failed in fetchRelevantNodes " + e.toString());
        }

    }

    /**
     * This is a helper method for fetchRelevantNodes. It queries the catalog
     * for a table name and appends the nodes that are related to that table
     * into an ArrayList.
     * 
     * @param tableName
     *            The table name that it's looking for.
     * @param statement
     *            The statement created by catalog connection.
     * @param relevantNodes
     *            The ArrayList receiving the nodes found.
     * 
     * @return The query that was executed on the catalog.
     * 
     * @throws SQLException
     *             Thrown if the catalog query fails or an expected column is
     *             missing from its result.
     * @throws ProgramException
     *             Declared for the node construction.
     * 
     */
    private String fetchRelevantNodesHelper(
            final String tableName,
            final Statement statement,
            final ArrayList<ConfigurationNode> relevantNodes)
            throws SQLException, ProgramException {
        final String sql = "SELECT * FROM DTABLES WHERE TNAME = '"
                + tableName + "' OR TNAME = UCASE ('" + tableName + "')";
        readNodes(statement.executeQuery(sql), relevantNodes);
        return sql;
    }

    /**
     * Appends one node to a list for every row of a catalog query result.
     * The result set is closed before this method returns, whether it
     * succeeds or not.
     * 
     * @param set
     *            The rows read from the catalog's DTABLES table.
     * @param nodes
     *            The ArrayList receiving the nodes.
     */
    private static void readNodes(
            final ResultSet set,
            final ArrayList<ConfigurationNode> nodes)
            throws SQLException, ProgramException {
        try {
            while (set.next()) {
                final String driver = set.getString("NODEDRIVER").trim();
                final String url = set.getString("NODEURL").trim();
                final String user = set.getString("NODEUSER").trim();
                final String password = set.getString("NODEPASSWD").trim();
                final String name = "node " + set.getString("NODEID").trim();

                nodes.add(new ConfigurationNode(name, driver, url, user, password));
            }
        } finally {
            set.close();
        }
    }

    /**
     * Create and return the join statement that runs on the temporary
     * tables. Every occurrence of either table name in the original
     * statement is prefixed with "SESSION.TEMP_".
     * 
     * Only whole identifiers are replaced, so a table named "S" does not
     * change the keyword "SELECT". Both names are replaced in one pass, so
     * text inserted for one table is never rewritten for the other. The
     * match is case sensitive.
     * 
     * @param sql
     *            The original join statement.
     * @param tableName1
     *            The first table to be joined
     * @param tableName2
     *            The second table to be joined
     * 
     * @return The join statement referring to the temporary tables.
     */
    private static String joinStatement(
            final String sql,
            final String tableName1,
            final String tableName2) {
        // The names are quoted so they are matched literally, and the look
        // around groups reject matches inside a longer identifier.
        final String eitherTableName = "(?<!\\w)("
                + Pattern.quote(tableName1) + "|" + Pattern.quote(tableName2)
                + ")(?!\\w)";
        return sql.replaceAll(eitherTableName, "SESSION.TEMP_$1");
    }
}

/**
 * A runner class that implements a run method that executes the select SQL
 * statement on a node and keeps the rows it returned as strings.
 * 
 * Access to the results is not synchronized; read them with
 * {@link #getResults()} only after the thread running this object has
 * finished.
 */
class SelectRunner implements Runnable {

    /** The text stored for a column whose value is SQL NULL. */
    private static final String NULL_TEXT = "null";

    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The SQL statement to be executed. */
    private final String sql;

    /** The results from the query, one tab separated string per row. */
    private final ArrayList<String> results = new ArrayList<String>();

    /**
     * returns the select statement results on this node as an array of strings.
     * 
     * @return A new array holding one string per row read so far.
     */
    public String[] getResults() {
        return this.results.toArray(new String[this.results.size()]);
    }

    /**
     * Initializes a new instance of the RunSQL Runner.
     * 
     * @param node
     *            The cluster node associated with this instance.
     * 
     * @param sql
     *            The select SQL statement to execute.
     */
    public SelectRunner(final ConfigurationNode node, final String sql) {
        assert null != node;
        assert null != sql;

        this.node = node;
        this.sql = sql;
    }

    /**
     * Executes the select statement on this node. Each row is stored as its
     * trimmed column values separated by tabs, with "null" standing for an
     * SQL NULL. A ProgramException is logged on the node and ends the run;
     * rows read before the failure are kept.
     */
    @Override
    public void run() {

        // Connect to the specific node and execute the select statement.
        try {
            this.node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {

                    // Execute the select statement.
                    SelectRunner.this.node.log(System.out, "Executing " + SelectRunner.this.sql);
                    final ResultSet set = statement.executeQuery(SelectRunner.this.sql);

                    try {
                        // The column count is the same for every row.
                        final int count = set.getMetaData().getColumnCount();
                        while (set.next()) {
                            final StringBuilder builder = new StringBuilder();
                            for (int i = 1; i <= count; i++) {
                                // getString returns null for an SQL NULL.
                                final String value = set.getString(i);
                                builder.append(null == value ? NULL_TEXT : value.trim());
                                if (i < count)
                                    builder.append("\t");
                            }

                            SelectRunner.this.results.add(builder.toString());
                        }
                    } finally {
                        set.close();
                    }
                }
            });

        } catch (final ProgramException e) {
            this.node.log(System.err, "SelectRunner run " + e.getMessage());
        }
    }
}
// Valid HTML 4.01 Valid CSS