// SelectOrJoin.java

package edu.hawaii.ics.yucheng;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;

/**
 * A {@link Runnable} that executes a list of SQL statements on the
 * distributed database system.
 * <p>
 * Only select statements are executed: for each one the catalog is asked which
 * nodes hold the table, the statement is run on every such node in its own
 * thread, and the collected rows are printed to standard output. A join
 * statement is recognized but rejected with an error message, and any other
 * statement type is reported as unsupported.
 * 
 * @author Cheng Jade
 * @assignment ICS 421 Project
 * @date Mar 22, 2010
 * @bugs None
 */
public class SelectOrJoin implements Runnable {
    /* the select or join configuration object. */
    public final SelectOrJoinConfiguration configuration;

    /* the catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /* a list of SQL commands to be executed. */
    private final SQLList sqls;

    /**
     * Initialize a new instance of this object.
     * 
     * @param configuration
     *            The configuration that supplies the catalog node. Must not be
     *            null.
     * @param sqls
     *            The SQL statements to execute, in order.
     * @throws NullPointerException
     *             If configuration is null.
     */
    public SelectOrJoin(final SelectOrJoinConfiguration configuration, final ArrayList<String> sqls) {
        if (null == configuration)
            throw new NullPointerException("configuration");
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;
        // NOTE(review): the argument is cast to SQLList, so a plain ArrayList
        // passed by a caller would fail here with a ClassCastException --
        // presumably every caller passes an SQLList; confirm.
        this.sqls = (SQLList) sqls;
    }

    /**
     * The main routine that executes the list of SQL statements.
     * <p>
     * All statements are classified first. If any of them is a join, an error
     * is printed and nothing is executed; if any of them is neither a select
     * nor a join, the error is printed and nothing is executed. Otherwise the
     * select statements are run one after another. Errors reported as
     * ProgramException are printed to standard error and end the run.
     */
    public void run() {
        try {
            final ArrayList<SQLStructure> selects = new ArrayList<SQLStructure>();
            for (final String sql : this.sqls) {
                final SQLStructure structure = new SQLStructure(sql);
                final SQLType type = structure.type;
                if (type == SQLType.JOIN) {
                    System.err.println("Join queries are not supported in this version of distributed DB.");
                    return;
                }
                if (type != SQLType.SELECT)
                    throw new ProgramException("unsupported sql: " + sql);
                selects.add(structure);
            }

            for (final SQLStructure select : selects)
                this.runSelect(select);

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
        }
    }

    /**
     * Execute one select statement and print the results.
     * <p>
     * The catalog table DTABLES is queried for the nodes registered under the
     * statement's table name, one thread per node runs the statement, and the
     * rows returned by all nodes are printed together once every thread has
     * been joined. Nothing is printed when no node has the table.
     * 
     * @param sql
     *            The parsed select statement.
     * @throws ProgramException
     *             If the catalog lookup fails or a catalog row is incomplete.
     */
    private void runSelect(final SQLStructure sql) throws ProgramException {

        // the nodes read from the catalog.
        final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>();
        final String tableName = sql.tableName1;

        // The table name is embedded in a quoted SQL literal below, so any
        // single quote in it is doubled to keep the literal well formed.
        // String.valueOf keeps a null name rendering as "null", as before.
        final String literalName = String.valueOf(tableName).replace("'", "''");

        // Connect to the catalog.
        this.catalog.runStatement(new StatementRunner() {
            public void run(final Statement statement) throws ProgramException, SQLException {

                final String query = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + literalName + "' OR TNAME = UCASE('" + literalName + "')";
                SelectOrJoin.this.catalog.log(System.out, "Executing " + query);
                final ResultSet set = statement.executeQuery(query);

                try {
                    // For all cluster node found that have this table, add
                    // the node info to the configuration.
                    while (set.next()) {
                        final String driver = SelectOrJoin.requiredColumn(set, "NODEDRIVER");
                        final String url = SelectOrJoin.requiredColumn(set, "NODEURL");
                        final String user = SelectOrJoin.requiredColumn(set, "NODEUSER");
                        final String password = SelectOrJoin.requiredColumn(set, "NODEPASSWD");
                        final String name = "node " + SelectOrJoin.requiredColumn(set, "NODEID");

                        nodes.add(new ConfigurationNode(name, driver, url, user, password));
                    }
                } finally {
                    set.close();
                }
            }
        });
        final int count = nodes.size();
        this.catalog.log(System.out, count + " nodes with table name '" + tableName + "' were found.");

        // start threads on each node
        final Thread[] threads = new Thread[count];
        final ArrayList<SelectRunner> runners = new ArrayList<SelectRunner>();
        for (int i = 0; i < count; i++) {
            final SelectRunner runner = new SelectRunner(nodes.get(i), sql.sql);
            runners.add(runner);
            threads[i] = new Thread(runner);
            threads[i].start();
        }

        // wait for the threads to complete and collect the results.
        // NOTE(review): reading the runner's results here relies on
        // DistributedDB.join waiting for the thread to finish -- confirm.
        final StringBuilder resultBuilder = new StringBuilder();
        for (int i = 0; i < count; i++) {
            DistributedDB.join(threads[i], nodes.get(i));
            for (final String result : runners.get(i).getResults())
                resultBuilder.append(result).append("\n");
        }

        // print the collected result
        if (count != 0) {
            System.out.println("\nResults of executing " + sql.sql + " are: ");
            System.out.println(resultBuilder.toString());
        }

    }

