// LoadCSV.java

package edu.hawaii.ics.yucheng;

import java.io.StringReader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;

/**
 * A class that implements Runnable. The run method takes a list of CSV entries
 * and uploads them into the corresponding tables on the distributed database system.
 * 
 * @author     Cheng Jade
 * @assignment ICS 421 Assignment 4
 * @date       Mar 22, 2010
 * @bugs       None
 */
public class LoadCSV implements Runnable {
    /* the load-CSV configuration object. */
    public final LoadCSVConfiguration configuration;

    /* the catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /* a list of SQL commands to be executed. */
    private final ArrayList<String[]> csvs = new ArrayList<String[]>();

    /* a record of whether all CSV have been loaded successfully. */
    private Boolean success = null;

    /**
     * Initializes a new instance of this object.
     *
     * @param configuration
     *            The load-CSV configuration object.
     *
     * @param rawCSV
     *            The raw CSV lines to be parsed into entries.
     *
     * @throws ProgramException
     *             Thrown if the CSV content cannot be parsed.
     * @throws NullPointerException
     *             Thrown if either argument is null.
     */
    public LoadCSV(
            final LoadCSVConfiguration configuration,
            final ArrayList<String> rawCSV) throws ProgramException {
        if (null == configuration)
            throw new NullPointerException("configuration");
        if (null == rawCSV)
            throw new NullPointerException("rawCSV");

        // initialize configuration and catalog.
        this.configuration = configuration;
        this.catalog = configuration.catalog;

        // Join the raw lines into one newline-separated string.
        final StringBuilder joined = new StringBuilder();
        for (final String line : rawCSV)
            joined.append(line).append('\n');

        // Parse the string, collecting one String[] of fields per CSV entry.
        final StringReader reader = new StringReader(joined.toString());
        for (String[] fields = CSVParser.parse(reader); null != fields; fields = CSVParser.parse(reader))
            this.csvs.add(fields);
    }

    /**
     * The main routine: partitions every CSV entry to its destination node,
     * runs the bulk inserts on worker threads, and has those threads update
     * the catalog dtables.
     */
    public void run() {
        try {
            // For each node, construct a list of CSVs that will be partitioned
            // to this node.
            final ArrayList<CSVsNodePair> csvsToNodes = new ArrayList<CSVsNodePair>();
            for (int i = 0; i < this.configuration.nodes.size(); i++)
                csvsToNodes.add(new CSVsNodePair(this.configuration.nodes.get(i).node));

            // Examine every CSV entry and decide which node's list it belongs
            // to. Node names are assumed to carry a numeric ID after the
            // first four characters (e.g. "nodeN").
            final ArrayList<String> columnNames = columnNames();
            for (final String[] item : csvs) {
                final int toID = partitionTo(columnNames, item);
                for (final CSVsNodePair pair : csvsToNodes) {
                    if (Integer.parseInt(pair.node.name.substring(4)) == toID)
                        pair.nodeCSVs.add(item);
                }
            }

            // Create a bulk insert statement for each node and start a thread
            // on each node to execute this insert statement. Threads also
            // update the catalog dtables and record success/failure in the
            // shared flag.
            final int count = csvsToNodes.size();
            final Thread[] threads = new Thread[count];
            for (int i = 0; i < count; i++) {
                final CSVsNodePair pair = csvsToNodes.get(i);
                final String bulkInsertStatement = bulkInsertStatement(pair);
                threads[i] = new Thread(new Runner(pair.node, bulkInsertStatement));
                threads[i].start();
            }

            // wait for threads.
            for (int i = 0; i < count; i++)
                DistributedDB.join(threads[i], csvsToNodes.get(i).node);

            // Print message indicating if all commands completed successfully.
            // Boolean.TRUE.equals avoids the NullPointerException the original
            // 'if (this.success)' threw when the flag was never set (e.g. an
            // empty node list left it null).
            if (Boolean.TRUE.equals(this.success))
                System.out.println("All CSV were loaded successfully.");
            else
                System.out.println("Not all CSV were loaded successfully.");

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }
    }

