// RunDDL.java

package edu.hawaii.ics.yucheng;

import java.io.PrintStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Locale;

/**
 * A class that executes a given set of DDL statements on a cluster of
 * computers, each running an instance of a DBMS. This program executes the same
 * DDL statements on the database instance of each of the computers on the
 * cluster concurrently using threads.
 * <p>
 * One instance of this class is created per configuration node. Each instance
 * owns a thread that runs every command against its node and, after each
 * successful statement, records CREATE/DROP commands that name a table in the
 * 'DTABLES' table of the catalog database.
 * 
 * @author     Cheng Jade
 * @assignment ICS 421 Assignment 1
 * @date       Feb 10, 2010
 * @bugs       None
 */
class RunDDL implements Runnable {

    /** The keyword that precedes a table name in a DDL command. */
    private static final String TABLE_KEYWORD = "TABLE";

    /** The catalog DBMS, which is common to all threads. */
    private static ConfigurationNode catalog;

    /** The list of commands to execute, which is common to all threads. */
    private static CommandList commands;

    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The thread executing the commands on a single node. */
    private final Thread thread;

    /**
     * A flag indicating if all commands executed successfully by the thread.
     * Written by the worker thread and read by the main thread; volatile so
     * the value is well-defined even if the join was interrupted.
     */
    private volatile boolean success = false;

    /**
     * The main entry point of the application.
     * 
     * @param args
     *            The command line arguments: the path to a configuration file
     *            followed by the path to a DDL file.
     */
    public static void main(final String[] args) {
        assert null != args;

        // Check for usage errors.
        if (args.length != 2) {
            final String name = RunDDL.class.getSimpleName();
            System.out.println("Usage: java " + name + " <cfg> <ddl>");
            System.out.println("       <cfg> path to a configuration file");
            System.out.println("       <ddl> path to a DDL file");
            System.exit(0);
            return;
        }

        // Read the configuration and DDL command files.
        try {
            assert null != args[0];
            assert null != args[1];

            final Configuration configuration = new Configuration(args[0]);
            commands = new CommandList(args[1]);
            catalog = configuration.getCatalog();

            // Prepare the 'DTABLES' in the catalog database. If the table does
            // not exist, a new one is created.
            prepareCatalog();

            // Start threads for each configuration node. The shared static
            // fields above are assigned before any thread is started.
            final ArrayList<RunDDL> instances = new ArrayList<RunDDL>();
            for (final ConfigurationNode node : configuration.getNodes())
                instances.add(new RunDDL(node));

            // Wait for all threads to terminate.
            boolean success = true;
            for (final RunDDL instance : instances) {
                instance.join();

                // Check that all commands completed successfully.
                success &= instance.success;
            }

            // Print message indicating if all commands completed successfully.
            if (success)
                System.out.println("All DDL commands executed successfully.");
            else
                System.out.println("Not all DDL commands executed successfully.");

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }

        // Exit cleanly while debugging from Eclipse.
        System.exit(0);
    }

    /**
     * Initializes a new instance of the RunDDL class and immediately starts
     * the thread that executes the commands on the given node.
     * 
     * @param node
     *            The cluster node associated with this instance.
     */
    private RunDDL(final ConfigurationNode node) {
        assert null != node;

        // Assign the class field to the parameter values.
        this.node = node;

        // Create a new thread for this instance.
        this.thread = new Thread(this);

        // Start the thread. All fields are assigned before this point.
        this.thread.start();
    }

    /**
     * Executes the DDL commands for the node associated with this instance.
     * The success flag is set only after every command has executed without
     * an exception; any failure is logged and leaves the flag false.
     */
    @Override
    public void run() {
        try {
            StatementFactory.newStatement(this.node, new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    for (final String command : commands) {
                        log(System.out, "Executing statement: " + command);
                        statement.executeUpdate(command);
                        updateCatalog(command);
                        log(System.out, "Statement executed successfully.");
                    }
                }
            });

