diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml index 290d35d..fa38eec 100644 --- a/docker/multistage/TEST_config_live.xml +++ b/docker/multistage/TEST_config_live.xml @@ -18,6 +18,8 @@ test + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select coalesce(-1,max(id),-1) as key from dest select @@ -84,6 +88,8 @@ denormalise + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select -1 as key diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java index d2465fb..e1e47a5 100644 --- a/src/main/java/com/rasilon/ujetl/CopyingApp.java +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -133,8 +133,25 @@ public class CopyingApp { String tabInsert = config.getString("jobs.job("+i+").insert"); String preTarget = config.getString("jobs.job("+i+").preTarget"); String postTarget = config.getString("jobs.job("+i+").postTarget"); + String identifySourceSQL = config.getString("jobs.job.identifySourceSQL"); + String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); + Job j = new Job( + sConn, + dConn, + tabName, + jobName, + tabKey, + tabSelect, + tabInsert, + preTarget, + postTarget, + nRowsToLog, + blockSize, + pollTimeout, + identifySourceSQL, + identifyDestinationSQL + ); j.start(); j.join(); @@ -146,7 +163,26 @@ public class CopyingApp { String tabInsert = config.getString("jobs.job.insert"); String preTarget = config.getString("jobs.job.preTarget"); String postTarget = config.getString("jobs.job.postTarget"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); + String identifySourceSQL = config.getString("jobs.job.identifySourceSQL"); + String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL"); + + + Job j = new Job( + sConn, + dConn, + tabName, + jobName, + tabKey, + tabSelect, + tabInsert, + preTarget, + postTarget, + nRowsToLog, + blockSize, + pollTimeout, + identifySourceSQL, + identifyDestinationSQL + ); j.start(); j.join(); } else { diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index c0eb8e5..67f82a3 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -22,25 +22,29 @@ import org.apache.logging.log4j.Logger; public class Job extends Thread { static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class); - Connection sConn; - Connection dConn; - String name; - String jobName; - String key; - String select; - String insert; - String preTarget; - String postTarget; - Integer nRowsToLog; - Integer blockSize; - Integer pollTimeout; + private Connection sConn; + private Connection dConn; + private String name; + private String jobName; + private String key; + private String select; + private String insert; + private String preTarget; + private String postTarget; + private Integer nRowsToLog; + private Integer blockSize; + private Integer pollTimeout; + private String identifySourceSQL; + private String identifyDestinationSQL; - BlockingQueue> resultBuffer; - AtomicBoolean producerLive; - AtomicBoolean threadsExit = new AtomicBoolean(false);; + private BlockingQueue> resultBuffer; + private AtomicBoolean producerLive; + private AtomicBoolean threadsExit = new AtomicBoolean(false);; + private String sourceID; + private String destID; - public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { + public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) { this.sConn = sConn; this.dConn = dConn; this.name = name; @@ -53,6 +57,8 @@ public class Job extends Thread { this.nRowsToLog = nRowsToLog; this.blockSize = blockSize; this.pollTimeout = pollTimeout; + this.identifySourceSQL = identifySourceSQL; + this.identifyDestinationSQL = identifyDestinationSQL; resultBuffer = new ArrayBlockingQueue>( 3 * blockSize); producerLive = new AtomicBoolean(true); @@ -178,7 +184,20 @@ public class Job extends Thread { try { ResultSet rs; - log.info(String.format("%s - Processing table: %s",jobName,name)); + if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn); + if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn); + + if(sourceID != null || destID != null){ + log.info(String.format( + "%s - Processing table: %s with source: %s, dest: %s", + jobName, + name, + sourceID==null?"":sourceID, + destID==null?"":destID + )); + }else{ + log.info(String.format("%s - Processing table: %s",jobName,name)); + } if(preTarget != null){ log.info("Trying to execute preTarget SQL"); PreparedStatement s = dConn.prepareStatement(preTarget); @@ -242,4 +261,16 @@ public class Job extends Thread { throw new RuntimeException(e); } } + + private String getSingleString(String sql, Connection conn){ + try{ + PreparedStatement s = conn.prepareStatement(sql); + ResultSet r = s.executeQuery(); + r.next(); + return r.getString(1); + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + } diff --git a/src/test/java/com/rasilon/ujetl/TestJob.java b/src/test/java/com/rasilon/ujetl/TestJob.java index 98bf01c..853aeb4 100644 --- a/src/test/java/com/rasilon/ujetl/TestJob.java +++ b/src/test/java/com/rasilon/ujetl/TestJob.java @@ -49,7 +49,9 @@ public class TestJob { null, 100, 100, - 100 + 100, + "select 'PID:'||session_id()", + "select 'PID:'||session_id()" ); j.start(); j.join(); diff --git a/src/test/java/com/rasilon/ujetl/TestPrePost.java b/src/test/java/com/rasilon/ujetl/TestPrePost.java index 7392a28..77ff2a6 100644 --- a/src/test/java/com/rasilon/ujetl/TestPrePost.java +++ b/src/test/java/com/rasilon/ujetl/TestPrePost.java @@ -49,7 +49,9 @@ public class TestPrePost { "INSERT INTO dest SELECT * from tmp_dest;", 100, 100, - 100 + 100, + "select 'PID:'||session_id()", + "select 'PID:'||session_id()" ); j.start(); j.join(); diff --git a/src/test/resources/TEST_config_live.xml b/src/test/resources/TEST_config_live.xml index b4630a3..338faff 100644 --- a/src/test/resources/TEST_config_live.xml +++ b/src/test/resources/TEST_config_live.xml @@ -18,6 +18,8 @@ test + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select coalesce(-1,max(id),-1) as key from dest