commit 93d8be0185771e1167edcca35683b6459657b7f0 Author: Derry Hamilton Date: Fri May 10 14:05:04 2019 +0100 Initial checkin diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..343d158 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +target +data-migration-tools +*.swp +*.orig diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f21da6b --- /dev/null +++ b/pom.xml @@ -0,0 +1,93 @@ + + + 4.0.0 + com.rasilon.ujetl + CopyingApp + jar + 2.0.0 + uJETL + + + + junit + junit + 4.12 + + + org.apache.commons + commons-lang3 + 3.9 + + + commons-logging + commons-logging + 1.2 + + + org.apache.commons + commons-configuration2 + 2.4 + + + + commons-beanutils + commons-beanutils + 1.9.3 + + + com.beust + jcommander + 1.72 + + + org.apache.logging.log4j + log4j-api + 2.11.2 + + + org.apache.logging.log4j + log4j-core + 2.11.2 + + + org.postgresql + postgresql + 42.2.5 + + + + + + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + com.rasilon.ujetl.CopyingApp + + + + + + package + + single + + + + + + + diff --git a/run_copying_job b/run_copying_job new file mode 100755 index 0000000..d0d0313 --- /dev/null +++ b/run_copying_job @@ -0,0 +1,46 @@ +#!/bin/bash + +set -e + +JOBNAME=${1:-undef} +if [ -z "${JOBNAME}" ] +then + echo "Job name not specified" + exit 1 +fi +if [ ! -e "/etc/ujetl/${JOBNAME}_config_live.xml" ] +then + echo "Could not find config file for $JOBNAME" + exit 2 +fi + +LOG_PROPS=/etc/ujetl/copying_defaults_log4j.xml +if [ -e "/etc/ujetl/${JOBNAME}_log4j.xml" ] +then + echo Using log config "/etc/ujetl/${JOBNAME}_log4j.xml" + LOG_PROPS="/etc/ujetl/${JOBNAME}_log4j.xml" +else + echo Using default logging. +fi +test -e "$LOG_PROPS" + + +LOCKFILE=/tmp/"${JOBNAME}"_copying.lock + +/usr/bin/lockfile -r ${RETRIES:-"1"} $LOCKFILE || exit 1 + +trap "rm -f $LOCKFILE; exit" INT TERM EXIT + +/usr/bin/java \ + -Xms1g \ + -Xmx2g \ + -jar target/CopyingApp-2.0.0-jar-with-dependencies.jar \ + --log4j "$LOG_PROPS" \ + --config "/etc/ujetl/${JOBNAME}_config_live.xml" + +if [ -e "/etc/ujetl/${JOBNAME}_post.sql" ] +then + psql -f "/etc/ujetl/${JOBNAME}_post.sql" data_hub +fi + +rm -f $LOCKFILE diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java new file mode 100644 index 0000000..d53e138 --- /dev/null +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -0,0 +1,194 @@ +package com.rasilon.ujetl; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.List; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.Executors ; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.FileReader; +import java.sql.SQLException; +import java.sql.ResultSet; +import java.sql.PreparedStatement; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.LogManager; +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.builder.fluent.Configurations; +import org.apache.commons.configuration2.ex.ConfigurationException; + +import org.apache.commons.beanutils.PropertyUtils; // Why does config need this? + + +/** + * @author derryh + * + */ +public class CopyingApp { + static Logger log = LogManager.getLogger(CopyingApp.class); + + public static void main(String[] args) { + CopyingAppCommandParser cli = new CopyingAppCommandParser(args); + LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); + String log4jConfigLocation = cli.getLog4jConfigFile(); + File file = new File(log4jConfigLocation); + context.setConfigLocation(file.toURI()); + System.out.println("Config set from "+file.toURI()); + + CopyingApp app = new CopyingApp(cli); + try { + app.run(); + } catch(Exception e) { + log.error(String.format("%s - %s",app.getJobName(),e.toString())); + } + } + + CopyingAppCommandParser cli; + String jobName; + Integer blockSize = 100; + public CopyingApp(CopyingAppCommandParser cli) { + this.cli = cli; + + String fname = cli.getConfigFile(); + File f = new File(cli.getConfigFile()); + String[] job_name = fname.split("_"); + jobName = (f.getName().split("_"))[0]; + + + } + + public String getJobName() { + return jobName; + } + + public void run() { + log.info(String.format("%s - Starting copying job",jobName)); + + Connection sConn = null; + Connection dConn = null; + try { + + Configurations configs = new Configurations(); + + Configuration config = configs.xml(cli.getConfigFile()); + + String hardLimitSeconds = config.getString("hardLimitSeconds"); + if(hardLimitSeconds != null) { + TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true); + log.info(String.format("%s - Set a hard runtime limit of %s seconds",jobName,hardLimitSeconds)); + hardLimit.start(); + } else { + log.info(String.format("%s - No runtime limit specified",jobName)); + } + + String blockSizeString = config.getString("blockSize"); + if(blockSizeString != null) { + blockSize = new Integer(blockSizeString); + log.info(String.format("%s - Set a block size of %s rows",jobName,blockSizeString)); + } + + + sConn = getConnFor("source",config); + dConn = getConnFor("dest",config); + dConn.setAutoCommit(false); + + Integer nRowsToLog = null; + try { + nRowsToLog = new Integer(config.getString("nRowsToLog")); + log.info(String.format("%s - Setting Row count interval to %s", jobName, nRowsToLog)); + } catch(Exception e) { + nRowsToLog = new Integer(100); // If we don't have a new setting, use the old default + log.info(String.format("%s - Setting Row count interval to default of 100 rows.",jobName)); + } + + long startTime = System.nanoTime(); + + log.info(String.format("%s - Tables are:",jobName)); + Object prop = config.getProperty("tables.table.name"); + if(prop instanceof Collection) { + int numTabs = ((Collection) prop).size(); + log.info(String.format("%s - Number of tables: %s",jobName, new Integer(numTabs))); + for(int i=0; i < numTabs; i++ ) { + String tabName = config.getString("tables.table("+i+").name"); + String tabKey = config.getString("tables.table("+i+").key"); + String tabSelect = config.getString("tables.table("+i+").select"); + String tabInsert = config.getString("tables.table("+i+").insert"); + //processTable(sConn,dConn,tabName,tabKey,tabSelect,tabInsert,nRowsToLog); + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize); + j.start(); + j.join(); + + } + } else if(prop instanceof String) { + String tabName = config.getString("tables.table.name"); + String tabKey = config.getString("tables.table.key"); + String tabSelect = config.getString("tables.table.select"); + String tabInsert = config.getString("tables.table.insert"); + //processTable(sConn,dConn,tabName,tabKey,tabSelect,tabInsert,nRowsToLog); + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize); + j.start(); + j.join(); + } else { + log.info(String.format("%s - Class is actually a %s",jobName, prop.getClass().getName())); + } + + long endTime = System.nanoTime(); + logDuration("Job took", startTime, endTime); + + } catch (SQLException e) { + SQLException x = e; + do { + log.error(String.format("%s - %s",jobName, x.toString())); + x.printStackTrace(); + x = x.getNextException(); + } while(x != null && x != x.getNextException()); + throw new RuntimeException(e); + } catch (ConfigurationException e) { + log.error(String.format("%s - %s",jobName, e.toString())); + throw new RuntimeException(e); + } catch (InterruptedException e) { + log.error(String.format("%s - %s",jobName, e.toString())); + throw new RuntimeException(e); + } + } + + private boolean excluded(String thing, String[] exclusions) { + for (String exclusion : exclusions) { + if (thing.equals(exclusion)) { + return true; + } + } + return false; + } + + private void logDuration(String what, long startTime, long endTime) { + long duration = endTime - startTime; + + log.info(String.format("%s - copying job completed in %s seconds",jobName, new Double(((double) duration) / 1000000000.0)) ); + + } + + private Connection getConnFor(String connType, Configuration config) throws SQLException { + Properties p = new Properties(); + p.setProperty("user",config.getString(connType + ".username")); + p.setProperty("password",config.getString(connType + ".password")); + Connection c = DriverManager.getConnection(config.getString(connType + ".dsn"),p); + c.setAutoCommit(false); + + String timeout = config.getString(connType + ".networkTimeout"); + if(timeout != null) { + try { + c.setNetworkTimeout(Executors.newFixedThreadPool(5), Integer.decode(timeout).intValue()); + log.info(String.format("%s - Set network timeout of %s on %s",jobName,timeout,connType)); + } catch(Exception e) { + log.error(String.format("%s - Failed to set connection timeout: %s",jobName,e.toString())); + } + } + + return c; + } +} diff --git a/src/main/java/com/rasilon/ujetl/CopyingAppCommandParser.java b/src/main/java/com/rasilon/ujetl/CopyingAppCommandParser.java new file mode 100644 index 0000000..d0d4d73 --- /dev/null +++ b/src/main/java/com/rasilon/ujetl/CopyingAppCommandParser.java @@ -0,0 +1,30 @@ +package com.rasilon.ujetl; + +import com.beust.jcommander.*; + +/** + * @author derryh + * + */ +public class CopyingAppCommandParser { + + @Parameter(names = {"-config","--config"}, description = "Application Config file for this run") + private String configFile; + + @Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run") + private String log4jConfigFile = "/etc/ppl/default_log4j_config.properties"; + + public CopyingAppCommandParser(String[] args) { + super(); + new JCommander(this, args); + } + + public String getConfigFile() { + return configFile; + } + + public String getLog4jConfigFile() { + return log4jConfigFile; + } + +} diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java new file mode 100644 index 0000000..77b3c2c --- /dev/null +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -0,0 +1,194 @@ +package com.rasilon.ujetl; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.ResultSet; +import java.sql.PreparedStatement; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.List; +import java.util.ArrayList; + + + +import org.apache.logging.log4j.Logger; + + +/** + * @author derryh + * + */ +public class Job extends Thread { + static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class); + + Connection sConn; + Connection dConn; + String name; + String jobName; + String key; + String select; + String insert; + Integer nRowsToLog; + Integer blockSize; + + BlockingQueue> resultBuffer; + AtomicBoolean producerLive; + + + public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize) { + this.sConn = sConn; + this.dConn = dConn; + this.name = name; + this.jobName = jobName; + this.key = key; + this.select = select; + this.insert = insert; + this.nRowsToLog = nRowsToLog; + this.blockSize = blockSize; + + resultBuffer = new ArrayBlockingQueue>( 3 * blockSize); + producerLive = new AtomicBoolean(true); + } + + int arraySum(int[] arr) { + int sum = 0; + for(int i : arr) { + sum += i; + } + return sum; + } + + + + + private class Producer extends Thread { + ResultSet src; + BlockingQueue q; + public Producer(ResultSet src,BlockingQueue q) { + this.src = src; + this.q = q; + } + public void run() { + try { + long rowsInserted = 0; + long rowNum = 0; + long stamp = System.nanoTime(); + long nstamp; + int columnCount = src.getMetaData().getColumnCount(); + log.debug("Running select."); + while(src.next()) { + List row = new ArrayList(columnCount); + + for(int i=1; i<=columnCount; i++) { + row.add(src.getString(i)); + } + q.put(row); + rowNum++; + if(rowNum % nRowsToLog == 0) { + log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name)); + } + } + producerLive.set(false); + log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name)); + } catch(SQLException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } catch(InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + + private class Consumer extends Thread { + PreparedStatement insertStatement; + BlockingQueue> q; + + public Consumer(PreparedStatement insertStatement,BlockingQueue q) { + this.insertStatement = insertStatement; + this.q = q; + } + public void run() { + try { + long rowNum = 0; + long rowsInserted = 0; + + while(true) { + List row = q.poll(100,java.util.concurrent.TimeUnit.MILLISECONDS); + if(row == null && producerLive.get() == false) { + row = q.poll(1,java.util.concurrent.TimeUnit.MILLISECONDS); + } + if(row == null && producerLive.get() == false) { + rowsInserted += arraySum(insertStatement.executeBatch()); + dConn.commit(); + log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name)); + return; + } + if(row == null){ + log.warn("Queue empty."); + continue; + } + + for(int i=0; i (startTime + (1000*timeoutSeconds))) { + if(forceExit) { + log.error("Hard runtime limit hit. Exiting now."); + System.exit(27); + } else { + log.error("Interrupt runtime limit hit. Watchdog thread sending and exiting."); + parentThread.interrupt(); + return; + } + } + } + } + +}