    /**
     * Read a catalog column that must hold a value and return it trimmed.
     * 
     * @param set
     *            The result set positioned on the row to read.
     * @param column
     *            The label of the column to read.
     * @return The trimmed column value.
     * @throws ProgramException
     *             If the column is SQL NULL, so the failure is reported like
     *             other catalog errors rather than as a NullPointerException.
     * @throws SQLException
     *             If the column cannot be read.
     */
    private static String requiredColumn(final ResultSet set, final String column) throws ProgramException, SQLException {
        final String value = set.getString(column);
        if (null == value)
            throw new ProgramException("catalog column " + column + " is NULL");
        return value.trim();
    }

}

/**
 * A runner class that implements a run method that executes the select SQL
 * statement on a node and keeps the returned rows as tab-separated strings.
 */
class SelectRunner implements Runnable {

    /** The text placed in a result row for a column whose value is SQL NULL. */
    private static final String NULL_TEXT = "NULL";

    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The SQL statement to be executed. */
    private final String sql;

    /** The results from the query, one formatted string per row. */
    private final ArrayList<String> results = new ArrayList<String>();

    /**
     * Returns the select statement results on this node as an array of
     * strings. Each element is one row with its columns separated by tabs.
     * The array is a fresh copy of the rows collected so far.
     * 
     * @return The rows collected by {@link #run()}; empty if none.
     */
    public String[] getResults() {
        return this.results.toArray(new String[this.results.size()]);
    }

    /**
     * Initializes a new instance of the RunSQL Runner.
     * 
     * @param node
     *            The cluster node associated with this instance.
     * 
     * @param sql
     *            The select SQL statement to execute.
     */
    public SelectRunner(final ConfigurationNode node, final String sql) {
        assert null != node;
        assert null != sql;

        this.node = node;
        this.sql = sql;
    }

    /**
     * Executes the select statement on this node and stores every returned
     * row. A ProgramException raised while running the statement is logged to
     * the node's error output and not rethrown.
     */
    public void run() {

        // Connect to the specific node and execute the select statement.
        try {
            this.node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {

                    // Execute the select statement.
                    SelectRunner.this.node.log(System.out, "Executing " + SelectRunner.this.sql);
                    final ResultSet set = statement.executeQuery(SelectRunner.this.sql);

                    try {
                        // The column count is the same for every row, so it
                        // is read once instead of once per row.
                        final int count = set.getMetaData().getColumnCount();
                        while (set.next()) {
                            final StringBuilder builder = new StringBuilder();
                            for (int i = 1; i <= count; i++) {
                                // getString yields null for SQL NULL; render
                                // it as text rather than failing on trim().
                                final String value = set.getString(i);
                                builder.append(null == value ? NULL_TEXT : value.trim());
                                if (i < count)
                                    builder.append("\t");
                            }

                            SelectRunner.this.results.add(builder.toString());
                        }
                    } finally {
                        set.close();
                    }
                }
            });

        } catch (final ProgramException e) {
            this.node.log(System.err, "SelectRunner run " + e.getMessage());
        }
    }
}
// Valid HTML 4.01 Valid CSS