    /**
     * Connects to the first configured node and retrieves the column names of
     * the target table from the result-set metadata.
     *
     * @return The list of column names in the target table.
     *
     * @throws ProgramException
     *             Thrown if the connection or the query fails.
     */
    private ArrayList<String> columnNames() throws ProgramException {

        final ConfigurationNode node = configuration.nodes.get(0).node;
        final ArrayList<String> columnNames = new ArrayList<String>();
        try {
            node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    // Only the metadata is needed, but the ResultSet must
                    // still be closed or the cursor leaks (the original
                    // never closed it).
                    final ResultSet set = statement.executeQuery("SELECT * FROM "
                            + configuration.tableName);
                    try {
                        final ResultSetMetaData meta = set.getMetaData();
                        for (int i = 1; i <= meta.getColumnCount(); i++)
                            columnNames.add(meta.getColumnName(i));
                    } finally {
                        set.close();
                    }
                }
            });
            node.log(System.out, "Successfully connected and retrieved column names from a node db.");
            return columnNames;
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
            throw new ProgramException("columnNames " + e);
        }
    }

    /**
     * Calculates the node ID that a specified CSV entry will be partitioned to.
     *
     * @param columnNames
     *            The list of column names retrieved from the target table.
     *
     * @param csv
     *            The specified CSV entry to be calculated.
     *
     * @return The node ID that this CSV entry will be partitioned to.
     *
     * @throws ProgramException
     *             Thrown if there are any errors processing the CSV file.
     * @throws NullPointerException
     *             Thrown if the argument is null.
     */
    private int partitionTo(
            final ArrayList<String> columnNames,
            final String[] csv) throws ProgramException {
        if (null == columnNames)
            throw new NullPointerException("columnNames");
        if (null == csv)
            throw new NullPointerException("csv");

        // Locate the partition column (case-insensitive) in the schema.
        final String targetColumn = this.configuration.partitionColumn;
        int columnIndex = -1;
        for (int index = 0; index < columnNames.size() && columnIndex < 0; index++)
            if (columnNames.get(index).equalsIgnoreCase(targetColumn))
                columnIndex = index;
        if (columnIndex < 0)
            throw new ProgramException("Mismatch partition table name and the dtables");

        try {
            final int value = Integer.parseInt(csv[columnIndex]);
            final String method = this.configuration.partitionMethod;
            if (method.equalsIgnoreCase("range")) {
                // Node i owns the half-open interval (param1, param2].
                for (int i = 0; i < this.configuration.nodes.size(); i++) {
                    final int lower = Integer.parseInt(this.configuration.nodes.get(i).param1);
                    final int upper = Integer.parseInt(this.configuration.nodes.get(i).param2);
                    if (value > lower && value <= upper)
                        return i + 1;
                }
                throw new ProgramException("Partition value out of range");
            }
            if (method.equalsIgnoreCase("hash")) {
                // Hash partitioning: param1 of the first node is the modulus.
                final int modulus = Integer.parseInt(configuration.nodes.get(0).param1);
                return (value % modulus) + 1;
            }
            throw new ProgramException("Support only range and hash partition methods.");
        } catch (final NumberFormatException e) {
            throw new ProgramException("Support only numeric partion values");
        }
    }

    /**
     * A private class that groups a node and its list of CSVs together.
     */
    private class CSVsNodePair {
        public final ConfigurationNode node;
        public final ArrayList<String[]> nodeCSVs = new ArrayList<String[]>();

        public CSVsNodePair(ConfigurationNode node) {
            this.node = node;
        }
    }

    /**
     * Creates and returns a bulk insert statement covering every CSV entry
     * routed to the given node.
     *
     * @param pair
     *            A CSVsNodePair used to generate the insert statement.
     *
     * @return The bulk insert statement created. If the pair holds no CSV
     *         entries, the statement has an empty VALUES list and will fail
     *         on execution -- callers should avoid running it for empty pairs.
     */
    private String bulkInsertStatement(CSVsNodePair pair) {
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO " + this.configuration.tableName + " VALUES \n");
        final int rowCount = pair.nodeCSVs.size();
        for (int i = 0; i < rowCount; i++) {
            final String[] csv = pair.nodeCSVs.get(i);
            builder.append("\t\t\t\t(");
            // Join the quoted fields with ", ". This guards against a
            // zero-length row, which the original code indexed out of
            // bounds via csv[csv.length - 1].
            for (int j = 0; j < csv.length; j++) {
                if (j > 0)
                    builder.append(", ");
                builder.append(DistributedDB.quote(csv[j]));
            }
            builder.append(")");
            if (i != rowCount - 1)
                builder.append(", \n");
        }
        return builder.toString();
    }

    /**
     * Creates and returns an update statement based on the partition
     * information. This SQL statement will be used to modify dtables.
     *
     * @param partmtd
     *            A dtable entry, 1 for range, 2 for hash.
     *
     * @param partparam1
     *            A dtable entry, partition parameter 1.
     *
     * @param partparam2
     *            A dtable entry, partition parameter 2 (may be null).
     *
     * @param node
     *            A ConfigurationNode used to derive the NODEID filter, or
     *            null to update the rows for every node.
     *
     * @return The update statement.
     */
    private String updateStatement(
            final String partmtd,
            final String partparam1,
            String partparam2,
            final ConfigurationNode node) {
        assert null != partmtd;
        assert null != partparam1;

        // A missing second parameter is stored as the empty string.
        if (null == partparam2)
            partparam2 = "";

        final String table = this.configuration.tableName;
        final StringBuilder sql = new StringBuilder()
                .append("UPDATE DTABLES")
                .append(" SET PARTMTD = ").append(DistributedDB.quote(partmtd))
                .append(", PARTCOL = ").append(DistributedDB.quote(this.configuration.partitionColumn))
                .append(", PARTPARAM1 = ").append(DistributedDB.quote(partparam1))
                .append(", PARTPARAM2 = ").append(DistributedDB.quote(partparam2))
                .append(" WHERE (TNAME = ").append(DistributedDB.quote(table))
                .append(" OR TNAME = UCASE(").append(DistributedDB.quote(table)).append("))");

        // When a node is given, restrict the update to that node's row. The
        // node ID is the suffix of the node name after its first four chars.
        if (null != node)
            sql.append(" AND NODEID = ").append(DistributedDB.quote(node.name.substring(4)));
        return sql.toString();
    }

    /**
     * A private runner object. Each instance executes, on its own thread, the
     * bulk insert for one node and then updates the catalog dtables with the
     * partition information.
     */
    private class Runner implements Runnable {

        /** The configuration node associated with a single thread. */
        private final ConfigurationNode node;

        /** The SQL bulk insert statement to be executed on that node. */
        private final String bulkInsertStatement;

        /**
         * Initializes a new instance of the Runner.
         * 
         * @param node
         *            The cluster node associated with this instance.
         * 
         * @param bulkInsertStatement
         *            The bulk insert statement to execute on that node.
         */
        public Runner(final ConfigurationNode node, final String bulkInsertStatement) {
            assert null != node;
            assert null != bulkInsertStatement;

            this.node = node;
            this.bulkInsertStatement = bulkInsertStatement;
        }

        /**
         * Executes the insert commands for the node associated with this
         * instance, and updates the catalog dtables when insertion finishes.
         * On a ProgramException the shared success flag is set to false and
         * the error is logged; the thread then exits normally.
         *
         * NOTE(review): the outer 'success' flag is read and written by
         * multiple Runner threads without synchronization -- the
         * check-then-set below is racy; confirm whether a lock is needed.
         */
        public void run() {
            try {
                // connect to the node and execute the bulk insert statement.
                this.node.runStatement(new StatementRunner() {
                    public void run(Statement statement) throws ProgramException, SQLException {
                        node.log(System.out, "Executing: " + bulkInsertStatement);
                        statement.execute(bulkInsertStatement);
                        node.log(System.out, "Statement executed successfully.");
                    }
                });

                // connect to the catalog and update dtables.
                catalog.runStatement(new StatementRunner() {
                    public void run(Statement statement) throws ProgramException, SQLException {

                        // dispatch if the partition method is hash: a single
                        // update (PARTMTD = 2, no NODEID filter) covers every
                        // node's row.
                        if (configuration.partitionMethod.equalsIgnoreCase("hash")) {
                            String updateStatement = updateStatement("2",
                                    configuration.nodes.get(0).param1,
                                    configuration.nodes.get(0).param2, null);
                            catalog.log(System.out, "Executing a update statement " + updateStatement);
                            statement.execute(updateStatement);
                            catalog.log(System.out, "Updated for '" + node.hostname + "'");
                            if (success == null)
                                success = true;
                            return;
                        }

                        // dispatch if the partition method is range: each node
                        // gets its own row (PARTMTD = 1) with its own bounds.
                        if (configuration.partitionMethod.equalsIgnoreCase("range")) {
                            for (int i = 0; i < configuration.nodes.size(); i++) {
                                String updateStatement = updateStatement("1",
                                        configuration.nodes.get(i).param1,
                                        configuration.nodes.get(i).param2,
                                        configuration.nodes.get(i).node);
                                catalog.log(System.out, "Executing a update statement " + updateStatement);
                                statement.execute(updateStatement);
                            }
                            catalog.log(System.out, "Updated for '" + node.hostname + "'");
                            if (success == null)
                                success = true;
                            return;
                        }
                        // no other partition method is supported.
                        throw new ProgramException("Support only range and hash two partition methods.");
                    }
                });

            } catch (final ProgramException e) {
                success = false;
                this.node.log(System.err, e.getMessage());
            }
        }
    }

    /**
     * Returns the string form of the underlying configuration.
     *
     * @return The configuration's string representation.
     */
    @Override
    public String toString() {
        return this.configuration.toString();
    }

}
// Valid HTML 4.01 Valid CSS