From ddc67f3a4148b2fb2f0c64df3277e24c7c0cf04b Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Fri, 12 Jul 2019 13:51:30 +0100 Subject: [PATCH 01/59] Bugfix type quoting and add select generator --- config_util/ujetl_insert_generator.sql | 33 +++++++------ config_util/ujetl_select_generator.sql | 65 ++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 17 deletions(-) create mode 100644 config_util/ujetl_select_generator.sql diff --git a/config_util/ujetl_insert_generator.sql b/config_util/ujetl_insert_generator.sql index 20cb2eb..a7375f6 100644 --- a/config_util/ujetl_insert_generator.sql +++ b/config_util/ujetl_insert_generator.sql @@ -14,22 +14,22 @@ declare pks text; begin SELECT - array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks - FROM - pg_index, - pg_class, - pg_attribute, - pg_namespace - WHERE - pg_class.relname = tabname AND - indrelid = pg_class.oid AND - nspname = sch AND - pg_class.relnamespace = pg_namespace.oid AND - pg_attribute.attrelid = pg_class.oid AND - pg_attribute.attnum = any(pg_index.indkey) - AND indisprimary ; + array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks + FROM + pg_index, + pg_class, + pg_attribute, + pg_namespace + WHERE + pg_class.relname = tabname + AND indrelid = pg_class.oid + AND nspname = sch + AND pg_class.relnamespace = pg_namespace.oid + AND pg_attribute.attrelid = pg_class.oid + AND pg_attribute.attnum = any(pg_index.indkey) + AND indisprimary ; - header := 'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n '; + header := E'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n '; for colinfo in select * @@ -40,7 +40,6 @@ begin and table_name = tabname order by ordinal_position loop - raise info 'Working on %.% (%)',sch,tabname,colinfo::text; if not is_first then col_list := col_list || E',\n '; vals := vals || E',\n '; @@ -48,7 +47,7 @@ begin changes := changes || E'\n OR '; end if; col_list := col_list || quote_ident(colinfo.column_name); - vals := vals || '?::' || quote_ident(colinfo.data_type); + vals := vals || '?::' || colinfo.data_type; sets := sets || quote_ident(colinfo.column_name) || E' = EXCLUDED.' || quote_ident(colinfo.column_name); changes := changes || E't.' || quote_ident(colinfo.column_name) || diff --git a/config_util/ujetl_select_generator.sql b/config_util/ujetl_select_generator.sql new file mode 100644 index 0000000..5c475af --- /dev/null +++ b/config_util/ujetl_select_generator.sql @@ -0,0 +1,65 @@ +CREATE OR REPLACE FUNCTION pg_temp.ujetl_select(sch text, tabname text) + RETURNS text + LANGUAGE plpgsql +AS $function$ +declare + s text := ''; + header text := ''; + col_list text := ''; + vals text := ''; + sets text := ''; + changes text := ''; + is_first boolean := true; + colinfo record; + pks text; +begin + SELECT + array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks + FROM + pg_index, + pg_class, + pg_attribute, + pg_namespace + WHERE + pg_class.relname = tabname + AND indrelid = pg_class.oid + AND nspname = sch + AND pg_class.relnamespace = pg_namespace.oid + AND pg_attribute.attrelid = pg_class.oid + AND pg_attribute.attnum = any(pg_index.indkey) + AND indisprimary ; + + header := E'SELECT\n '; + for colinfo in + select + * + from + information_schema.columns + where + table_schema = sch + and table_name = tabname + order by ordinal_position + loop + if not is_first then + col_list := col_list || E',\n '; + end if; + col_list := col_list || quote_ident(colinfo.column_name); + + is_first = false; + end loop; + + s := header || + coalesce(col_list,'col_list failed') || + E'\nFROM\n ' || + quote_ident(sch)||'.'||quote_ident(tabname)||E' as t \n '|| + E'WHERE\n insert criteria here >= ?::datatype'; + return s; +end; +$function$ +; + + + + + + From 8fdbc6a78ec304865234d0762c1ca6326e84e482 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 11:10:12 +0100 Subject: [PATCH 02/59] Add pre and post SQL --- .../java/com/rasilon/ujetl/CopyingApp.java | 9 ++- src/main/java/com/rasilon/ujetl/Job.java | 25 +++++++- src/test/java/com/rasilon/ujetl/TestJob.java | 2 + .../java/com/rasilon/ujetl/TestPrePost.java | 62 +++++++++++++++++++ 4 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 src/test/java/com/rasilon/ujetl/TestPrePost.java diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java index 803e741..d2465fb 100644 --- a/src/main/java/com/rasilon/ujetl/CopyingApp.java +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -131,7 +131,10 @@ public class CopyingApp { String tabKey = config.getString("jobs.job("+i+").key"); String tabSelect = config.getString("jobs.job("+i+").select"); String tabInsert = config.getString("jobs.job("+i+").insert"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); + String preTarget = config.getString("jobs.job("+i+").preTarget"); + String postTarget = config.getString("jobs.job("+i+").postTarget"); + + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); j.start(); j.join(); @@ -141,7 +144,9 @@ public class CopyingApp { String tabKey = config.getString("jobs.job.key"); String tabSelect = config.getString("jobs.job.select"); String tabInsert = config.getString("jobs.job.insert"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); + String preTarget = config.getString("jobs.job.preTarget"); + String postTarget = config.getString("jobs.job.postTarget"); + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); j.start(); j.join(); } else { diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index 0f226a7..f5badf2 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -29,6 +29,8 @@ public class Job extends Thread { String key; String select; String insert; + String preTarget; + String postTarget; Integer nRowsToLog; Integer blockSize; Integer pollTimeout; @@ -38,7 +40,7 @@ public class Job extends Thread { AtomicBoolean threadsExit = new AtomicBoolean(false);; - public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { + public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { this.sConn = sConn; this.dConn = dConn; this.name = name; @@ -46,6 +48,8 @@ public class Job extends Thread { this.key = key; this.select = select; this.insert = insert; + this.preTarget = preTarget; + this.postTarget = postTarget; this.nRowsToLog = nRowsToLog; this.blockSize = blockSize; this.pollTimeout = pollTimeout; @@ -169,11 +173,20 @@ public class Job extends Thread { } } + // Outer run public void run() { try { ResultSet rs; log.info(String.format("%s - Processing table: %s",jobName,name)); + if(preTarget != null){ + log.debug("Trying to execute preTarget SQL"); + PreparedStatement s = dConn.prepareStatement(preTarget); + s.executeUpdate(); + s.close(); + }else{ + log.debug("No preTarget; skipping."); + } log.debug("Trying to execute: "+key); PreparedStatement keyStatement = dConn.prepareStatement(key); @@ -211,6 +224,16 @@ public class Job extends Thread { p.join(); c.join(); + if(postTarget != null){ + log.debug("Trying to execute postTarget SQL"); + PreparedStatement s = dConn.prepareStatement(postTarget); + s.executeUpdate(); + s.close(); + }else{ + log.debug("No postTarget; skipping."); + } + + } catch(InterruptedException e) { throw new RuntimeException(e); } catch(SQLException e) { diff --git a/src/test/java/com/rasilon/ujetl/TestJob.java b/src/test/java/com/rasilon/ujetl/TestJob.java index 7b9141e..98bf01c 100644 --- a/src/test/java/com/rasilon/ujetl/TestJob.java +++ b/src/test/java/com/rasilon/ujetl/TestJob.java @@ -45,6 +45,8 @@ public class TestJob { "SELECT -1 AS key", "SELECT id,dat FROM src WHERE id > ?", "INSERT INTO dest VALUES(?,?)", + null, + null, 100, 100, 100 diff --git a/src/test/java/com/rasilon/ujetl/TestPrePost.java b/src/test/java/com/rasilon/ujetl/TestPrePost.java new file mode 100644 index 0000000..7392a28 --- /dev/null +++ b/src/test/java/com/rasilon/ujetl/TestPrePost.java @@ -0,0 +1,62 @@ +package com.rasilon.ujetl; + +import java.sql.*; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.MethodOrderer.Alphanumeric; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +public class TestPrePost { + + private static String jdbcURL = "jdbc:h2:mem:dbtest"; + @Test + public void test002verifyH2Works() { + try { + Connection conn = DriverManager.getConnection(jdbcURL, "sa", ""); + conn.close(); + } catch(Exception e) { + fail(e.toString()); + } + } + + @Test + public void testPrePost() { + try ( + Connection src = DriverManager.getConnection(jdbcURL, "sa", ""); + Connection dest = DriverManager.getConnection(jdbcURL, "sa", ""); + + ) { + src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);"); + dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);"); + PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')"); + for(int i=0; i<10000; i++) { + inserter.setInt(1,i); + inserter.executeUpdate(); + } + + Job j = new Job( + src, + dest, + "jUnit Test Config", + "jUnit Test Job", + "SELECT -1 AS key", + "SELECT id,dat FROM src WHERE id > ?", + "INSERT INTO tmp_dest VALUES(?,?)", + "CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);", + "INSERT INTO dest SELECT * from tmp_dest;", + 100, + 100, + 100 + ); + j.start(); + j.join(); + // do stuff + } catch(Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } +} From 4f905dd47adc6514471e59da40412fc0bd3b0c34 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 11:32:24 +0100 Subject: [PATCH 03/59] Add test config --- docker/multistage/TEST_config_live.xml | 54 ++++++++++++++++++++++++ src/main/java/com/rasilon/ujetl/Job.java | 8 ++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml index 0d6fbc5..290d35d 100644 --- a/docker/multistage/TEST_config_live.xml +++ b/docker/multistage/TEST_config_live.xml @@ -98,5 +98,59 @@ OR denormalised_personalia.lname is distinct from EXCLUDED.lname + + test pre post + select -1 as key + + + drop table if exists tmp_dest; + create temp table tmp_dest( + id bigint, + test_int integer, + test_text text, + test_ts timestamp with time zone + ); + + + insert into tmp_dest( + id, + test_int, + test_text, + test_ts + )values( + ?::bigint, + ?::integer, + ?::text, + ?::timestamp with time zone + ) + + + insert into public.dest( + id, + test_int, + test_text, + test_ts + ) + select id,test_int,test_text,test_ts + from tmp_dest + ON CONFLICT(id) DO UPDATE + set + test_int = EXCLUDED.test_int, + test_text = EXCLUDED.test_text, + test_ts = EXCLUDED.test_ts + WHERE + dest.test_int IS DISTINCT FROM EXCLUDED.test_int + OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text + OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts + + diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index f5badf2..cc66650 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -180,12 +180,12 @@ public class Job extends Thread { log.info(String.format("%s - Processing table: %s",jobName,name)); if(preTarget != null){ - log.debug("Trying to execute preTarget SQL"); + log.info("Trying to execute preTarget SQL"); PreparedStatement s = dConn.prepareStatement(preTarget); s.executeUpdate(); s.close(); }else{ - log.debug("No preTarget; skipping."); + log.info("No preTarget; skipping."); } log.debug("Trying to execute: "+key); @@ -225,12 +225,12 @@ public class Job extends Thread { c.join(); if(postTarget != null){ - log.debug("Trying to execute postTarget SQL"); + log.info("Trying to execute postTarget SQL"); PreparedStatement s = dConn.prepareStatement(postTarget); s.executeUpdate(); s.close(); }else{ - log.debug("No postTarget; skipping."); + log.info("No postTarget; skipping."); } From 4f0db0a2df46254af5316eedd4fee0ecfb884117 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 11:35:22 +0100 Subject: [PATCH 04/59] Version bump --- pom.xml | 2 +- uJETL.spec | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 996b9da..3b5bafd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.0.5 + 2.1.5 uJETL https://github.com/rasilon/ujetl diff --git a/uJETL.spec b/uJETL.spec index 0d57460..7f88252 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.0.5 +Version: 2.1.5 Release: 1 Group: Applications/Database License: All rights reserved. From 06c64d499fd833e79cf8406c14df49a2c326d88f Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 16:00:43 +0100 Subject: [PATCH 05/59] Explicitly commit pre and post SQL so that a post command in the last job isn't lost. --- pom.xml | 2 +- src/main/java/com/rasilon/ujetl/Job.java | 2 ++ uJETL.spec | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3b5bafd..f080f72 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.1.5 + 2.1.6 uJETL https://github.com/rasilon/ujetl diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index cc66650..ca855af 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -184,6 +184,7 @@ public class Job extends Thread { PreparedStatement s = dConn.prepareStatement(preTarget); s.executeUpdate(); s.close(); + dConn.commit(); }else{ log.info("No preTarget; skipping."); } @@ -229,6 +230,7 @@ public class Job extends Thread { PreparedStatement s = dConn.prepareStatement(postTarget); s.executeUpdate(); s.close(); + dConn.commit(); }else{ log.info("No postTarget; skipping."); } diff --git a/uJETL.spec b/uJETL.spec index 7f88252..d1f3baf 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.1.5 +Version: 2.1.6 Release: 1 Group: Applications/Database License: All rights reserved. From 01310bfca472c297e04f6a16471d0c70da5d50c0 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 16:38:24 +0100 Subject: [PATCH 06/59] Fix producer thread name, for logging and monitoring. --- pom.xml | 2 +- src/main/java/com/rasilon/ujetl/Job.java | 2 +- uJETL.spec | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index f080f72..838e43b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.1.6 + 2.1.7 uJETL https://github.com/rasilon/ujetl diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index ca855af..c0eb8e5 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -76,7 +76,7 @@ public class Job extends Thread { public Producer(ResultSet src,BlockingQueue q) { this.src = src; this.q = q; - this.setName(String.format("%s-%s-Consumer",jobName,name)); + this.setName(String.format("%s-%s-Producer",jobName,name)); } public void run() { try { diff --git a/uJETL.spec b/uJETL.spec index d1f3baf..4a5b7bf 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.1.6 +Version: 2.1.7 Release: 1 Group: Applications/Database License: All rights reserved. From 734dc8608fa1c52ea6691195c5f1e0c8b60b889f Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 27 Aug 2019 12:53:37 +0100 Subject: [PATCH 07/59] Add session identification --- docker/multistage/TEST_config_live.xml | 6 ++ .../java/com/rasilon/ujetl/CopyingApp.java | 40 +++++++++++- src/main/java/com/rasilon/ujetl/Job.java | 65 ++++++++++++++----- src/test/java/com/rasilon/ujetl/TestJob.java | 4 +- .../java/com/rasilon/ujetl/TestPrePost.java | 4 +- src/test/resources/TEST_config_live.xml | 2 + 6 files changed, 100 insertions(+), 21 deletions(-) diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml index 290d35d..fa38eec 100644 --- a/docker/multistage/TEST_config_live.xml +++ b/docker/multistage/TEST_config_live.xml @@ -18,6 +18,8 @@ test + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select coalesce(-1,max(id),-1) as key from dest select @@ -84,6 +88,8 @@ denormalise + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select -1 as key diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java index d2465fb..e1e47a5 100644 --- a/src/main/java/com/rasilon/ujetl/CopyingApp.java +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -133,8 +133,25 @@ public class CopyingApp { String tabInsert = config.getString("jobs.job("+i+").insert"); String preTarget = config.getString("jobs.job("+i+").preTarget"); String postTarget = config.getString("jobs.job("+i+").postTarget"); + String identifySourceSQL = config.getString("jobs.job.identifySourceSQL"); + String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); + Job j = new Job( + sConn, + dConn, + tabName, + jobName, + tabKey, + tabSelect, + tabInsert, + preTarget, + postTarget, + nRowsToLog, + blockSize, + pollTimeout, + identifySourceSQL, + identifyDestinationSQL + ); j.start(); j.join(); @@ -146,7 +163,26 @@ public class CopyingApp { String tabInsert = config.getString("jobs.job.insert"); String preTarget = config.getString("jobs.job.preTarget"); String postTarget = config.getString("jobs.job.postTarget"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); + String identifySourceSQL = config.getString("jobs.job.identifySourceSQL"); + String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL"); + + + Job j = new Job( + sConn, + dConn, + tabName, + jobName, + tabKey, + tabSelect, + tabInsert, + preTarget, + postTarget, + nRowsToLog, + blockSize, + pollTimeout, + identifySourceSQL, + identifyDestinationSQL + ); j.start(); j.join(); } else { diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index c0eb8e5..67f82a3 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -22,25 +22,29 @@ import org.apache.logging.log4j.Logger; public class Job extends Thread { static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class); - Connection sConn; - Connection dConn; - String name; - String jobName; - String key; - String select; - String insert; - String preTarget; - String postTarget; - Integer nRowsToLog; - Integer blockSize; - Integer pollTimeout; + private Connection sConn; + private Connection dConn; + private String name; + private String jobName; + private String key; + private String select; + private String insert; + private String preTarget; + private String postTarget; + private Integer nRowsToLog; + private Integer blockSize; + private Integer pollTimeout; + private String identifySourceSQL; + private String identifyDestinationSQL; - BlockingQueue> resultBuffer; - AtomicBoolean producerLive; - AtomicBoolean threadsExit = new AtomicBoolean(false);; + private BlockingQueue> resultBuffer; + private AtomicBoolean producerLive; + private AtomicBoolean threadsExit = new AtomicBoolean(false);; + private String sourceID; + private String destID; - public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { + public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) { this.sConn = sConn; this.dConn = dConn; this.name = name; @@ -53,6 +57,8 @@ public class Job extends Thread { this.nRowsToLog = nRowsToLog; this.blockSize = blockSize; this.pollTimeout = pollTimeout; + this.identifySourceSQL = identifySourceSQL; + this.identifyDestinationSQL = identifyDestinationSQL; resultBuffer = new ArrayBlockingQueue>( 3 * blockSize); producerLive = new AtomicBoolean(true); @@ -178,7 +184,20 @@ public class Job extends Thread { try { ResultSet rs; - log.info(String.format("%s - Processing table: %s",jobName,name)); + if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn); + if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn); + + if(sourceID != null || destID != null){ + log.info(String.format( + "%s - Processing table: %s with source: %s, dest: %s", + jobName, + name, + sourceID==null?"":sourceID, + destID==null?"":destID + )); + }else{ + log.info(String.format("%s - Processing table: %s",jobName,name)); + } if(preTarget != null){ log.info("Trying to execute preTarget SQL"); PreparedStatement s = dConn.prepareStatement(preTarget); @@ -242,4 +261,16 @@ public class Job extends Thread { throw new RuntimeException(e); } } + + private String getSingleString(String sql, Connection conn){ + try{ + PreparedStatement s = conn.prepareStatement(sql); + ResultSet r = s.executeQuery(); + r.next(); + return r.getString(1); + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + } diff --git a/src/test/java/com/rasilon/ujetl/TestJob.java b/src/test/java/com/rasilon/ujetl/TestJob.java index 98bf01c..853aeb4 100644 --- a/src/test/java/com/rasilon/ujetl/TestJob.java +++ b/src/test/java/com/rasilon/ujetl/TestJob.java @@ -49,7 +49,9 @@ public class TestJob { null, 100, 100, - 100 + 100, + "select 'PID:'||session_id()", + "select 'PID:'||session_id()" ); j.start(); j.join(); diff --git a/src/test/java/com/rasilon/ujetl/TestPrePost.java b/src/test/java/com/rasilon/ujetl/TestPrePost.java index 7392a28..77ff2a6 100644 --- a/src/test/java/com/rasilon/ujetl/TestPrePost.java +++ b/src/test/java/com/rasilon/ujetl/TestPrePost.java @@ -49,7 +49,9 @@ public class TestPrePost { "INSERT INTO dest SELECT * from tmp_dest;", 100, 100, - 100 + 100, + "select 'PID:'||session_id()", + "select 'PID:'||session_id()" ); j.start(); j.join(); diff --git a/src/test/resources/TEST_config_live.xml b/src/test/resources/TEST_config_live.xml index b4630a3..338faff 100644 --- a/src/test/resources/TEST_config_live.xml +++ b/src/test/resources/TEST_config_live.xml @@ -18,6 +18,8 @@ test + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select coalesce(-1,max(id),-1) as key from dest