Initial checkin

This commit is contained in:
Derry Hamilton 2019-05-10 14:05:04 +01:00
commit 93d8be0185
7 changed files with 607 additions and 0 deletions

View file

@ -0,0 +1,194 @@
package com.rasilon.ujetl;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Executors ;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.FileReader;
import java.sql.SQLException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.LogManager;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.beanutils.PropertyUtils; // Why does config need this?
/**
* @author derryh
*
*/
public class CopyingApp {
static Logger log = LogManager.getLogger(CopyingApp.class);
public static void main(String[] args) {
CopyingAppCommandParser cli = new CopyingAppCommandParser(args);
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
String log4jConfigLocation = cli.getLog4jConfigFile();
File file = new File(log4jConfigLocation);
context.setConfigLocation(file.toURI());
System.out.println("Config set from "+file.toURI());
CopyingApp app = new CopyingApp(cli);
try {
app.run();
} catch(Exception e) {
log.error(String.format("%s - %s",app.getJobName(),e.toString()));
}
}
CopyingAppCommandParser cli;
String jobName;
Integer blockSize = 100;
public CopyingApp(CopyingAppCommandParser cli) {
this.cli = cli;
String fname = cli.getConfigFile();
File f = new File(cli.getConfigFile());
String[] job_name = fname.split("_");
jobName = (f.getName().split("_"))[0];
}
public String getJobName() {
return jobName;
}
public void run() {
log.info(String.format("%s - Starting copying job",jobName));
Connection sConn = null;
Connection dConn = null;
try {
Configurations configs = new Configurations();
Configuration config = configs.xml(cli.getConfigFile());
String hardLimitSeconds = config.getString("hardLimitSeconds");
if(hardLimitSeconds != null) {
TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true);
log.info(String.format("%s - Set a hard runtime limit of %s seconds",jobName,hardLimitSeconds));
hardLimit.start();
} else {
log.info(String.format("%s - No runtime limit specified",jobName));
}
String blockSizeString = config.getString("blockSize");
if(blockSizeString != null) {
blockSize = new Integer(blockSizeString);
log.info(String.format("%s - Set a block size of %s rows",jobName,blockSizeString));
}
sConn = getConnFor("source",config);
dConn = getConnFor("dest",config);
dConn.setAutoCommit(false);
Integer nRowsToLog = null;
try {
nRowsToLog = new Integer(config.getString("nRowsToLog"));
log.info(String.format("%s - Setting Row count interval to %s", jobName, nRowsToLog));
} catch(Exception e) {
nRowsToLog = new Integer(100); // If we don't have a new setting, use the old default
log.info(String.format("%s - Setting Row count interval to default of 100 rows.",jobName));
}
long startTime = System.nanoTime();
log.info(String.format("%s - Tables are:",jobName));
Object prop = config.getProperty("tables.table.name");
if(prop instanceof Collection) {
int numTabs = ((Collection<?>) prop).size();
log.info(String.format("%s - Number of tables: %s",jobName, new Integer(numTabs)));
for(int i=0; i < numTabs; i++ ) {
String tabName = config.getString("tables.table("+i+").name");
String tabKey = config.getString("tables.table("+i+").key");
String tabSelect = config.getString("tables.table("+i+").select");
String tabInsert = config.getString("tables.table("+i+").insert");
//processTable(sConn,dConn,tabName,tabKey,tabSelect,tabInsert,nRowsToLog);
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize);
j.start();
j.join();
}
} else if(prop instanceof String) {
String tabName = config.getString("tables.table.name");
String tabKey = config.getString("tables.table.key");
String tabSelect = config.getString("tables.table.select");
String tabInsert = config.getString("tables.table.insert");
//processTable(sConn,dConn,tabName,tabKey,tabSelect,tabInsert,nRowsToLog);
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize);
j.start();
j.join();
} else {
log.info(String.format("%s - Class is actually a %s",jobName, prop.getClass().getName()));
}
long endTime = System.nanoTime();
logDuration("Job took", startTime, endTime);
} catch (SQLException e) {
SQLException x = e;
do {
log.error(String.format("%s - %s",jobName, x.toString()));
x.printStackTrace();
x = x.getNextException();
} while(x != null && x != x.getNextException());
throw new RuntimeException(e);
} catch (ConfigurationException e) {
log.error(String.format("%s - %s",jobName, e.toString()));
throw new RuntimeException(e);
} catch (InterruptedException e) {
log.error(String.format("%s - %s",jobName, e.toString()));
throw new RuntimeException(e);
}
}
private boolean excluded(String thing, String[] exclusions) {
for (String exclusion : exclusions) {
if (thing.equals(exclusion)) {
return true;
}
}
return false;
}
private void logDuration(String what, long startTime, long endTime) {
long duration = endTime - startTime;
log.info(String.format("%s - copying job completed in %s seconds",jobName, new Double(((double) duration) / 1000000000.0)) );
}
private Connection getConnFor(String connType, Configuration config) throws SQLException {
Properties p = new Properties();
p.setProperty("user",config.getString(connType + ".username"));
p.setProperty("password",config.getString(connType + ".password"));
Connection c = DriverManager.getConnection(config.getString(connType + ".dsn"),p);
c.setAutoCommit(false);
String timeout = config.getString(connType + ".networkTimeout");
if(timeout != null) {
try {
c.setNetworkTimeout(Executors.newFixedThreadPool(5), Integer.decode(timeout).intValue());
log.info(String.format("%s - Set network timeout of %s on %s",jobName,timeout,connType));
} catch(Exception e) {
log.error(String.format("%s - Failed to set connection timeout: %s",jobName,e.toString()));
}
}
return c;
}
}

