From 3bfc9d6a1b8ea14d26ed48fd74d37d1d2c91a64f Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Wed, 12 Jun 2019 15:39:27 +0100 Subject: [PATCH 01/80] Remove docker instance after use --- build_util/build_in_docker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build_util/build_in_docker b/build_util/build_in_docker index caa6996..acc6fee 100755 --- a/build_util/build_in_docker +++ b/build_util/build_in_docker @@ -3,4 +3,4 @@ set -e docker build --rm -t local/c7-buildhost docker/build -docker run -it -v `pwd`:/root/build local/c7-buildhost /root/build/build_util/build_rpm +docker run -it --rm -v `pwd`:/root/build local/c7-buildhost /root/build/build_util/build_rpm From ea629b3098385a18db4f5e3d1ba3557a816c5092 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Wed, 12 Jun 2019 16:21:28 +0100 Subject: [PATCH 02/80] Add multistage docker build --- docker/multistage/Dockerfile | 23 +++++++++++++++++++++++ docker/multistage/ujetl_entrypoint | 17 +++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 docker/multistage/Dockerfile create mode 100755 docker/multistage/ujetl_entrypoint diff --git a/docker/multistage/Dockerfile b/docker/multistage/Dockerfile new file mode 100644 index 0000000..fe0653a --- /dev/null +++ b/docker/multistage/Dockerfile @@ -0,0 +1,23 @@ +FROM centos:centos7 as builder +RUN yum -y install epel-release java-1.8.0-openjdk-devel +RUN yum -y groupinstall 'Development Tools' +RUN yum -y install git maven +RUN git clone https://github.com/rasilon/ujetl.git +RUN cd ujetl && mvn package + + + + +FROM centos:centos7 +MAINTAINER Derry Hamilton + +RUN yum -y install epel-release +RUN yum -y install java-1.8.0-openjdk-devel + +RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl + +COPY --from=builder /ujetl/target/CopyingApp-2.*-jar-with-dependencies.jar /usr/share/ujetl/lib/CopyingApp.jar +COPY --from=builder /ujetl/install_extra/copying_defaults_log4j.xml /etc/ujetl/ +COPY ujetl_entrypoint / +ENTRYPOINT ["/ujetl_entrypoint"] + diff --git a/docker/multistage/ujetl_entrypoint b/docker/multistage/ujetl_entrypoint new file mode 100755 index 0000000..b5c73fe --- /dev/null +++ b/docker/multistage/ujetl_entrypoint @@ -0,0 +1,17 @@ +#!/bin/bash +set -e + +LOG_PROPS=/etc/ujetl/copying_defaults_log4j.xml + +cd /var/ujetl +for file in *.xml +do + /usr/bin/java \ + -Xms1g \ + -Xmx2g \ + -cp /usr/share/ujetl/lib/CopyingApp.jar \ + com.rasilon.ujetl.CopyingApp \ + --log4j "$LOG_PROPS" \ + --config "$file" +done + From 3badb97d49d190c4a0ee6d4e634dc81ab9951306 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Wed, 12 Jun 2019 16:47:28 +0100 Subject: [PATCH 03/80] Move to alpine --- docker/multistage/Dockerfile | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docker/multistage/Dockerfile b/docker/multistage/Dockerfile index fe0653a..4ff472a 100644 --- a/docker/multistage/Dockerfile +++ b/docker/multistage/Dockerfile @@ -8,16 +8,17 @@ RUN cd ujetl && mvn package -FROM centos:centos7 +#FROM centos:centos7 +FROM openjdk:8-alpine MAINTAINER Derry Hamilton -RUN yum -y install epel-release -RUN yum -y install java-1.8.0-openjdk-devel +#RUN yum -y install epel-release +#RUN yum -y install java-1.8.0-openjdk-devel RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl COPY --from=builder /ujetl/target/CopyingApp-2.*-jar-with-dependencies.jar /usr/share/ujetl/lib/CopyingApp.jar COPY --from=builder /ujetl/install_extra/copying_defaults_log4j.xml /etc/ujetl/ -COPY ujetl_entrypoint / +COPY docker/multistage/ujetl_entrypoint / ENTRYPOINT ["/ujetl_entrypoint"] From 5571bd1b4d41fdc885bf1fa0a2695bf44ea3a876 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 17 Jun 2019 12:53:31 +0100 Subject: [PATCH 04/80] Add dockerised tests --- .../{build_in_docker => build_rpms_in_docker} | 0 build_util/create_run_docker | 4 +- build_util/run_docker_tests | 5 ++ docker/build/Dockerfile | 31 ----------- docker/multistage/Dockerfile | 21 +++++--- docker/multistage/TEST_config_live.xml | 52 +++++++++++++++++++ docker/multistage/ujetl_entrypoint | 4 ++ docker/multistage/wait_for_postgres | 12 +++++ docker/run/Dockerfile | 18 ------- docker/run/ujetl_entrypoint | 17 ------ docker/test_compose/docker-compose.yml | 23 ++++++++ docker/test_db/Dockerfile | 3 ++ docker/test_db/is_ready | 3 ++ docker/test_db/setup.sql | 26 ++++++++++ 14 files changed, 143 insertions(+), 76 deletions(-) rename build_util/{build_in_docker => build_rpms_in_docker} (100%) create mode 100755 build_util/run_docker_tests delete mode 100644 docker/build/Dockerfile create mode 100644 docker/multistage/TEST_config_live.xml create mode 100755 docker/multistage/wait_for_postgres delete mode 100644 docker/run/Dockerfile delete mode 100644 docker/run/ujetl_entrypoint create mode 100644 docker/test_compose/docker-compose.yml create mode 100644 docker/test_db/Dockerfile create mode 100644 docker/test_db/is_ready create mode 100644 docker/test_db/setup.sql diff --git a/build_util/build_in_docker b/build_util/build_rpms_in_docker similarity index 100% rename from build_util/build_in_docker rename to build_util/build_rpms_in_docker diff --git a/build_util/create_run_docker b/build_util/create_run_docker index 0ccc0f6..1acbb42 100755 --- a/build_util/create_run_docker +++ b/build_util/create_run_docker @@ -1,4 +1,2 @@ #!/bin/bash -set -e -cp *.rpm docker/run/ -docker build --rm -t local/c7-runhost docker/run +docker build --target deploy -t rasilon/ujetl docker/multistage diff --git a/build_util/run_docker_tests b/build_util/run_docker_tests new file mode 100755 index 0000000..e8bd891 --- /dev/null +++ b/build_util/run_docker_tests @@ -0,0 +1,5 @@ +#!/bin/bash + +docker-compose -f docker/test_compose/docker-compose.yml run --rm tests +docker-compose -f docker/test_compose/docker-compose.yml down + diff --git a/docker/build/Dockerfile b/docker/build/Dockerfile deleted file mode 100644 index c780b8f..0000000 --- a/docker/build/Dockerfile +++ /dev/null @@ -1,31 +0,0 @@ -# General build RPM environment for CentOS 6.x -# -# VERSION 0.0.1 - -FROM centos:centos7 -MAINTAINER Derry Hamilton - -# Install up-to-date epel rpm repository -RUN yum -y install epel-release - -# Install java first, to get a sensible one. -RUN yum -y install java-1.8.0-openjdk-devel - -# Install various packages to get compile environment -RUN yum -y groupinstall 'Development Tools' - -# Install git command to access GitHub repository -RUN yum -y install git - -# Install rpm-build to use rpmrebuild command -RUN yum -y install rpm-build - -# Install yum-utils to use yumdownloader command -RUN yum -y install yum-utils - -# Install rpmdevtools to use rpmdev-setuptree command -RUN yum -y install rpmdevtools - -# Install rpmdevtools to use rpmdev-setuptree command -RUN yum -y install maven -RUN mkdir -p /root/rpmbuild/SOURCES diff --git a/docker/multistage/Dockerfile b/docker/multistage/Dockerfile index 4ff472a..f529941 100644 --- a/docker/multistage/Dockerfile +++ b/docker/multistage/Dockerfile @@ -8,17 +8,24 @@ RUN cd ujetl && mvn package -#FROM centos:centos7 -FROM openjdk:8-alpine -MAINTAINER Derry Hamilton +FROM openjdk:8-alpine as runner +LABEL maintainer="Derry Hamilton " -#RUN yum -y install epel-release -#RUN yum -y install java-1.8.0-openjdk-devel +RUN apk update && apk upgrade && apk add bash RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl COPY --from=builder /ujetl/target/CopyingApp-2.*-jar-with-dependencies.jar /usr/share/ujetl/lib/CopyingApp.jar COPY --from=builder /ujetl/install_extra/copying_defaults_log4j.xml /etc/ujetl/ -COPY docker/multistage/ujetl_entrypoint / -ENTRYPOINT ["/ujetl_entrypoint"] +COPY ujetl_entrypoint / +CMD ["/ujetl_entrypoint"] + +FROM runner as tester +COPY TEST_config_live.xml /var/ujetl/ +COPY wait_for_postgres / +RUN apk add postgresql-client + + +FROM runner as deploy +# Convice docker cloud to build the deploy image diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml new file mode 100644 index 0000000..97f0500 --- /dev/null +++ b/docker/multistage/TEST_config_live.xml @@ -0,0 +1,52 @@ + + + 360000 + 10000 + 1000 + + jdbc:postgresql://testdb:5432/test + test + test + 600000 + + + jdbc:postgresql://testdb:5432/test + test + test + + + + test + select coalesce(-1,max(id),-1) as key from dest + + + insert into public.dest( + id, + test_int, + test_text, + test_ts + )values( + ?::bigint, + ?::integer, + ?::text, + ?::timestamp with time zone + )ON CONFLICT(id) DO UPDATE + set + test_int = EXCLUDED.test_int, + test_text = EXCLUDED.test_text, + test_ts = EXCLUDED.test_ts + WHERE + dest.test_int = EXCLUDED.test_int + OR dest.test_text = EXCLUDED.test_text + OR dest.test_ts = EXCLUDED.test_ts + + + + diff --git a/docker/multistage/ujetl_entrypoint b/docker/multistage/ujetl_entrypoint index b5c73fe..9979de6 100755 --- a/docker/multistage/ujetl_entrypoint +++ b/docker/multistage/ujetl_entrypoint @@ -4,6 +4,10 @@ set -e LOG_PROPS=/etc/ujetl/copying_defaults_log4j.xml cd /var/ujetl +echo Currently in `pwd` +echo processing files: +ls +echo Starting run loop for file in *.xml do /usr/bin/java \ diff --git a/docker/multistage/wait_for_postgres b/docker/multistage/wait_for_postgres new file mode 100755 index 0000000..d3dee95 --- /dev/null +++ b/docker/multistage/wait_for_postgres @@ -0,0 +1,12 @@ +#!/bin/bash +set -e + +cmd="$@" + +until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do + >&2 echo "Postgres is unavailable - sleeping" + sleep 1 +done + +>&2 echo "Postgres is up - executing command" +exec $cmd diff --git a/docker/run/Dockerfile b/docker/run/Dockerfile deleted file mode 100644 index bf6cc6d..0000000 --- a/docker/run/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -# General build RPM environment for CentOS 6.x -# -# VERSION 0.0.1 - -FROM centos:centos7 -MAINTAINER Derry Hamilton - -# Install up-to-date epel rpm repository -RUN yum -y install epel-release - -# Install java first, to get a sensible one. -RUN yum -y install java-1.8.0-openjdk-devel - -COPY uJETL-2.*.x86_64.rpm /tmp/ -COPY ujetl_entrypoint / -RUN rpm -i /tmp/uJETL-2.*.x86_64.rpm -ENTRYPOINT ["/ujetl_entrypoint"] - diff --git a/docker/run/ujetl_entrypoint b/docker/run/ujetl_entrypoint deleted file mode 100644 index b5c73fe..0000000 --- a/docker/run/ujetl_entrypoint +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -set -e - -LOG_PROPS=/etc/ujetl/copying_defaults_log4j.xml - -cd /var/ujetl -for file in *.xml -do - /usr/bin/java \ - -Xms1g \ - -Xmx2g \ - -cp /usr/share/ujetl/lib/CopyingApp.jar \ - com.rasilon.ujetl.CopyingApp \ - --log4j "$LOG_PROPS" \ - --config "$file" -done - diff --git a/docker/test_compose/docker-compose.yml b/docker/test_compose/docker-compose.yml new file mode 100644 index 0000000..5048838 --- /dev/null +++ b/docker/test_compose/docker-compose.yml @@ -0,0 +1,23 @@ +# This is a sample to help put the full application together + +version: '3.3' + +services: + testdb: + image: rasilon/ujetl_testdb:latest + build: + context: ../test_db + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: postgres + tests: + image: rasilon/ujetl_test:latest + build: + context: ../multistage + links: + - "testdb" + command: ["/wait_for_postgres", "/ujetl_entrypoint"] + + + diff --git a/docker/test_db/Dockerfile b/docker/test_db/Dockerfile new file mode 100644 index 0000000..868ee9f --- /dev/null +++ b/docker/test_db/Dockerfile @@ -0,0 +1,3 @@ +FROM postgres:11 +COPY setup.sql /docker-entrypoint-initdb.d/ +COPY is_ready / diff --git a/docker/test_db/is_ready b/docker/test_db/is_ready new file mode 100644 index 0000000..a92c259 --- /dev/null +++ b/docker/test_db/is_ready @@ -0,0 +1,3 @@ +#!/bin/bash +/usr/lib/postgresql/9.6/bin/psql -U postgres -c "SELECT 1 FROM public.container_ready" postgres + diff --git a/docker/test_db/setup.sql b/docker/test_db/setup.sql new file mode 100644 index 0000000..d476bde --- /dev/null +++ b/docker/test_db/setup.sql @@ -0,0 +1,26 @@ +CREATE DATABASE test; +\c test +CREATE ROLE test login password 'test'; +CREATE UNLOGGED TABLE source ( + id bigserial primary key, + test_int integer, + test_text text, + test_ts timestamp with time zone +); +CREATE UNLOGGED TABLE dest ( + id bigint primary key, + test_int integer, + test_text text, + test_ts timestamp with time zone +); + + +GRANT SELECT ON source to test; +GRANT SELECT,INSERT,UPDATE,DELETE ON dest TO test; + +INSERT INTO source(test_int,test_text,test_ts) SELECT 1,'banana',now() FROM generate_series(1,100000); + +\c postgres +CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a); +GRANT SELECT ON public.container_ready TO TEST; + From af46f837992ace5e716aa7a4871413289e7454dc Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 08:31:46 +0100 Subject: [PATCH 05/80] Fix copy/paste error in poll timeout config --- src/main/java/com/rasilon/ujetl/CopyingApp.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java index 6938888..803e741 100644 --- a/src/main/java/com/rasilon/ujetl/CopyingApp.java +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -110,7 +110,7 @@ public class CopyingApp { Integer pollTimeout = null; try { - pollTimeout = new Integer(config.getString("nRowsToLog")); + pollTimeout = new Integer(config.getString("pollTimeout")); log.info(String.format("%s - Setting Poll timeout to %s milliseconds", jobName, pollTimeout)); } catch(Exception e) { pollTimeout = new Integer(1000); // If we don't have a new setting, use the old default From 61ddbfd817dca632afadc7ccfc6cd0f675a9e6c6 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 09:37:53 +0100 Subject: [PATCH 06/80] Bump version after config fix --- pom.xml | 2 +- uJETL.spec | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 945e0cd..c72c336 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.0.1 + 2.0.2 uJETL diff --git a/uJETL.spec b/uJETL.spec index 45a6bc0..162aa8e 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.0.1 +Version: 2.0.2 Release: 1 Group: Applications/Database License: All rights reserved. From b8883d6f233b85a4f473329380072c012a6ae448 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 13:17:22 +0100 Subject: [PATCH 07/80] Improve the docs a little --- README.md | 6 ++++++ src/test/resources/TEST_config_live.xml | 1 + 2 files changed, 7 insertions(+) diff --git a/README.md b/README.md index 3245b14..dab2586 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,9 @@ Originally written in the days of trying to do something, with no budget, I wrote this out of necessity. I subsequently got permission to open-source it so long as there were no references to the company in it. So this is the cleaned up version, with a few additional features added and the things that turned out to be pointless removed. It's probably the smallest functional ETL application with decent performance. Since I only use it on Postgres nowadays, it only officially supports Postgres at the moment. But in the near past it's worked pulling data from "several commercial databases" that don't like being named in benchmarks etc. and if you have the JDBC jars in your classpath then it should just work. + +For an example config file, please see [TEST_config_live.xml](https://github.com/rasilon/ujetl/blob/master/src/test/resources/TEST_config_live.xml) + +To run the dockerised integration tests, use `build_util/run_docker_tests` in this repo. + +A runnable docker image is available at [rasilon/ujetl](https://cloud.docker.com/repository/docker/rasilon/ujetl). This expects config files copied into, or mounted into `/var/ujetl/`. RPMs can be built using `build_util/build_rpms_in_docker`. As the name suggests, you need docker for that. diff --git a/src/test/resources/TEST_config_live.xml b/src/test/resources/TEST_config_live.xml index 90a65f5..09795f2 100644 --- a/src/test/resources/TEST_config_live.xml +++ b/src/test/resources/TEST_config_live.xml @@ -3,6 +3,7 @@ 360000 10000 1000 + 500 jdbc:postgresql://localhost:5432/test test From 6737ae6492dd87e6cf50b94c27eeb73721bfbce1 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 13:21:51 +0100 Subject: [PATCH 08/80] Add testing deps --- pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pom.xml b/pom.xml index c72c336..7887265 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma junit junit 4.12 + test + + + com.h2database + h2 + 1.4.199 + test org.apache.commons From 7a858ec64a48188fdc3aff1f022c0c01a3b11edc Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 14:33:10 +0100 Subject: [PATCH 09/80] Add redone unit tests for v2 --- pom.xml | 29 +++++++-- src/test/java/com/rasilon/ujetl/TestJob.java | 60 +++++++++++++++++++ .../java/com/rasilon/ujetl/TestParser.java | 30 ++++++++++ 3 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 src/test/java/com/rasilon/ujetl/TestJob.java create mode 100644 src/test/java/com/rasilon/ujetl/TestParser.java diff --git a/pom.xml b/pom.xml index 7887265..6083eaf 100644 --- a/pom.xml +++ b/pom.xml @@ -8,12 +8,28 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma jar 2.0.2 uJETL - + https://github.com/rasilon/ujetl + + + UTF-8 + - junit - junit - 4.12 + org.junit.jupiter + junit-jupiter-api + 5.4.2 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.4.2 + test + + + org.junit.vintage + junit-vintage-engine + 5.4.2 test @@ -37,7 +53,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma commons-configuration2 2.4 - commons-beanutils commons-beanutils @@ -95,6 +110,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma + + maven-surefire-plugin + 2.22.0 + diff --git a/src/test/java/com/rasilon/ujetl/TestJob.java b/src/test/java/com/rasilon/ujetl/TestJob.java new file mode 100644 index 0000000..7b9141e --- /dev/null +++ b/src/test/java/com/rasilon/ujetl/TestJob.java @@ -0,0 +1,60 @@ +package com.rasilon.ujetl; + +import java.sql.*; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.MethodOrderer.Alphanumeric; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +public class TestJob { + + private static String jdbcURL = "jdbc:h2:mem:dbtest"; + @Test + public void test002verifyH2Works() { + try { + Connection conn = DriverManager.getConnection(jdbcURL, "sa", ""); + conn.close(); + } catch(Exception e) { + fail(e.toString()); + } + } + + @Test + public void testJob() { + try ( + Connection src = DriverManager.getConnection(jdbcURL, "sa", ""); + Connection dest = DriverManager.getConnection(jdbcURL, "sa", ""); + + ) { + src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);"); + dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);"); + PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')"); + for(int i=0; i<10000; i++) { + inserter.setInt(1,i); + inserter.executeUpdate(); + } + + Job j = new Job( + src, + dest, + "jUnit Test Config", + "jUnit Test Job", + "SELECT -1 AS key", + "SELECT id,dat FROM src WHERE id > ?", + "INSERT INTO dest VALUES(?,?)", + 100, + 100, + 100 + ); + j.start(); + j.join(); + // do stuff + } catch(Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } +} diff --git a/src/test/java/com/rasilon/ujetl/TestParser.java b/src/test/java/com/rasilon/ujetl/TestParser.java new file mode 100644 index 0000000..13366fd --- /dev/null +++ b/src/test/java/com/rasilon/ujetl/TestParser.java @@ -0,0 +1,30 @@ +package com.rasilon.ujetl; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.MethodOrderer.Alphanumeric; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +public class TestParser { + + @Test + public void test001Parset() { + try { + String[] args = { + "--log4j", + "log4j_test_banana.xml", + "--config", + "config_test_banana.xml" + }; + CopyingAppCommandParser p = new CopyingAppCommandParser(args); + + assertEquals(p.getConfigFile(),"config_test_banana.xml"); + assertEquals(p.getLog4jConfigFile(),"log4j_test_banana.xml"); + + } catch(Exception e) { + fail(e.toString()); + } + } +} From 4fe30a196c0f9bf483871c29119da722288f1665 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 14:36:54 +0100 Subject: [PATCH 10/80] Add RPM build back --- docker/build/Dockerfile | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 docker/build/Dockerfile diff --git a/docker/build/Dockerfile b/docker/build/Dockerfile new file mode 100644 index 0000000..c780b8f --- /dev/null +++ b/docker/build/Dockerfile @@ -0,0 +1,31 @@ +# General build RPM environment for CentOS 6.x +# +# VERSION 0.0.1 + +FROM centos:centos7 +MAINTAINER Derry Hamilton + +# Install up-to-date epel rpm repository +RUN yum -y install epel-release + +# Install java first, to get a sensible one. +RUN yum -y install java-1.8.0-openjdk-devel + +# Install various packages to get compile environment +RUN yum -y groupinstall 'Development Tools' + +# Install git command to access GitHub repository +RUN yum -y install git + +# Install rpm-build to use rpmrebuild command +RUN yum -y install rpm-build + +# Install yum-utils to use yumdownloader command +RUN yum -y install yum-utils + +# Install rpmdevtools to use rpmdev-setuptree command +RUN yum -y install rpmdevtools + +# Install rpmdevtools to use rpmdev-setuptree command +RUN yum -y install maven +RUN mkdir -p /root/rpmbuild/SOURCES From ff157e6a59279d0af3ff0a6041849a19a1fc725e Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 14:46:21 +0100 Subject: [PATCH 11/80] Add TimeLimiter test. --- .../com/rasilon/ujetl/TestTimeLimiter.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 src/test/java/com/rasilon/ujetl/TestTimeLimiter.java diff --git a/src/test/java/com/rasilon/ujetl/TestTimeLimiter.java b/src/test/java/com/rasilon/ujetl/TestTimeLimiter.java new file mode 100644 index 0000000..73258eb --- /dev/null +++ b/src/test/java/com/rasilon/ujetl/TestTimeLimiter.java @@ -0,0 +1,25 @@ +package com.rasilon.ujetl; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.MethodOrderer.Alphanumeric; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +public class TestTimeLimiter { + + @Test + public void test001Limiter() { + try { + TimeLimiter hardLimit = new TimeLimiter(1,false); + hardLimit.start(); + + Thread.sleep(10000); + + fail("Sleep wasn't interrupted by the limiter!"); + } catch(Exception e) { + // Pass + } + } +} From 981b59225b60a9b77c32fe1170d95bdb624fc9de Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 15:01:04 +0100 Subject: [PATCH 12/80] Fix integration test assumptions --- build_util/run_docker_tests | 1 + docker/test_compose/docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/build_util/run_docker_tests b/build_util/run_docker_tests index e8bd891..b8317c8 100755 --- a/build_util/run_docker_tests +++ b/build_util/run_docker_tests @@ -1,5 +1,6 @@ #!/bin/bash +docker build --target tester -t rasilon/ujetl_tester docker/multistage docker-compose -f docker/test_compose/docker-compose.yml run --rm tests docker-compose -f docker/test_compose/docker-compose.yml down diff --git a/docker/test_compose/docker-compose.yml b/docker/test_compose/docker-compose.yml index 5048838..91bd3d9 100644 --- a/docker/test_compose/docker-compose.yml +++ b/docker/test_compose/docker-compose.yml @@ -12,7 +12,7 @@ services: POSTGRES_PASSWORD: password POSTGRES_DB: postgres tests: - image: rasilon/ujetl_test:latest + image: rasilon/ujetl_tester:latest build: context: ../multistage links: From 31a2968f6adcec27e0e19804e46396095e5c818d Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 18 Jun 2019 15:07:29 +0100 Subject: [PATCH 13/80] Tigten up TimeLimiter test, and add default test logging config --- .../java/com/rasilon/ujetl/TestTimeLimiter.java | 5 ++++- src/test/resources/log4j2.xml | 13 +++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 src/test/resources/log4j2.xml diff --git a/src/test/java/com/rasilon/ujetl/TestTimeLimiter.java b/src/test/java/com/rasilon/ujetl/TestTimeLimiter.java index 73258eb..db12d51 100644 --- a/src/test/java/com/rasilon/ujetl/TestTimeLimiter.java +++ b/src/test/java/com/rasilon/ujetl/TestTimeLimiter.java @@ -18,8 +18,11 @@ public class TestTimeLimiter { Thread.sleep(10000); fail("Sleep wasn't interrupted by the limiter!"); - } catch(Exception e) { + } catch(java.lang.InterruptedException e) { // Pass + } catch(Exception e) { + e.printStackTrace(); + fail("Unexpected exception."); } } } diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml new file mode 100644 index 0000000..adeb7a4 --- /dev/null +++ b/src/test/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + From 9d82c9f2792c0fc506b5763336e359954ceff0d9 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Wed, 26 Jun 2019 12:58:23 +0100 Subject: [PATCH 14/80] Add insert generator --- config_util/ujetl_insert_generator.sql | 74 ++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 config_util/ujetl_insert_generator.sql diff --git a/config_util/ujetl_insert_generator.sql b/config_util/ujetl_insert_generator.sql new file mode 100644 index 0000000..e7b74b4 --- /dev/null +++ b/config_util/ujetl_insert_generator.sql @@ -0,0 +1,74 @@ +CREATE OR REPLACE FUNCTION pg_temp.ujetl_insert(sch text, tabname text) + RETURNS text + LANGUAGE plpgsql +AS $function$ +declare + s text := ''; + header text := ''; + col_list text := ''; + vals text := ''; + sets text := ''; + changes text := ''; + is_first boolean := true; + colinfo record; + pks text; +begin + SELECT + array_to_string(array_agg(pg_attribute.attname::text ),', ') into pks + FROM + pg_index, + pg_class, + pg_attribute, + pg_namespace + WHERE + pg_class.relname = tabname AND + indrelid = pg_class.oid AND + nspname = sch AND + pg_class.relnamespace = pg_namespace.oid AND + pg_attribute.attrelid = pg_class.oid AND + pg_attribute.attnum = any(pg_index.indkey) + AND indisprimary ; + + header := 'INSERT INTO '||sch||'.'||tabname||E' as t (\n '; + for colinfo in + select + * + from + information_schema.columns + where + table_schema = 'bi_processing' + and table_name = 'player' + order by ordinal_position + loop + if not is_first then + col_list := col_list || E',\n '; + vals := vals || E',\n '; + sets := sets || E',\n '; + changes := changes || E'\n OR '; + end if; + col_list := col_list || colinfo.column_name; + vals := vals || '?::' || colinfo.data_type; + sets := sets || colinfo.column_name || E' = EXCLUDED.' || colinfo.column_name; + changes := changes || E't.' || colinfo.column_name || E' IS DISTINCT FROM EXCLUDED.' || colinfo.column_name; + + is_first = false; + end loop; + + s := header || + col_list || + E'\n)VALUES(\n ' || + vals || + E')\nON CONFLICT(' || pks || E') DO UPDATE\nSET\n ' || + sets || + E'\nWHERE\n '|| + changes; + return s; +end; +$function$ +; + + + + + + From 97baf0d11aa6816ed74c0c937806d0a47a82eec9 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Wed, 26 Jun 2019 16:31:24 +0100 Subject: [PATCH 15/80] Improve robustness of insert generator --- config_util/ujetl_insert_generator.sql | 31 ++++++++++++++------------ 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/config_util/ujetl_insert_generator.sql b/config_util/ujetl_insert_generator.sql index e7b74b4..20cb2eb 100644 --- a/config_util/ujetl_insert_generator.sql +++ b/config_util/ujetl_insert_generator.sql @@ -14,7 +14,7 @@ declare pks text; begin SELECT - array_to_string(array_agg(pg_attribute.attname::text ),', ') into pks + array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks FROM pg_index, pg_class, @@ -29,39 +29,42 @@ begin pg_attribute.attnum = any(pg_index.indkey) AND indisprimary ; - header := 'INSERT INTO '||sch||'.'||tabname||E' as t (\n '; + header := 'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n '; for colinfo in select * from information_schema.columns where - table_schema = 'bi_processing' - and table_name = 'player' + table_schema = sch + and table_name = tabname order by ordinal_position loop + raise info 'Working on %.% (%)',sch,tabname,colinfo::text; if not is_first then col_list := col_list || E',\n '; vals := vals || E',\n '; sets := sets || E',\n '; changes := changes || E'\n OR '; end if; - col_list := col_list || colinfo.column_name; - vals := vals || '?::' || colinfo.data_type; - sets := sets || colinfo.column_name || E' = EXCLUDED.' || colinfo.column_name; - changes := changes || E't.' || colinfo.column_name || E' IS DISTINCT FROM EXCLUDED.' || colinfo.column_name; + col_list := col_list || quote_ident(colinfo.column_name); + vals := vals || '?::' || quote_ident(colinfo.data_type); + sets := sets || quote_ident(colinfo.column_name) || + E' = EXCLUDED.' || quote_ident(colinfo.column_name); + changes := changes || E't.' || quote_ident(colinfo.column_name) || + E' IS DISTINCT FROM EXCLUDED.' || quote_ident(colinfo.column_name); is_first = false; end loop; - s := header || - col_list || + s := coalesce(header,'header failed') || + coalesce(col_list,'col_list failed') || E'\n)VALUES(\n ' || - vals || - E')\nON CONFLICT(' || pks || E') DO UPDATE\nSET\n ' || - sets || + coalesce(vals,'vals failed') || + E')\nON CONFLICT(' || coalesce(pks,'No primary keys found') || E') DO UPDATE\nSET\n ' || + coalesce(sets,'sets failed') || E'\nWHERE\n '|| - changes; + coalesce(changes,'changes failed'); return s; end; $function$ From 2b5f4f1dbeb3d9d48ce1c59d0702e72bc69f178f Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Thu, 11 Jul 2019 09:13:15 +0100 Subject: [PATCH 16/80] Add more test data, and fix log ordering --- docker/multistage/TEST_config_live.xml | 21 ++++++++++++++++++-- docker/test_db/setup.sql | 25 ++++++++++++++++++++++++ src/main/java/com/rasilon/ujetl/Job.java | 4 ++-- src/test/resources/TEST_config_live.xml | 16 +++++++++++++++ 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml index 97f0500..2c70ff7 100644 --- a/docker/multistage/TEST_config_live.xml +++ b/docker/multistage/TEST_config_live.xml @@ -3,14 +3,15 @@ 360000 10000 1000 + 500 - jdbc:postgresql://testdb:5432/test + jdbc:postgresql://localhost:5432/test test test 600000 - jdbc:postgresql://testdb:5432/test + jdbc:postgresql://localhost:5432/test test test @@ -48,5 +49,21 @@ OR dest.test_ts = EXCLUDED.test_ts + + denormalise + select -1 as key + + + INSERT INTO denormalised_personalia(person_id,fname,lname) + values(?::integer,?::text,?::text) + ON CONFLICT (person_id) DO UPDATE + SET + fname = EXCLUDED.fname, + lname = EXCLUDED.lname + WHERE + denormalised_personalia.fname is distinct from EXCLUDED.fname + OR denormalised_personalia.lname is distinct from EXCLUDED.lname + + diff --git a/docker/test_db/setup.sql b/docker/test_db/setup.sql index d476bde..782f213 100644 --- a/docker/test_db/setup.sql +++ b/docker/test_db/setup.sql @@ -20,6 +20,31 @@ GRANT SELECT,INSERT,UPDATE,DELETE ON dest TO test; INSERT INTO source(test_int,test_text,test_ts) SELECT 1,'banana',now() FROM generate_series(1,100000); +CREATE TABLE normalised_first_names( + fid smallserial not null primary key, + fname text not null unique +); +CREATE TABLE normalised_last_names( + lid smallserial not null primary key, + lname text not null unique +); +INSERT INTO normalised_first_names (fname) values ('Abigail'), ('Adam'), ('Beatrice'), ('Bruce'), ('Claire'), ('Clive'), ('Deborah'), ('Dave'); +INSERT INTO normalised_last_names (lname) values ('Adams'), ('Bellamy'), ('Clark'), ('Dabrowski'); + +CREATE TABLE normalised_personalia ( + person_id serial not null primary key, + fid smallint not null references normalised_first_names(fid), + lid smallint not null references normalised_last_names(lid) +); +insert into normalised_personalia(fid,lid) values (1,1), (1,2), (1,3), (1,4), (2,1), (2,2), (2,3), (2,4), (3,1), (3,2), (3,3), (3,4), (4,1), (4,2), (4,3), (4,4); + +CREATE TABLE denormalised_personalia( + person_id integer not null primary key, + fname text, + lname text +); + + \c postgres CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a); GRANT SELECT ON public.container_ready TO TEST; diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index a083f56..5afb96f 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -154,10 +154,10 @@ public class Job extends Thread { if(rowNum % nRowsToLog == 0) { rowsInserted += arraySum(insertStatement.executeBatch()); dConn.commit(); - log.info(String.format("%s - Inserted %s of %s notified rows into %s so far", + log.info(String.format("%s - Inserted %s of %s notified rows into %s", jobName, - rowNum, rowsInserted, + rowNum, name)); } } diff --git a/src/test/resources/TEST_config_live.xml b/src/test/resources/TEST_config_live.xml index 09795f2..2c70ff7 100644 --- a/src/test/resources/TEST_config_live.xml +++ b/src/test/resources/TEST_config_live.xml @@ -49,5 +49,21 @@ OR dest.test_ts = EXCLUDED.test_ts + + denormalise + select -1 as key + + + INSERT INTO denormalised_personalia(person_id,fname,lname) + values(?::integer,?::text,?::text) + ON CONFLICT (person_id) DO UPDATE + SET + fname = EXCLUDED.fname, + lname = EXCLUDED.lname + WHERE + denormalised_personalia.fname is distinct from EXCLUDED.fname + OR denormalised_personalia.lname is distinct from EXCLUDED.lname + + From cde891ee167e916317dd8a1c57c0d063a81cd174 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Thu, 11 Jul 2019 13:22:06 +0100 Subject: [PATCH 17/80] Version bump with logging fix --- pom.xml | 2 +- uJETL.spec | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 6083eaf..ffc64ce 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.0.2 + 2.0.3 uJETL https://github.com/rasilon/ujetl diff --git a/uJETL.spec b/uJETL.spec index 162aa8e..4eb4838 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.0.2 +Version: 2.0.3 Release: 1 Group: Applications/Database License: All rights reserved. From d34a0902d14031ca6cbac3b3185988f394c7ea68 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Fri, 12 Jul 2019 10:33:18 +0100 Subject: [PATCH 18/80] Fix logging message order --- pom.xml | 2 +- src/main/java/com/rasilon/ujetl/Job.java | 2 +- uJETL.spec | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index ffc64ce..5c2df24 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.0.3 + 2.0.4 uJETL https://github.com/rasilon/ujetl diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index 5afb96f..7a0b273 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -133,7 +133,7 @@ public class Job extends Thread { if(row == null && producerLive.get() == false) { rowsInserted += arraySum(insertStatement.executeBatch()); dConn.commit(); - log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name)); + log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowNum,name)); return; } if(threadsExit.get()) { diff --git a/uJETL.spec b/uJETL.spec index 4eb4838..280a886 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.0.3 +Version: 2.0.4 Release: 1 Group: Applications/Database License: All rights reserved. From e71832f57ae862ee757ddc587855946ad642a1c7 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Fri, 12 Jul 2019 13:03:08 +0100 Subject: [PATCH 19/80] Refactor variable name --- src/main/java/com/rasilon/ujetl/Job.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index 7a0b273..0f226a7 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -77,7 +77,7 @@ public class Job extends Thread { public void run() { try { long rowsInserted = 0; - long rowNum = 0; + long rowsAttempted = 0; long stamp = System.nanoTime(); long nstamp; int columnCount = src.getMetaData().getColumnCount(); @@ -95,13 +95,13 @@ public class Job extends Thread { } log.trace("Producer queue full."); } - rowNum++; - if(rowNum % nRowsToLog == 0) { - log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name)); + rowsAttempted++; + if(rowsAttempted % nRowsToLog == 0) { + log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowsAttempted,name)); } } producerLive.set(false); - log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name)); + log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowsAttempted,name)); } catch(Exception e) { producerLive.set(false); // Signal we've exited. threadsExit.set(true); // Signal we've exited. @@ -122,7 +122,7 @@ public class Job extends Thread { } public void run() { try { - long rowNum = 0; + long rowsAttempted = 0; long rowsInserted = 0; while(true) { @@ -133,7 +133,7 @@ public class Job extends Thread { if(row == null && producerLive.get() == false) { rowsInserted += arraySum(insertStatement.executeBatch()); dConn.commit(); - log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowNum,name)); + log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowsAttempted,name)); return; } if(threadsExit.get()) { @@ -150,14 +150,14 @@ public class Job extends Thread { } insertStatement.addBatch(); - rowNum++; - if(rowNum % nRowsToLog == 0) { + rowsAttempted++; + if(rowsAttempted % nRowsToLog == 0) { rowsInserted += arraySum(insertStatement.executeBatch()); dConn.commit(); log.info(String.format("%s - Inserted %s of %s notified rows into %s", jobName, rowsInserted, - rowNum, + rowsAttempted, name)); } } From f669b1af9dc6fdc5475e214107a6ab612f0e5665 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Fri, 12 Jul 2019 13:03:23 +0100 Subject: [PATCH 20/80] Fix up the docker tests --- docker/multistage/TEST_config_live.xml | 39 +++++++++++++++++++++++-- docker/multistage/wait_for_postgres | 9 ++++++ docker/test_db/setup.sql | 3 ++ src/test/resources/TEST_config_live.xml | 33 +++++++++++++++++++++ 4 files changed, 81 insertions(+), 3 deletions(-) diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml index 2c70ff7..0d6fbc5 100644 --- a/docker/multistage/TEST_config_live.xml +++ b/docker/multistage/TEST_config_live.xml @@ -5,13 +5,13 @@ 1000 500 - jdbc:postgresql://localhost:5432/test + jdbc:postgresql://testdb:5432/test test test 600000 - jdbc:postgresql://localhost:5432/test + jdbc:postgresql://testdb:5432/test test test @@ -49,10 +49,43 @@ OR dest.test_ts = EXCLUDED.test_ts + + test upsert + select -1 as key + + + insert into public.dest( + id, + test_int, + test_text, + test_ts + )values( + ?::bigint, + ?::integer, + ?::text, + ?::timestamp with time zone + )ON CONFLICT(id) DO UPDATE + set + test_int = EXCLUDED.test_int, + test_text = EXCLUDED.test_text, + test_ts = EXCLUDED.test_ts + WHERE + dest.test_int IS DISTINCT FROM EXCLUDED.test_int + OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text + OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts + + denormalise select -1 as key - + INSERT INTO denormalised_personalia(person_id,fname,lname) values(?::integer,?::text,?::text) diff --git a/docker/multistage/wait_for_postgres b/docker/multistage/wait_for_postgres index d3dee95..9676149 100755 --- a/docker/multistage/wait_for_postgres +++ b/docker/multistage/wait_for_postgres @@ -8,5 +8,14 @@ until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.contai sleep 1 done +>&2 echo "Postgres is up - Waiting for the reboot" +sleep 3 # Wait for the Postgres reboot at the end of setup + +until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do + >&2 echo "Postgres is unavailable - sleeping" + sleep 1 +done + + >&2 echo "Postgres is up - executing command" exec $cmd diff --git a/docker/test_db/setup.sql b/docker/test_db/setup.sql index 782f213..ff4a1b7 100644 --- a/docker/test_db/setup.sql +++ b/docker/test_db/setup.sql @@ -44,6 +44,9 @@ CREATE TABLE denormalised_personalia( lname text ); +GRANT SELECT ON ALL TABLES IN SCHEMA public TO test; +GRANT SELECT,INSERT,UPDATE ON denormalised_personalia TO test; + \c postgres CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a); diff --git a/src/test/resources/TEST_config_live.xml b/src/test/resources/TEST_config_live.xml index 2c70ff7..b4630a3 100644 --- a/src/test/resources/TEST_config_live.xml +++ b/src/test/resources/TEST_config_live.xml @@ -49,6 +49,39 @@ OR dest.test_ts = EXCLUDED.test_ts + + test upsert + select -1 as key + + + insert into public.dest( + id, + test_int, + test_text, + test_ts + )values( + ?::bigint, + ?::integer, + ?::text, + ?::timestamp with time zone + )ON CONFLICT(id) DO UPDATE + set + test_int = EXCLUDED.test_int, + test_text = EXCLUDED.test_text, + test_ts = EXCLUDED.test_ts + WHERE + dest.test_int IS DISTINCT FROM EXCLUDED.test_int + OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text + OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts + + denormalise select -1 as key From 633fbe7391502ef44487f110c8163deb662193ff Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Fri, 12 Jul 2019 13:04:55 +0100 Subject: [PATCH 21/80] Version bump --- pom.xml | 2 +- uJETL.spec | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 5c2df24..996b9da 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.0.4 + 2.0.5 uJETL https://github.com/rasilon/ujetl diff --git a/uJETL.spec b/uJETL.spec index 280a886..0d57460 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.0.4 +Version: 2.0.5 Release: 1 Group: Applications/Database License: All rights reserved. From ddc67f3a4148b2fb2f0c64df3277e24c7c0cf04b Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Fri, 12 Jul 2019 13:51:30 +0100 Subject: [PATCH 22/80] Bugfix type quoting and add select generator --- config_util/ujetl_insert_generator.sql | 33 +++++++------ config_util/ujetl_select_generator.sql | 65 ++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 17 deletions(-) create mode 100644 config_util/ujetl_select_generator.sql diff --git a/config_util/ujetl_insert_generator.sql b/config_util/ujetl_insert_generator.sql index 20cb2eb..a7375f6 100644 --- a/config_util/ujetl_insert_generator.sql +++ b/config_util/ujetl_insert_generator.sql @@ -14,22 +14,22 @@ declare pks text; begin SELECT - array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks - FROM - pg_index, - pg_class, - pg_attribute, - pg_namespace - WHERE - pg_class.relname = tabname AND - indrelid = pg_class.oid AND - nspname = sch AND - pg_class.relnamespace = pg_namespace.oid AND - pg_attribute.attrelid = pg_class.oid AND - pg_attribute.attnum = any(pg_index.indkey) - AND indisprimary ; + array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks + FROM + pg_index, + pg_class, + pg_attribute, + pg_namespace + WHERE + pg_class.relname = tabname + AND indrelid = pg_class.oid + AND nspname = sch + AND pg_class.relnamespace = pg_namespace.oid + AND pg_attribute.attrelid = pg_class.oid + AND pg_attribute.attnum = any(pg_index.indkey) + AND indisprimary ; - header := 'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n '; + header := E'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n '; for colinfo in select * @@ -40,7 +40,6 @@ begin and table_name = tabname order by ordinal_position loop - raise info 'Working on %.% (%)',sch,tabname,colinfo::text; if not is_first then col_list := col_list || E',\n '; vals := vals || E',\n '; @@ -48,7 +47,7 @@ begin changes := changes || E'\n OR '; end if; col_list := col_list || quote_ident(colinfo.column_name); - vals := vals || '?::' || quote_ident(colinfo.data_type); + vals := vals || '?::' || colinfo.data_type; sets := sets || quote_ident(colinfo.column_name) || E' = EXCLUDED.' || quote_ident(colinfo.column_name); changes := changes || E't.' || quote_ident(colinfo.column_name) || diff --git a/config_util/ujetl_select_generator.sql b/config_util/ujetl_select_generator.sql new file mode 100644 index 0000000..5c475af --- /dev/null +++ b/config_util/ujetl_select_generator.sql @@ -0,0 +1,65 @@ +CREATE OR REPLACE FUNCTION pg_temp.ujetl_select(sch text, tabname text) + RETURNS text + LANGUAGE plpgsql +AS $function$ +declare + s text := ''; + header text := ''; + col_list text := ''; + vals text := ''; + sets text := ''; + changes text := ''; + is_first boolean := true; + colinfo record; + pks text; +begin + SELECT + array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks + FROM + pg_index, + pg_class, + pg_attribute, + pg_namespace + WHERE + pg_class.relname = tabname + AND indrelid = pg_class.oid + AND nspname = sch + AND pg_class.relnamespace = pg_namespace.oid + AND pg_attribute.attrelid = pg_class.oid + AND pg_attribute.attnum = any(pg_index.indkey) + AND indisprimary ; + + header := E'SELECT\n '; + for colinfo in + select + * + from + information_schema.columns + where + table_schema = sch + and table_name = tabname + order by ordinal_position + loop + if not is_first then + col_list := col_list || E',\n '; + end if; + col_list := col_list || quote_ident(colinfo.column_name); + + is_first = false; + end loop; + + s := header || + coalesce(col_list,'col_list failed') || + E'\nFROM\n ' || + quote_ident(sch)||'.'||quote_ident(tabname)||E' as t \n '|| + E'WHERE\n insert criteria here >= ?::datatype'; + return s; +end; +$function$ +; + + + + + + From 8fdbc6a78ec304865234d0762c1ca6326e84e482 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 11:10:12 +0100 Subject: [PATCH 23/80] Add pre and post SQL --- .../java/com/rasilon/ujetl/CopyingApp.java | 9 ++- src/main/java/com/rasilon/ujetl/Job.java | 25 +++++++- src/test/java/com/rasilon/ujetl/TestJob.java | 2 + .../java/com/rasilon/ujetl/TestPrePost.java | 62 +++++++++++++++++++ 4 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 src/test/java/com/rasilon/ujetl/TestPrePost.java diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java index 803e741..d2465fb 100644 --- a/src/main/java/com/rasilon/ujetl/CopyingApp.java +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -131,7 +131,10 @@ public class CopyingApp { String tabKey = config.getString("jobs.job("+i+").key"); String tabSelect = config.getString("jobs.job("+i+").select"); String tabInsert = config.getString("jobs.job("+i+").insert"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); + String preTarget = config.getString("jobs.job("+i+").preTarget"); + String postTarget = config.getString("jobs.job("+i+").postTarget"); + + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); j.start(); j.join(); @@ -141,7 +144,9 @@ public class CopyingApp { String tabKey = config.getString("jobs.job.key"); String tabSelect = config.getString("jobs.job.select"); String tabInsert = config.getString("jobs.job.insert"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); + String preTarget = config.getString("jobs.job.preTarget"); + String postTarget = config.getString("jobs.job.postTarget"); + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); j.start(); j.join(); } else { diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index 0f226a7..f5badf2 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -29,6 +29,8 @@ public class Job extends Thread { String key; String select; String insert; + String preTarget; + String postTarget; Integer nRowsToLog; Integer blockSize; Integer pollTimeout; @@ -38,7 +40,7 @@ public class Job extends Thread { AtomicBoolean threadsExit = new AtomicBoolean(false);; - public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { + public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { this.sConn = sConn; this.dConn = dConn; this.name = name; @@ -46,6 +48,8 @@ public class Job extends Thread { this.key = key; this.select = select; this.insert = insert; + this.preTarget = preTarget; + this.postTarget = postTarget; this.nRowsToLog = nRowsToLog; this.blockSize = blockSize; this.pollTimeout = pollTimeout; @@ -169,11 +173,20 @@ public class Job extends Thread { } } + // Outer run public void run() { try { ResultSet rs; log.info(String.format("%s - Processing table: %s",jobName,name)); + if(preTarget != null){ + log.debug("Trying to execute preTarget SQL"); + PreparedStatement s = dConn.prepareStatement(preTarget); + s.executeUpdate(); + s.close(); + }else{ + log.debug("No preTarget; skipping."); + } log.debug("Trying to execute: "+key); PreparedStatement keyStatement = dConn.prepareStatement(key); @@ -211,6 +224,16 @@ public class Job extends Thread { p.join(); c.join(); + if(postTarget != null){ + log.debug("Trying to execute postTarget SQL"); + PreparedStatement s = dConn.prepareStatement(postTarget); + s.executeUpdate(); + s.close(); + }else{ + log.debug("No postTarget; skipping."); + } + + } catch(InterruptedException e) { throw new RuntimeException(e); } catch(SQLException e) { diff --git a/src/test/java/com/rasilon/ujetl/TestJob.java b/src/test/java/com/rasilon/ujetl/TestJob.java index 7b9141e..98bf01c 100644 --- a/src/test/java/com/rasilon/ujetl/TestJob.java +++ b/src/test/java/com/rasilon/ujetl/TestJob.java @@ -45,6 +45,8 @@ public class TestJob { "SELECT -1 AS key", "SELECT id,dat FROM src WHERE id > ?", "INSERT INTO dest VALUES(?,?)", + null, + null, 100, 100, 100 diff --git a/src/test/java/com/rasilon/ujetl/TestPrePost.java b/src/test/java/com/rasilon/ujetl/TestPrePost.java new file mode 100644 index 0000000..7392a28 --- /dev/null +++ b/src/test/java/com/rasilon/ujetl/TestPrePost.java @@ -0,0 +1,62 @@ +package com.rasilon.ujetl; + +import java.sql.*; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.MethodOrderer.Alphanumeric; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +public class TestPrePost { + + private static String jdbcURL = "jdbc:h2:mem:dbtest"; + @Test + public void test002verifyH2Works() { + try { + Connection conn = DriverManager.getConnection(jdbcURL, "sa", ""); + conn.close(); + } catch(Exception e) { + fail(e.toString()); + } + } + + @Test + public void testPrePost() { + try ( + Connection src = DriverManager.getConnection(jdbcURL, "sa", ""); + Connection dest = DriverManager.getConnection(jdbcURL, "sa", ""); + + ) { + src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);"); + dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);"); + PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')"); + for(int i=0; i<10000; i++) { + inserter.setInt(1,i); + inserter.executeUpdate(); + } + + Job j = new Job( + src, + dest, + "jUnit Test Config", + "jUnit Test Job", + "SELECT -1 AS key", + "SELECT id,dat FROM src WHERE id > ?", + "INSERT INTO tmp_dest VALUES(?,?)", + "CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);", + "INSERT INTO dest SELECT * from tmp_dest;", + 100, + 100, + 100 + ); + j.start(); + j.join(); + // do stuff + } catch(Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } +} From 4f905dd47adc6514471e59da40412fc0bd3b0c34 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 11:32:24 +0100 Subject: [PATCH 24/80] Add test config --- docker/multistage/TEST_config_live.xml | 54 ++++++++++++++++++++++++ src/main/java/com/rasilon/ujetl/Job.java | 8 ++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml index 0d6fbc5..290d35d 100644 --- a/docker/multistage/TEST_config_live.xml +++ b/docker/multistage/TEST_config_live.xml @@ -98,5 +98,59 @@ OR denormalised_personalia.lname is distinct from EXCLUDED.lname + + test pre post + select -1 as key + + + drop table if exists tmp_dest; + create temp table tmp_dest( + id bigint, + test_int integer, + test_text text, + test_ts timestamp with time zone + ); + + + insert into tmp_dest( + id, + test_int, + test_text, + test_ts + )values( + ?::bigint, + ?::integer, + ?::text, + ?::timestamp with time zone + ) + + + insert into public.dest( + id, + test_int, + test_text, + test_ts + ) + select id,test_int,test_text,test_ts + from tmp_dest + ON CONFLICT(id) DO UPDATE + set + test_int = EXCLUDED.test_int, + test_text = EXCLUDED.test_text, + test_ts = EXCLUDED.test_ts + WHERE + dest.test_int IS DISTINCT FROM EXCLUDED.test_int + OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text + OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts + + diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index f5badf2..cc66650 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -180,12 +180,12 @@ public class Job extends Thread { log.info(String.format("%s - Processing table: %s",jobName,name)); if(preTarget != null){ - log.debug("Trying to execute preTarget SQL"); + log.info("Trying to execute preTarget SQL"); PreparedStatement s = dConn.prepareStatement(preTarget); s.executeUpdate(); s.close(); }else{ - log.debug("No preTarget; skipping."); + log.info("No preTarget; skipping."); } log.debug("Trying to execute: "+key); @@ -225,12 +225,12 @@ public class Job extends Thread { c.join(); if(postTarget != null){ - log.debug("Trying to execute postTarget SQL"); + log.info("Trying to execute postTarget SQL"); PreparedStatement s = dConn.prepareStatement(postTarget); s.executeUpdate(); s.close(); }else{ - log.debug("No postTarget; skipping."); + log.info("No postTarget; skipping."); } From 4f0db0a2df46254af5316eedd4fee0ecfb884117 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 11:35:22 +0100 Subject: [PATCH 25/80] Version bump --- pom.xml | 2 +- uJETL.spec | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 996b9da..3b5bafd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.0.5 + 2.1.5 uJETL https://github.com/rasilon/ujetl diff --git a/uJETL.spec b/uJETL.spec index 0d57460..7f88252 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.0.5 +Version: 2.1.5 Release: 1 Group: Applications/Database License: All rights reserved. From 06c64d499fd833e79cf8406c14df49a2c326d88f Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 16:00:43 +0100 Subject: [PATCH 26/80] Explicitly commit pre and post SQL so that a post command in the last job isn't lost. --- pom.xml | 2 +- src/main/java/com/rasilon/ujetl/Job.java | 2 ++ uJETL.spec | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3b5bafd..f080f72 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.1.5 + 2.1.6 uJETL https://github.com/rasilon/ujetl diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index cc66650..ca855af 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -184,6 +184,7 @@ public class Job extends Thread { PreparedStatement s = dConn.prepareStatement(preTarget); s.executeUpdate(); s.close(); + dConn.commit(); }else{ log.info("No preTarget; skipping."); } @@ -229,6 +230,7 @@ public class Job extends Thread { PreparedStatement s = dConn.prepareStatement(postTarget); s.executeUpdate(); s.close(); + dConn.commit(); }else{ log.info("No postTarget; skipping."); } diff --git a/uJETL.spec b/uJETL.spec index 7f88252..d1f3baf 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.1.5 +Version: 2.1.6 Release: 1 Group: Applications/Database License: All rights reserved. From 01310bfca472c297e04f6a16471d0c70da5d50c0 Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Mon, 5 Aug 2019 16:38:24 +0100 Subject: [PATCH 27/80] Fix producer thread name, for logging and monitoring. --- pom.xml | 2 +- src/main/java/com/rasilon/ujetl/Job.java | 2 +- uJETL.spec | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index f080f72..838e43b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma com.rasilon.ujetl CopyingApp jar - 2.1.6 + 2.1.7 uJETL https://github.com/rasilon/ujetl diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index ca855af..c0eb8e5 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -76,7 +76,7 @@ public class Job extends Thread { public Producer(ResultSet src,BlockingQueue q) { this.src = src; this.q = q; - this.setName(String.format("%s-%s-Consumer",jobName,name)); + this.setName(String.format("%s-%s-Producer",jobName,name)); } public void run() { try { diff --git a/uJETL.spec b/uJETL.spec index d1f3baf..4a5b7bf 100644 --- a/uJETL.spec +++ b/uJETL.spec @@ -1,6 +1,6 @@ Summary: Java app to facilitate moving data between databases. Name: uJETL -Version: 2.1.6 +Version: 2.1.7 Release: 1 Group: Applications/Database License: All rights reserved. From 734dc8608fa1c52ea6691195c5f1e0c8b60b889f Mon Sep 17 00:00:00 2001 From: Derry Hamilton Date: Tue, 27 Aug 2019 12:53:37 +0100 Subject: [PATCH 28/80] Add session identification --- docker/multistage/TEST_config_live.xml | 6 ++ .../java/com/rasilon/ujetl/CopyingApp.java | 40 +++++++++++- src/main/java/com/rasilon/ujetl/Job.java | 65 ++++++++++++++----- src/test/java/com/rasilon/ujetl/TestJob.java | 4 +- .../java/com/rasilon/ujetl/TestPrePost.java | 4 +- src/test/resources/TEST_config_live.xml | 2 + 6 files changed, 100 insertions(+), 21 deletions(-) diff --git a/docker/multistage/TEST_config_live.xml b/docker/multistage/TEST_config_live.xml index 290d35d..fa38eec 100644 --- a/docker/multistage/TEST_config_live.xml +++ b/docker/multistage/TEST_config_live.xml @@ -18,6 +18,8 @@ test + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select coalesce(-1,max(id),-1) as key from dest select @@ -84,6 +88,8 @@ denormalise + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select -1 as key diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java index d2465fb..e1e47a5 100644 --- a/src/main/java/com/rasilon/ujetl/CopyingApp.java +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -133,8 +133,25 @@ public class CopyingApp { String tabInsert = config.getString("jobs.job("+i+").insert"); String preTarget = config.getString("jobs.job("+i+").preTarget"); String postTarget = config.getString("jobs.job("+i+").postTarget"); + String identifySourceSQL = config.getString("jobs.job.identifySourceSQL"); + String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); + Job j = new Job( + sConn, + dConn, + tabName, + jobName, + tabKey, + tabSelect, + tabInsert, + preTarget, + postTarget, + nRowsToLog, + blockSize, + pollTimeout, + identifySourceSQL, + identifyDestinationSQL + ); j.start(); j.join(); @@ -146,7 +163,26 @@ public class CopyingApp { String tabInsert = config.getString("jobs.job.insert"); String preTarget = config.getString("jobs.job.preTarget"); String postTarget = config.getString("jobs.job.postTarget"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout); + String identifySourceSQL = config.getString("jobs.job.identifySourceSQL"); + String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL"); + + + Job j = new Job( + sConn, + dConn, + tabName, + jobName, + tabKey, + tabSelect, + tabInsert, + preTarget, + postTarget, + nRowsToLog, + blockSize, + pollTimeout, + identifySourceSQL, + identifyDestinationSQL + ); j.start(); j.join(); } else { diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index c0eb8e5..67f82a3 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -22,25 +22,29 @@ import org.apache.logging.log4j.Logger; public class Job extends Thread { static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class); - Connection sConn; - Connection dConn; - String name; - String jobName; - String key; - String select; - String insert; - String preTarget; - String postTarget; - Integer nRowsToLog; - Integer blockSize; - Integer pollTimeout; + private Connection sConn; + private Connection dConn; + private String name; + private String jobName; + private String key; + private String select; + private String insert; + private String preTarget; + private String postTarget; + private Integer nRowsToLog; + private Integer blockSize; + private Integer pollTimeout; + private String identifySourceSQL; + private String identifyDestinationSQL; - BlockingQueue> resultBuffer; - AtomicBoolean producerLive; - AtomicBoolean threadsExit = new AtomicBoolean(false);; + private BlockingQueue> resultBuffer; + private AtomicBoolean producerLive; + private AtomicBoolean threadsExit = new AtomicBoolean(false);; + private String sourceID; + private String destID; - public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { + public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) { this.sConn = sConn; this.dConn = dConn; this.name = name; @@ -53,6 +57,8 @@ public class Job extends Thread { this.nRowsToLog = nRowsToLog; this.blockSize = blockSize; this.pollTimeout = pollTimeout; + this.identifySourceSQL = identifySourceSQL; + this.identifyDestinationSQL = identifyDestinationSQL; resultBuffer = new ArrayBlockingQueue>( 3 * blockSize); producerLive = new AtomicBoolean(true); @@ -178,7 +184,20 @@ public class Job extends Thread { try { ResultSet rs; - log.info(String.format("%s - Processing table: %s",jobName,name)); + if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn); + if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn); + + if(sourceID != null || destID != null){ + log.info(String.format( + "%s - Processing table: %s with source: %s, dest: %s", + jobName, + name, + sourceID==null?"":sourceID, + destID==null?"":destID + )); + }else{ + log.info(String.format("%s - Processing table: %s",jobName,name)); + } if(preTarget != null){ log.info("Trying to execute preTarget SQL"); PreparedStatement s = dConn.prepareStatement(preTarget); @@ -242,4 +261,16 @@ public class Job extends Thread { throw new RuntimeException(e); } } + + private String getSingleString(String sql, Connection conn){ + try{ + PreparedStatement s = conn.prepareStatement(sql); + ResultSet r = s.executeQuery(); + r.next(); + return r.getString(1); + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + } diff --git a/src/test/java/com/rasilon/ujetl/TestJob.java b/src/test/java/com/rasilon/ujetl/TestJob.java index 98bf01c..853aeb4 100644 --- a/src/test/java/com/rasilon/ujetl/TestJob.java +++ b/src/test/java/com/rasilon/ujetl/TestJob.java @@ -49,7 +49,9 @@ public class TestJob { null, 100, 100, - 100 + 100, + "select 'PID:'||session_id()", + "select 'PID:'||session_id()" ); j.start(); j.join(); diff --git a/src/test/java/com/rasilon/ujetl/TestPrePost.java b/src/test/java/com/rasilon/ujetl/TestPrePost.java index 7392a28..77ff2a6 100644 --- a/src/test/java/com/rasilon/ujetl/TestPrePost.java +++ b/src/test/java/com/rasilon/ujetl/TestPrePost.java @@ -49,7 +49,9 @@ public class TestPrePost { "INSERT INTO dest SELECT * from tmp_dest;", 100, 100, - 100 + 100, + "select 'PID:'||session_id()", + "select 'PID:'||session_id()" ); j.start(); j.join(); diff --git a/src/test/resources/TEST_config_live.xml b/src/test/resources/TEST_config_live.xml index b4630a3..338faff 100644 --- a/src/test/resources/TEST_config_live.xml +++ b/src/test/resources/TEST_config_live.xml @@ -18,6 +18,8 @@ test + select 'PID:'||pg_backend_pid() + select 'PID:'||pg_backend_pid() select coalesce(-1,max(id),-1) as key from dest