            // Arriving here indicates all commands executed successfully.
            this.success = true;

        } catch (final Exception e) {
            // Some exceptions carry no message; never pass null to log().
            this.log(System.err, describe(e));
        }
    }

    /**
     * Ensures the catalog 'DTABLES' table exists.
     * 
     * @throws ProgramException
     *             Thrown if there is a problem verifying the table exists or if
     *             the table cannot be created.
     */
    private static void prepareCatalog() throws ProgramException {
        assert null != catalog;

        // Open the connection, check if the table exists, and create the table
        // if it does not exist.
        try {
            StatementFactory.newStatement(catalog, new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    if (!tableExists(statement.getConnection(), "DTABLES")) {
                        System.out.println("Creating table CATALOG.DTABLES.");
                        statement.executeUpdate(newCreateTableCommand());
                    }
                }
            });

        } catch (final Exception e) {
            throw new ProgramException(e);
        }
    }

    /**
     * Returns a printable description of an error: its message when it has
     * one, otherwise its string representation.
     * 
     * @param error
     *            The error to describe.
     * 
     * @return A non-null description of the error.
     */
    private static String describe(final Throwable error) {
        assert null != error;

        final String message = error.getMessage();
        return null != message ? message : error.toString();
    }

    /**
     * Returns true if a specified command is of a specified type. The
     * comparison ignores case and leading whitespace in the command.
     * 
     * @param command
     *            The DDL command, e.g. "CREATE TABLE ...".
     * @param type
     *            The command type, e.g. "CREATE".
     * 
     * @return True indicates the command starts with the type.
     */
    private static boolean isCommandType(
            final String command,
            final String type) {
        assert null != command;
        assert null != type;

        // Use a fixed locale so the result does not depend on the platform's
        // default locale (e.g. the Turkish dotted/dotless 'i').
        return command.trim().toUpperCase(Locale.ROOT).startsWith(
                type.toUpperCase(Locale.ROOT));
    }

    /**
     * Returns true if a character may appear in an unquoted table name as
     * recognized by this class: a letter, a digit, or an underscore.
     * 
     * @param ch
     *            The character to test.
     * 
     * @return True indicates the character is part of an identifier.
     */
    private static boolean isIdentifierChar(final char ch) {
        return Character.isLetterOrDigit(ch) || ch == '_';
    }

    /**
     * Finds the first stand-alone occurrence of the word TABLE in an
     * upper-case command and returns the index just past it. Occurrences that
     * are part of a longer identifier (e.g. TABLESPACE or MY_TABLE) are
     * skipped.
     * 
     * @param text
     *            The upper-case command text.
     * 
     * @return The index following the keyword, or -1 if it does not occur.
     */
    private static int indexAfterTableKeyword(final String text) {
        assert null != text;

        int start = text.indexOf(TABLE_KEYWORD);
        while (-1 != start) {
            final int end = start + TABLE_KEYWORD.length();
            final boolean boundaryBefore =
                    start == 0 || !isIdentifierChar(text.charAt(start - 1));
            final boolean boundaryAfter =
                    end == text.length() || !isIdentifierChar(text.charAt(end));
            if (boundaryBefore && boundaryAfter)
                return end;

            start = text.indexOf(TABLE_KEYWORD, start + 1);
        }
        return -1;
    }

    /**
     * Creates and returns a new DDL command to create the 'DTABLES' catalog
     * table.
     * 
     * @return The new DDL command to create the 'DTABLES' catalog table.
     */
    private static String newCreateTableCommand() {
        final StringBuilder builder = new StringBuilder();
        builder.append("CREATE TABLE DTABLES (");
        builder.append("TNAME      CHAR(32), ");
        builder.append("NODEDRIVER CHAR(64), ");
        builder.append("NODEURL    CHAR(128), ");
        builder.append("NODEUSER   CHAR(16), ");
        builder.append("NODEPASSWD CHAR(16), ");
        builder.append("PARTMTD    INT, ");
        builder.append("NODEID     INT, ");
        builder.append("PARTCOL    CHAR(32), ");
        builder.append("PARTPARAM1 CHAR(32), ");
        builder.append("PARTPARAM2 CHAR(32))");
        return builder.toString();
    }

    /**
     * Surrounds a specified string with single quotes so it can be used as a
     * SQL string literal. Single quotes inside the text are doubled, which is
     * the standard SQL escape, so values such as passwords containing an
     * apostrophe do not break or alter the generated statement.
     * 
     * @param text
     *            Some text to quote.
     * 
     * @return The quoted text.
     */
    private static String quote(final String text) {
        assert null != text;

        return "'" + text.replace("'", "''") + "'";
    }

    /**
     * Returns true if a specified table exists in a database.
     * 
     * @param connection
     *            The DBMS connection.
     * 
     * @param tableName
     *            The name of the table.
     * 
     * @return True indicates the table exists.
     * 
     * @throws ProgramException
     *             Thrown if there is an error checking the meta-data of the
     *             DBMS connection.
     */
    private static boolean tableExists(
            final Connection connection,
            final String tableName) throws ProgramException {
        assert null != connection;
        assert null != tableName;

        try {
            // Get the tables matching the specified name. Upper-case seems to
            // work best!
            final DatabaseMetaData meta = connection.getMetaData();
            final ResultSet set = meta.getTables(
                    null, null, tableName.toUpperCase(Locale.ROOT), null);

            try {
                // Return true if there are any records found, i.e. the table
                // exists.
                return set.next();
            } finally {
                set.close();
            }
        } catch (final SQLException e) {
            throw new ProgramException(e);
        }
    }

    /**
     * Parses a command and returns the table name for the command, or null if
     * there is none. The name is the run of letters, digits and underscores
     * that follows the stand-alone keyword TABLE, with any leading
     * "schema." qualifiers removed, converted to upper-case.
     * 
     * @param command
     *            The DDL command.
     * 
     * @return The upper-case table name within the command, or null if the
     *         command has no TABLE keyword or no name follows it.
     */
    private static String tableName(final String command) {
        assert null != command;

        // Work with the command in upper-case letters.
        String result = command.toUpperCase(Locale.ROOT);

        // Search for the word TABLE. The "not found" test must be made on the
        // search result itself, before any offset is added to it.
        final int afterKeyword = indexAfterTableKeyword(result);
        if (-1 == afterKeyword)
            return null;

        // Remove whitespace around the remaining portion of the command.
        result = result.substring(afterKeyword).trim();

        // Look for the first occurrence of a special character. If a '.' is
        // encountered, remove this portion of the text, which is most likely
        // the schema.
        int index = 0;
        while (index < result.length()) {
            final char ch = result.charAt(index);
            if (ch == '.') {
                result = result.substring(index + 1);
                index = 0;
                continue;
            }

            if (!isIdentifierChar(ch))
                break;

            index++;
        }

        // The remaining portion is the table name.
        result = result.substring(0, index);
        return result.length() == 0 ? null : result;
    }

    /**
     * Logs a message to a specified output stream, prefixed with the host
     * name of the node associated with this instance.
     * 
     * @param stream
     *            The output stream, e.g. System.out.
     * 
     * @param message
     *            The message to log.
     */
    private void log(final PrintStream stream, final String message) {
        assert null != stream;
        assert null != message;

        stream.println("[" + node.hostname + "] " + message);
    }

    /**
     * Joins the thread and checks for errors. In the case of an error, a
     * warning is displayed, but no exception is thrown. If the wait is
     * interrupted, the calling thread's interrupt status is restored so the
     * interruption is not lost.
     */
    private void join() {
        try {
            this.thread.join();
        } catch (final InterruptedException e) {
            System.err.println(
                    "WARNING: Thread [" +
                    this.node.name +
                    "] failed to respond.");

            // Preserve the interrupt for code further up the call stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates and returns a new SQL command to delete a row from the 'DTABLES'
     * catalog table. The row is matched on the table name and on this node's
     * host name, which is what is stored in the NODEURL column.
     * 
     * @param tableName
     *            The name of the table to delete.
     * 
     * @return A new SQL command to delete a row from the 'DTABLES' catalog
     *         table.
     */
    private String newDeleteRowCommand(final String tableName) {
        assert null != tableName;

        final StringBuilder builder = new StringBuilder();
        builder.append("DELETE FROM DTABLES WHERE TNAME = ");
        builder.append(quote(tableName));
        builder.append(" AND NODEURL = ");
        builder.append(quote(this.node.hostname));
        return builder.toString();
    }

    /**
     * Creates and returns a new SQL command to insert a row into the 'DTABLES'
     * catalog table. The partitioning columns are left NULL.
     * 
     * @param tableName
     *            The name of the table to insert.
     * 
     * @return A new SQL command to insert a row into the 'DTABLES' catalog
     *         table.
     */
    private String newInsertRowCommand(final String tableName) {
        assert null != tableName;

        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO DTABLES VALUES (");
        builder.append(quote(tableName));
        builder.append(", ");
        builder.append(quote(this.node.driver));
        builder.append(", ");
        builder.append(quote(this.node.hostname));
        builder.append(", ");
        builder.append(quote(this.node.username));
        builder.append(", ");
        builder.append(quote(this.node.password));
        builder.append(", NULL, ");
        // NOTE(review): assumes node names are a four-character prefix
        // followed by a numeric id (e.g. "node1") -- confirm against the
        // configuration format.
        builder.append(this.node.name.substring(4));
        builder.append(", NULL");
        builder.append(", NULL");
        builder.append(", NULL)");
        return builder.toString();
    }

    /**
     * Updates the catalog database based on a successfully executed DDL command
     * by one of the threads. CREATE commands insert a row and DROP commands
     * delete a row; other commands that name a table are reported as
     * unsupported. Commands that do not name a table are ignored.
     * <p>
     * This method deliberately does not touch the success flag: success of the
     * node is decided only once every command has run.
     * 
     * @param command
     *            The DDL command that just completed successfully.
     * 
     * @throws ProgramException
     *             Declared for callers; failures while updating the catalog
     *             are currently logged rather than propagated.
     */
    private void updateCatalog(final String command) throws ProgramException {
        assert null != command;

        // Get the table name. If there is none, return immediately.
        final String tableName = tableName(command);
        if (null == tableName)
            return;

        // Open a connection to the catalog database.
        try {
            StatementFactory.newStatement(catalog, new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {

                    // Check for CREATE and DROP commands; in these cases,
                    // add or remove a row from the DTABLES table, respectively.
                    final String catalogCommand;
                    if (isCommandType(command, "CREATE"))
                        catalogCommand = newInsertRowCommand(tableName);
                    else if (isCommandType(command, "DROP"))
                        catalogCommand = newDeleteRowCommand(tableName);
                    else {
                        log(System.err, "Unsupported command: " + command);
                        return;
                    }

                    log(System.out, "Updating catalog: " + catalogCommand);
                    statement.executeUpdate(catalogCommand);
                }
            });

        } catch (final Exception e) {
            // Best effort: a catalog failure is reported but does not stop
            // the remaining commands from running on this node.
            this.log(System.err, "Failed to update catalog dtables. " + describe(e));
        }
    }
}
// Valid HTML 4.01 Valid CSS