View file

@ -0,0 +1,30 @@
package com.rasilon.ujetl;
import com.beust.jcommander.*;
/**
* @author derryh
*
*/
public class CopyingAppCommandParser {
@Parameter(names = {"-config","--config"}, description = "Application Config file for this run")
private String configFile;
@Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run")
private String log4jConfigFile = "/etc/ppl/default_log4j_config.properties";
public CopyingAppCommandParser(String[] args) {
super();
new JCommander(this, args);
}
public String getConfigFile() {
return configFile;
}
public String getLog4jConfigFile() {
return log4jConfigFile;
}
}

View file

@ -0,0 +1,194 @@
package com.rasilon.ujetl;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List;
import java.util.ArrayList;
import org.apache.logging.log4j.Logger;
/**
* @author derryh
*
*/
public class Job extends Thread {
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
Connection sConn;
Connection dConn;
String name;
String jobName;
String key;
String select;
String insert;
Integer nRowsToLog;
Integer blockSize;
BlockingQueue<List<String>> resultBuffer;
AtomicBoolean producerLive;
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize) {
this.sConn = sConn;
this.dConn = dConn;
this.name = name;
this.jobName = jobName;
this.key = key;
this.select = select;
this.insert = insert;
this.nRowsToLog = nRowsToLog;
this.blockSize = blockSize;
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
producerLive = new AtomicBoolean(true);
}
int arraySum(int[] arr) {
int sum = 0;
for(int i : arr) {
sum += i;
}
return sum;
}
private class Producer extends Thread {
ResultSet src;
BlockingQueue q;
public Producer(ResultSet src,BlockingQueue q) {
this.src = src;
this.q = q;
}
public void run() {
try {
long rowsInserted = 0;
long rowNum = 0;
long stamp = System.nanoTime();
long nstamp;
int columnCount = src.getMetaData().getColumnCount();
log.debug("Running select.");
while(src.next()) {
List<String> row = new ArrayList(columnCount);
for(int i=1; i<=columnCount; i++) {
row.add(src.getString(i));
}
q.put(row);
rowNum++;
if(rowNum % nRowsToLog == 0) {
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name));
}
}
producerLive.set(false);
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name));
} catch(SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch(InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
private class Consumer extends Thread {
PreparedStatement insertStatement;
BlockingQueue<List<String>> q;
public Consumer(PreparedStatement insertStatement,BlockingQueue q) {
this.insertStatement = insertStatement;
this.q = q;
}
public void run() {
try {
long rowNum = 0;
long rowsInserted = 0;
while(true) {
List<String> row = q.poll(100,java.util.concurrent.TimeUnit.MILLISECONDS);
if(row == null && producerLive.get() == false) {
row = q.poll(1,java.util.concurrent.TimeUnit.MILLISECONDS);
}
if(row == null && producerLive.get() == false) {
rowsInserted += arraySum(insertStatement.executeBatch());
dConn.commit();
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name));
return;
}
if(row == null){
log.warn("Queue empty.");
continue;
}
for(int i=0; i<row.size(); i++) {
insertStatement.setString(i+1,row.get(i));
}
insertStatement.addBatch();
rowNum++;
if(rowNum % nRowsToLog == 0) {
rowsInserted += arraySum(insertStatement.executeBatch());
dConn.commit();
log.info(String.format("%s - Inserted %s of %s notified rows into %s so far",jobName,rowNum,rowsInserted,name));
}
}
} catch(SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch(InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
public void run() {
try {
ResultSet rs;
log.info(String.format("%s - Processing table: %s",jobName,name));
log.debug("Trying to execute: "+key);
PreparedStatement keyStatement = dConn.prepareStatement(key);
rs = keyStatement.executeQuery();
rs.next();
String keyVal = rs.getString("key");
log.info(String.format("%s - Our start key is %s",jobName,keyVal));
log.debug("Trying to execute: "+select);
PreparedStatement selectStatement = sConn.prepareStatement(select);
selectStatement.setFetchSize(blockSize);
selectStatement.setString(1,keyVal);
log.debug("Trying to prepare: "+insert);
PreparedStatement insertStatement = dConn.prepareStatement(insert);
log.debug("About to execute select.");
rs = selectStatement.executeQuery();
Thread p = new Producer(rs,resultBuffer);
p.start();
Thread c = new Consumer(insertStatement,resultBuffer);
c.start();
p.join();
c.join();
} catch(InterruptedException e) {
throw new RuntimeException(e);
} catch(SQLException e) {
throw new RuntimeException(e);
}
}
}

View file

@ -0,0 +1,46 @@
package com.rasilon.ujetl;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
public class TimeLimiter extends Thread {
static Logger log = LogManager.getLogger(TimeLimiter.class);
private long timeoutSeconds = 0;
private Thread parentThread = Thread.currentThread();
private boolean forceExit = false;
public TimeLimiter(long timeoutSeconds,boolean forceExit) {
this.timeoutSeconds = timeoutSeconds;
this.forceExit = forceExit;
setDaemon(true);
}
public TimeLimiter(long timeoutSeconds) {
this.timeoutSeconds = timeoutSeconds;
setDaemon(true);
}
public void run() {
long startTime = System.currentTimeMillis();
while(true) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
// Handled by the next test. We really don't care here.
}
if(System.currentTimeMillis() > (startTime + (1000*timeoutSeconds))) {
if(forceExit) {
log.error("Hard runtime limit hit. Exiting now.");
System.exit(27);
} else {
log.error("Interrupt runtime limit hit. Watchdog thread sending and exiting.");
parentThread.interrupt();
return;
}
}
}
}
}