Compare commits

..

No commits in common. "main" and "v2.0.2" have entirely different histories.
main ... v2.0.2

29 changed files with 127 additions and 844 deletions

View file

@ -2,9 +2,3 @@
Originally written in the days of trying to do something, with no budget, I wrote this out of necessity. I subsequently got permission to open-source it so long as there were no references to the company in it. So this is the cleaned up version, with a few additional features added and the things that turned out to be pointless removed.
It's probably the smallest functional ETL application with decent performance. Since I only use it on Postgres nowadays, it only officially supports Postgres at the moment. But in the near past it's worked pulling data from "several commercial databases" that don't like being named in benchmarks etc. and if you have the JDBC jars in your classpath then it should just work.
For an example config file, please see [TEST_config_live.xml](https://github.com/rasilon/ujetl/blob/master/src/test/resources/TEST_config_live.xml)
To run the dockerised integration tests, use `build_util/run_docker_tests` in this repo.
A runnable docker image is available at [rasilon/ujetl](https://cloud.docker.com/repository/docker/rasilon/ujetl). This expects config files copied into, or mounted into `/var/ujetl/`. RPMs can be built using `build_util/build_rpms_in_docker`. As the name suggests, you need docker for that.

12
build_util/build_rpm Executable file
View file

@ -0,0 +1,12 @@
#!/bin/bash
# Build the uJETL RPM inside the build container.
# Expects the project checkout mounted at /root/build and an rpmbuild
# tree (rpmdev-setuptree layout) under $HOME/rpmbuild.
set -e
cd /root
# Work on a copy so tar/rpmbuild never dirty the mounted checkout.
cp -Rv build build2
cd build2
# Exactly one .spec file is expected in the project root.
SPEC=$(ls *.spec)
# The Version: line of the spec drives the tarball name.
VER=$(grep Version "$SPEC" | awk '{print $2}')
# 'z' actually gzip-compresses the archive (the previous command wrote a
# plain tar under a .tar.gz name); --show-transformed-names is the full
# spelling of the previously abbreviated --show-transformed.
tar czvf "$HOME/rpmbuild/SOURCES/uJETL-${VER}.tar.gz" --show-transformed-names --transform="s/^\./uJETL-${VER}/" .
rpmbuild -ba "$SPEC"
# Publish the built RPMs back into the mounted checkout.
cp /root/rpmbuild/RPMS/x86_64/* /root/build/

View file

@ -0,0 +1,6 @@
#!/bin/bash
# Build the CentOS 7 build-host image, then run the RPM build inside it
# with the current checkout mounted at /root/build.
set -e
docker build --rm -t local/c7-buildhost docker/build
# "$(pwd)" is quoted so a checkout path containing spaces still mounts
# correctly; $( ) replaces the legacy backtick substitution.
docker run -it --rm -v "$(pwd)":/root/build local/c7-buildhost /root/build/build_util/build_rpm

View file

@ -1,3 +1,2 @@
#!/bin/bash #!/bin/bash
docker build --target deploy -t rasilon/ujetl docker/multistage docker build --target deploy -t rasilon/ujetl docker/multistage
docker tag rasilon/ujetl:latest rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)

View file

@ -1,4 +0,0 @@
#!/bin/bash
# Push the locally built image, both as :latest and under the release
# version taken from pom.xml.
# set -e (matching the sibling build scripts in build_util) stops the
# versioned push if the :latest push fails.
set -e
docker push rasilon/ujetl:latest
docker push rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)

View file

@ -1,6 +1,5 @@
#!/bin/bash #!/bin/bash
docker build --target tester -t rasilon/ujetl_tester docker/multistage
docker-compose -f docker/test_compose/docker-compose.yml run --rm tests docker-compose -f docker/test_compose/docker-compose.yml run --rm tests
docker-compose -f docker/test_compose/docker-compose.yml down docker-compose -f docker/test_compose/docker-compose.yml down

View file

@ -1,76 +0,0 @@
CREATE OR REPLACE FUNCTION pg_temp.ujetl_insert(sch text, tabname text)
RETURNS text
LANGUAGE plpgsql
AS $function$
-- Generates the text of an idempotent upsert
-- (INSERT ... ON CONFLICT ... DO UPDATE ... WHERE something changed)
-- for sch.tabname, with one JDBC-style '?' placeholder per column, cast to
-- the column's declared type. Helper for writing ujetl job configs; the
-- returned text is meant to be pasted into an <insert> element.
declare
    s text := '';
    header text := '';
    col_list text := '';   -- comma-separated quoted column names
    vals text := '';       -- one '?::type' placeholder per column
    sets text := '';       -- "col = EXCLUDED.col" assignments
    changes text := '';    -- NULL-safe change-detection predicates
    is_first boolean := true;
    colinfo record;
    pks text;              -- quoted PK column list for the conflict target
begin
    -- Primary-key columns of sch.tabname, used as the ON CONFLICT target.
    -- Explicit ANSI joins instead of the previous comma-join form. Column
    -- order within a conflict target is irrelevant, so no ORDER BY needed.
    SELECT
        array_to_string(array_agg(quote_ident(a.attname::text)), ', ') INTO pks
    FROM
        pg_index i
        JOIN pg_class c ON c.oid = i.indrelid
        JOIN pg_namespace n ON n.oid = c.relnamespace
        JOIN pg_attribute a ON a.attrelid = c.oid
                           AND a.attnum = any(i.indkey)
    WHERE
        c.relname = tabname
        AND n.nspname = sch
        AND i.indisprimary;
    header := E'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n ';
    -- Walk every column in declaration order, building the four clause bodies.
    for colinfo in
        select
            column_name,
            data_type
        from
            information_schema.columns
        where
            table_schema = sch
            and table_name = tabname
        order by ordinal_position
    loop
        if not is_first then
            col_list := col_list || E',\n ';
            vals := vals || E',\n ';
            sets := sets || E',\n ';
            changes := changes || E'\n OR ';
        end if;
        col_list := col_list || quote_ident(colinfo.column_name);
        -- JDBC-style placeholder, cast to the column's declared type.
        vals := vals || '?::' || colinfo.data_type;
        sets := sets || quote_ident(colinfo.column_name) ||
            E' = EXCLUDED.' || quote_ident(colinfo.column_name);
        -- IS DISTINCT FROM is NULL-safe, so rows are only rewritten when a
        -- value genuinely differs (NULL vs NULL counts as unchanged).
        changes := changes || E't.' || quote_ident(colinfo.column_name) ||
            E' IS DISTINCT FROM EXCLUDED.' || quote_ident(colinfo.column_name);
        is_first := false;
    end loop;
    -- The coalesce fallbacks surface "nothing found" conditions (e.g. a
    -- table without a primary key) visibly in the generated text instead
    -- of returning NULL.
    s := coalesce(header,'header failed') ||
        coalesce(col_list,'col_list failed') ||
        E'\n)VALUES(\n ' ||
        coalesce(vals,'vals failed') ||
        E')\nON CONFLICT(' || coalesce(pks,'No primary keys found') || E') DO UPDATE\nSET\n ' ||
        coalesce(sets,'sets failed') ||
        E'\nWHERE\n '||
        coalesce(changes,'changes failed');
    return s;
end;
$function$
;

View file

@ -1,65 +0,0 @@
CREATE OR REPLACE FUNCTION pg_temp.ujetl_select(sch text, tabname text)
RETURNS text
LANGUAGE plpgsql
AS $function$
-- Generates a template SELECT statement listing every column of
-- sch.tabname in declaration order, ending with a placeholder WHERE
-- clause ("insert criteria here >= ?::datatype") that the user is
-- expected to edit. Helper for writing ujetl job configs.
--
-- The previous version also ran a primary-key lookup against pg_index
-- whose result (pks) was never used, and declared five variables it
-- never read; that dead code has been removed.
declare
    col_list text := '';   -- comma-separated quoted column names
    is_first boolean := true;
    colinfo record;
begin
    for colinfo in
        select
            column_name
        from
            information_schema.columns
        where
            table_schema = sch
            and table_name = tabname
        order by ordinal_position
    loop
        if not is_first then
            col_list := col_list || E',\n ';
        end if;
        col_list := col_list || quote_ident(colinfo.column_name);
        is_first := false;
    end loop;
    -- col_list can only be empty (never NULL) when the table has no
    -- visible columns; the coalesce is kept as a belt-and-braces guard.
    return E'SELECT\n ' ||
        coalesce(col_list,'col_list failed') ||
        E'\nFROM\n ' ||
        quote_ident(sch)||'.'||quote_ident(tabname)||E' as t \n '||
        E'WHERE\n insert criteria here >= ?::datatype';
end;
$function$
;

View file

@ -1,31 +0,0 @@
# General RPM build environment. Base image is CentOS 7 (the original
# header said 6.x, which did not match the FROM line below).
#
# VERSION 0.0.1
FROM centos:centos7
# MAINTAINER is deprecated; record the maintainer as a label instead.
LABEL maintainer="Derry Hamilton <derryh@rasilon.net>"
# Install up-to-date epel rpm repository
RUN yum -y install epel-release
# Install java first, to get a sensible one.
RUN yum -y install java-1.8.0-openjdk-devel
# Install various packages to get compile environment
RUN yum -y groupinstall 'Development Tools'
# Install git command to access GitHub repository
RUN yum -y install git
# Install rpm-build to get the rpmbuild command
RUN yum -y install rpm-build
# Install yum-utils to use yumdownloader command
RUN yum -y install yum-utils
# Install rpmdevtools to use rpmdev-setuptree command
RUN yum -y install rpmdevtools
# Install maven to build the Java project
RUN yum -y install maven
RUN mkdir -p /root/rpmbuild/SOURCES

View file

@ -1,13 +1,17 @@
FROM ubuntu:22.04 as builder FROM centos:centos7 as builder
RUN apt-get update && apt-get -y upgrade RUN yum -y install epel-release java-1.8.0-openjdk-devel
RUN apt-get -y install openjdk-19-jdk-headless maven git RUN yum -y groupinstall 'Development Tools'
RUN git clone --single-branch --branch main https://github.com/rasilon/ujetl.git RUN yum -y install git maven
RUN cd ujetl && mvn -e package RUN git clone https://github.com/rasilon/ujetl.git
RUN cd ujetl && mvn package
FROM openjdk:11 as runner
FROM openjdk:8-alpine as runner
LABEL maintainer="Derry Hamilton <derryh@rasilon.net>" LABEL maintainer="Derry Hamilton <derryh@rasilon.net>"
RUN apt update && apt upgrade -y && apt install -y bash RUN apk update && apk upgrade && apk add bash
RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl
@ -20,7 +24,7 @@ CMD ["/ujetl_entrypoint"]
FROM runner as tester FROM runner as tester
COPY TEST_config_live.xml /var/ujetl/ COPY TEST_config_live.xml /var/ujetl/
COPY wait_for_postgres / COPY wait_for_postgres /
RUN apt-get install -y postgresql-client RUN apk add postgresql-client
FROM runner as deploy FROM runner as deploy

View file

@ -3,12 +3,6 @@
<hardLimitSeconds>360000</hardLimitSeconds> <hardLimitSeconds>360000</hardLimitSeconds>
<nRowsToLog>10000</nRowsToLog> <nRowsToLog>10000</nRowsToLog>
<blockSize>1000</blockSize> <blockSize>1000</blockSize>
<pollTimeout>500</pollTimeout>
<drivers>
<driver>org.postgresql.Driver</driver>
<driver>org.relique.jdbc.csv.CsvDriver</driver>
</drivers>
<source> <source>
<dsn>jdbc:postgresql://testdb:5432/test</dsn> <dsn>jdbc:postgresql://testdb:5432/test</dsn>
<username>test</username> <username>test</username>
@ -23,8 +17,6 @@
<jobs> <jobs>
<job> <job>
<name>test</name> <name>test</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select coalesce(-1,max(id),-1) as key from dest</key> <key>select coalesce(-1,max(id),-1) as key from dest</key>
<select> <select>
select select
@ -56,112 +48,5 @@
OR dest.test_ts = EXCLUDED.test_ts OR dest.test_ts = EXCLUDED.test_ts
</insert> </insert>
</job> </job>
<job>
<name>test upsert</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select -1 as key</key>
<select>
select
id,
test_int,
test_text,
test_ts
from
public.source where id > ?::bigint</select>
<insert>
insert into public.dest(
id,
test_int,
test_text,
test_ts
)values(
?::bigint,
?::integer,
?::text,
?::timestamp with time zone
)ON CONFLICT(id) DO UPDATE
set
test_int = EXCLUDED.test_int,
test_text = EXCLUDED.test_text,
test_ts = EXCLUDED.test_ts
WHERE
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
</insert>
</job>
<job>
<name>denormalise</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select -1 as key</key>
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid) where ?::integer is not null;</select>
<insert>
INSERT INTO denormalised_personalia(person_id,fname,lname)
values(?::integer,?::text,?::text)
ON CONFLICT (person_id) DO UPDATE
SET
fname = EXCLUDED.fname,
lname = EXCLUDED.lname
WHERE
denormalised_personalia.fname is distinct from EXCLUDED.fname
OR denormalised_personalia.lname is distinct from EXCLUDED.lname
</insert>
</job>
<job>
<name>test pre post</name>
<key>select -1 as key</key>
<select>
select
id,
test_int,
test_text,
test_ts
from
public.source where id > ?::bigint
</select>
<preTarget>
drop table if exists tmp_dest;
create temp table tmp_dest(
id bigint,
test_int integer,
test_text text,
test_ts timestamp with time zone
);
</preTarget>
<insert>
insert into tmp_dest(
id,
test_int,
test_text,
test_ts
)values(
?::bigint,
?::integer,
?::text,
?::timestamp with time zone
)
</insert>
<insert>
insert into public.dest(
id,
test_int,
test_text,
test_ts
)
select id,test_int,test_text,test_ts
from tmp_dest
ON CONFLICT(id) DO UPDATE
set
test_int = EXCLUDED.test_int,
test_text = EXCLUDED.test_text,
test_ts = EXCLUDED.test_ts
WHERE
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
</insert>
</job>
</jobs> </jobs>
</CopyingApp> </CopyingApp>

View file

@ -1,4 +0,0 @@
id,dat
1,banana
2,potato
3,nugget
1 id dat
2 1 banana
3 2 potato
4 3 nugget

View file

@ -10,11 +10,10 @@ ls
echo Starting run loop echo Starting run loop
for file in *.xml for file in *.xml
do do
/usr/local/openjdk-11/bin/java \ /usr/bin/java \
-Xms1g \ -Xms1g \
-Xmx2g \ -Xmx2g \
-cp /usr/share/ujetl/lib/CopyingApp.jar \ -cp /usr/share/ujetl/lib/CopyingApp.jar \
-Dlog4j.configurationFile="$LOG_PROPS" \
com.rasilon.ujetl.CopyingApp \ com.rasilon.ujetl.CopyingApp \
--log4j "$LOG_PROPS" \ --log4j "$LOG_PROPS" \
--config "$file" --config "$file"

View file

@ -8,14 +8,5 @@ until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.contai
sleep 1 sleep 1
done done
>&2 echo "Postgres is up - Waiting for the reboot"
sleep 3 # Wait for the Postgres reboot at the end of setup
until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do
>&2 echo "Postgres is unavailable - sleeping"
sleep 1
done
>&2 echo "Postgres is up - executing command" >&2 echo "Postgres is up - executing command"
exec $cmd exec $cmd

View file

@ -12,7 +12,7 @@ services:
POSTGRES_PASSWORD: password POSTGRES_PASSWORD: password
POSTGRES_DB: postgres POSTGRES_DB: postgres
tests: tests:
image: rasilon/ujetl_tester:latest image: rasilon/ujetl_test:latest
build: build:
context: ../multistage context: ../multistage
links: links:

View file

@ -20,38 +20,6 @@ GRANT SELECT,INSERT,UPDATE,DELETE ON dest TO test;
INSERT INTO source(test_int,test_text,test_ts) SELECT 1,'banana',now() FROM generate_series(1,100000); INSERT INTO source(test_int,test_text,test_ts) SELECT 1,'banana',now() FROM generate_series(1,100000);
CREATE TABLE normalised_first_names(
fid smallserial not null primary key,
fname text not null unique
);
CREATE TABLE normalised_last_names(
lid smallserial not null primary key,
lname text not null unique
);
INSERT INTO normalised_first_names (fname) values ('Abigail'), ('Adam'), ('Beatrice'), ('Bruce'), ('Claire'), ('Clive'), ('Deborah'), ('Dave');
INSERT INTO normalised_last_names (lname) values ('Adams'), ('Bellamy'), ('Clark'), ('Dabrowski');
CREATE TABLE normalised_personalia (
person_id serial not null primary key,
fid smallint not null references normalised_first_names(fid),
lid smallint not null references normalised_last_names(lid)
);
insert into normalised_personalia(fid,lid) values (1,1), (1,2), (1,3), (1,4), (2,1), (2,2), (2,3), (2,4), (3,1), (3,2), (3,3), (3,4), (4,1), (4,2), (4,3), (4,4);
CREATE TABLE denormalised_personalia(
person_id integer not null primary key,
fname text,
lname text
);
CREATE TABLE test_csvjdbc(
id integer not null primary key,
dat text
);
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test;
GRANT SELECT,INSERT,UPDATE ON denormalised_personalia TO test;
\c postgres \c postgres
CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a); CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a);
GRANT SELECT ON public.container_ready TO TEST; GRANT SELECT ON public.container_ready TO TEST;

View file

@ -30,9 +30,9 @@ fi
/usr/bin/java \ /usr/bin/java \
-Xms1g \ -Xms1g \
-Xmx2g \ -Xmx2g \
-Dlog4j.configurationFile="$LOG_PROPS" \
-cp /usr/share/ujetl/lib/CopyingApp.jar \ -cp /usr/share/ujetl/lib/CopyingApp.jar \
com.rasilon.ujetl.CopyingApp \ com.rasilon.ujetl.CopyingApp \
--log4j "$LOG_PROPS" \
--config "/etc/ujetl/${JOBNAME}_config_live.xml" --config "/etc/ujetl/${JOBNAME}_config_live.xml"
#rm -f $LOCKFILE #rm -f $LOCKFILE

60
pom.xml
View file

@ -6,42 +6,19 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<groupId>com.rasilon.ujetl</groupId> <groupId>com.rasilon.ujetl</groupId>
<artifactId>CopyingApp</artifactId> <artifactId>CopyingApp</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<version>2.5.2</version> <version>2.0.2</version>
<name>uJETL</name> <name>uJETL</name>
<url>https://github.com/rasilon/ujetl</url> <url></url>
<properties>
<project.build.sourceEncoding>
UTF-8</project.build.sourceEncoding>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>junit</groupId>
<artifactId>junit-jupiter-api</artifactId> <artifactId>junit</artifactId>
<version>5.4.2</version> <version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.4.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>5.4.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.220</version>
<scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.18.0</version> <version>3.9</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>
@ -51,12 +28,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId> <artifactId>commons-configuration2</artifactId>
<version>2.10.1</version> <version>2.4</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency> <dependency>
<groupId>commons-beanutils</groupId> <groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId> <artifactId>commons-beanutils</artifactId>
<version>1.11.0</version> <version>1.9.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.beust</groupId> <groupId>com.beust</groupId>
@ -66,31 +44,27 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId> <artifactId>log4j-api</artifactId>
<version>2.17.1</version> <version>2.11.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId> <artifactId>log4j-core</artifactId>
<version>2.25.3</version> <version>2.11.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.postgresql</groupId> <groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId> <artifactId>postgresql</artifactId>
<version>42.7.2</version> <version>42.2.5</version>
</dependency>
<dependency>
<groupId>net.sourceforge.csvjdbc</groupId>
<artifactId>csvjdbc</artifactId>
<version>1.0.40</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version> <version>2.3.2</version>
<configuration> <configuration>
<release>11</release> <source>1.8</source>
<target>1.8</target>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>
@ -114,10 +88,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View file

@ -34,6 +34,10 @@ public class CopyingApp {
public static void main(String[] args) { public static void main(String[] args) {
CopyingAppCommandParser cli = new CopyingAppCommandParser(args); CopyingAppCommandParser cli = new CopyingAppCommandParser(args);
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
String log4jConfigLocation = cli.getLog4jConfigFile();
File file = new File(log4jConfigLocation);
context.setConfigLocation(file.toURI());
System.out.println("Config set from "+file.toURI());
CopyingApp app = new CopyingApp(cli); CopyingApp app = new CopyingApp(cli);
try { try {
@ -75,7 +79,6 @@ public class CopyingApp {
Configuration config = configs.xml(cli.getConfigFile()); Configuration config = configs.xml(cli.getConfigFile());
loadDrivers(config);
String hardLimitSeconds = config.getString("hardLimitSeconds"); String hardLimitSeconds = config.getString("hardLimitSeconds");
if(hardLimitSeconds != null) { if(hardLimitSeconds != null) {
TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true); TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true);
@ -128,27 +131,7 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job("+i+").key"); String tabKey = config.getString("jobs.job("+i+").key");
String tabSelect = config.getString("jobs.job("+i+").select"); String tabSelect = config.getString("jobs.job("+i+").select");
String tabInsert = config.getString("jobs.job("+i+").insert"); String tabInsert = config.getString("jobs.job("+i+").insert");
String preTarget = config.getString("jobs.job("+i+").preTarget"); Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
String postTarget = config.getString("jobs.job("+i+").postTarget");
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
Job j = new Job(
sConn,
dConn,
tabName,
jobName,
tabKey,
tabSelect,
tabInsert,
preTarget,
postTarget,
nRowsToLog,
blockSize,
pollTimeout,
identifySourceSQL,
identifyDestinationSQL
);
j.start(); j.start();
j.join(); j.join();
@ -158,28 +141,7 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job.key"); String tabKey = config.getString("jobs.job.key");
String tabSelect = config.getString("jobs.job.select"); String tabSelect = config.getString("jobs.job.select");
String tabInsert = config.getString("jobs.job.insert"); String tabInsert = config.getString("jobs.job.insert");
String preTarget = config.getString("jobs.job.preTarget"); Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
String postTarget = config.getString("jobs.job.postTarget");
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
Job j = new Job(
sConn,
dConn,
tabName,
jobName,
tabKey,
tabSelect,
tabInsert,
preTarget,
postTarget,
nRowsToLog,
blockSize,
pollTimeout,
identifySourceSQL,
identifyDestinationSQL
);
j.start(); j.start();
j.join(); j.join();
} else { } else {
@ -241,21 +203,4 @@ public class CopyingApp {
return c; return c;
} }
// Even with JDBC 4, some drivers don't play nicely with whatever
// the classloaders are up to. So this allows us to force it the
// old fashioned way, and works around the
// "But it works fine when it's the /only/ driver!"
// cross-database problem
private void loadDrivers(Configuration config) {
String[] drivers = config.get(String[].class, "drivers.driver");
for(String d:drivers) {
try {
Class.forName(d);
log.info("Preloaded driver "+d);
} catch(ClassNotFoundException e) {
log.error("Could not preload driver "+d,e);
}
}
}
} }

View file

@ -12,7 +12,7 @@ public class CopyingAppCommandParser {
private String configFile; private String configFile;
@Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run") @Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run")
private String log4jConfigFile = "/etc/ujetl/default_log4j_config.properties"; private String log4jConfigFile = "/etc/ppl/default_log4j_config.properties";
public CopyingAppCommandParser(String[] args) { public CopyingAppCommandParser(String[] args) {
super(); super();
@ -23,4 +23,8 @@ public class CopyingAppCommandParser {
return configFile; return configFile;
} }
public String getLog4jConfigFile() {
return log4jConfigFile;
}
} }

View file

@ -22,29 +22,23 @@ import org.apache.logging.log4j.Logger;
public class Job extends Thread { public class Job extends Thread {
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class); static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
private Connection sConn; Connection sConn;
private Connection dConn; Connection dConn;
private String name; String name;
private String jobName; String jobName;
private String key; String key;
private String select; String select;
private String insert; String insert;
private String preTarget; Integer nRowsToLog;
private String postTarget; Integer blockSize;
private Integer nRowsToLog; Integer pollTimeout;
private Integer blockSize;
private Integer pollTimeout;
private String identifySourceSQL;
private String identifyDestinationSQL;
private BlockingQueue<List<String>> resultBuffer; BlockingQueue<List<String>> resultBuffer;
private AtomicBoolean producerLive; AtomicBoolean producerLive;
private AtomicBoolean threadsExit = new AtomicBoolean(false);; AtomicBoolean threadsExit = new AtomicBoolean(false);;
private String sourceID;
private String destID;
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) { public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
this.sConn = sConn; this.sConn = sConn;
this.dConn = dConn; this.dConn = dConn;
this.name = name; this.name = name;
@ -52,13 +46,9 @@ public class Job extends Thread {
this.key = key; this.key = key;
this.select = select; this.select = select;
this.insert = insert; this.insert = insert;
this.preTarget = preTarget;
this.postTarget = postTarget;
this.nRowsToLog = nRowsToLog; this.nRowsToLog = nRowsToLog;
this.blockSize = blockSize; this.blockSize = blockSize;
this.pollTimeout = pollTimeout; this.pollTimeout = pollTimeout;
this.identifySourceSQL = identifySourceSQL;
this.identifyDestinationSQL = identifyDestinationSQL;
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize); resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
producerLive = new AtomicBoolean(true); producerLive = new AtomicBoolean(true);
@ -82,12 +72,12 @@ public class Job extends Thread {
public Producer(ResultSet src,BlockingQueue q) { public Producer(ResultSet src,BlockingQueue q) {
this.src = src; this.src = src;
this.q = q; this.q = q;
this.setName(String.format("%s-%s-Producer",jobName,name)); this.setName(String.format("%s-%s-Consumer",jobName,name));
} }
public void run() { public void run() {
try { try {
long rowsInserted = 0; long rowsInserted = 0;
long rowsAttempted = 0; long rowNum = 0;
long stamp = System.nanoTime(); long stamp = System.nanoTime();
long nstamp; long nstamp;
int columnCount = src.getMetaData().getColumnCount(); int columnCount = src.getMetaData().getColumnCount();
@ -105,13 +95,13 @@ public class Job extends Thread {
} }
log.trace("Producer queue full."); log.trace("Producer queue full.");
} }
rowsAttempted++; rowNum++;
if(rowsAttempted % nRowsToLog == 0) { if(rowNum % nRowsToLog == 0) {
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowsAttempted,name)); log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name));
} }
} }
producerLive.set(false); producerLive.set(false);
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowsAttempted,name)); log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name));
} catch(Exception e) { } catch(Exception e) {
producerLive.set(false); // Signal we've exited. producerLive.set(false); // Signal we've exited.
threadsExit.set(true); // Signal we've exited. threadsExit.set(true); // Signal we've exited.
@ -132,7 +122,7 @@ public class Job extends Thread {
} }
public void run() { public void run() {
try { try {
long rowsAttempted = 0; long rowNum = 0;
long rowsInserted = 0; long rowsInserted = 0;
while(true) { while(true) {
@ -143,7 +133,7 @@ public class Job extends Thread {
if(row == null && producerLive.get() == false) { if(row == null && producerLive.get() == false) {
rowsInserted += arraySum(insertStatement.executeBatch()); rowsInserted += arraySum(insertStatement.executeBatch());
dConn.commit(); dConn.commit();
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowsAttempted,name)); log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name));
return; return;
} }
if(threadsExit.get()) { if(threadsExit.get()) {
@ -160,14 +150,14 @@ public class Job extends Thread {
} }
insertStatement.addBatch(); insertStatement.addBatch();
rowsAttempted++; rowNum++;
if(rowsAttempted % nRowsToLog == 0) { if(rowNum % nRowsToLog == 0) {
rowsInserted += arraySum(insertStatement.executeBatch()); rowsInserted += arraySum(insertStatement.executeBatch());
dConn.commit(); dConn.commit();
log.info(String.format("%s - Inserted %s of %s notified rows into %s", log.info(String.format("%s - Inserted %s of %s notified rows into %s so far",
jobName, jobName,
rowNum,
rowsInserted, rowsInserted,
rowsAttempted,
name)); name));
} }
} }
@ -179,34 +169,11 @@ public class Job extends Thread {
} }
} }
// Outer run
public void run() { public void run() {
try { try {
ResultSet rs; ResultSet rs;
if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn);
if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn);
if(sourceID != null || destID != null){
log.info(String.format(
"%s - Processing table: %s with source: %s, dest: %s",
jobName,
name,
sourceID==null?"":sourceID,
destID==null?"":destID
));
}else{
log.info(String.format("%s - Processing table: %s",jobName,name)); log.info(String.format("%s - Processing table: %s",jobName,name));
}
if(preTarget != null){
log.info(String.format("%s - Trying to execute preTarget SQL",jobName));
PreparedStatement s = dConn.prepareStatement(preTarget);
s.executeUpdate();
s.close();
dConn.commit();
}else{
log.info(String.format("%s - No preTarget; skipping.",jobName));
}
log.debug("Trying to execute: "+key); log.debug("Trying to execute: "+key);
PreparedStatement keyStatement = dConn.prepareStatement(key); PreparedStatement keyStatement = dConn.prepareStatement(key);
@ -244,33 +211,10 @@ public class Job extends Thread {
p.join(); p.join();
c.join(); c.join();
if(postTarget != null){
log.info(String.format("%s - Trying to execute postTarget SQL",jobName));
PreparedStatement s = dConn.prepareStatement(postTarget);
s.executeUpdate();
s.close();
dConn.commit();
}else{
log.info(String.format("%s - No postTarget; skipping.",jobName));
}
} catch(InterruptedException e) { } catch(InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch(SQLException e) { } catch(SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private String getSingleString(String sql, Connection conn){
try{
PreparedStatement s = conn.prepareStatement(sql);
ResultSet r = s.executeQuery();
r.next();
return r.getString(1);
} catch(SQLException e) {
throw new RuntimeException(e);
}
}
} }

View file

@ -1,37 +0,0 @@
package com.rasilon.ujetl;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.beanutils.PropertyUtils; // Why does config need this?
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
/**
* @author derryh
*
*/
public class TestConfig {
@Test
public void test001VerifyArrayOfDrivers() {
try {
Configurations configs = new Configurations();
Configuration config = configs.xml("TEST_config_live.xml");
String[] drivers = config.get(String[].class, "drivers.driver");
int ndrivers =drivers.length;
if(ndrivers != 3){
fail("Expected 3 drivers, but found "+ndrivers);
}
} catch(Exception e) {
fail(e.toString());
}
}
}

View file

@ -1,64 +0,0 @@
package com.rasilon.ujetl;
import java.sql.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class TestJob {
private static String jdbcURL = "jdbc:h2:mem:dbtest";
@Test
public void test002verifyH2Works() {
try {
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
conn.close();
} catch(Exception e) {
fail(e.toString());
}
}
@Test
public void testJob() {
try (
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
) {
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
for(int i=0; i<10000; i++) {
inserter.setInt(1,i);
inserter.executeUpdate();
}
Job j = new Job(
src,
dest,
"jUnit Test Config",
"jUnit Test Job",
"SELECT -1 AS \"key\"",
"SELECT id,dat FROM src WHERE id > ?",
"INSERT INTO dest VALUES(?,?)",
null,
null,
100,
100,
100,
"select 'PID:'||session_id()",
"select 'PID:'||session_id()"
);
j.start();
j.join();
// do stuff
} catch(Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
}

View file

@ -1,27 +0,0 @@
package com.rasilon.ujetl;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class TestParser {
    @Test
    public void test001Parset() {
        // Verify that "--config <file>" is parsed into getConfigFile().
        try {
            String[] args = {
                "--config",
                "config_test_banana.xml"
            };
            CopyingAppCommandParser p = new CopyingAppCommandParser(args);
            // JUnit's assertEquals takes (expected, actual); the old call
            // had them reversed, which produces a misleading failure
            // message on mismatch.
            assertEquals("config_test_banana.xml", p.getConfigFile());
        } catch(Exception e) {
            fail(e.toString());
        }
    }
}

View file

@ -1,64 +0,0 @@
package com.rasilon.ujetl;
import java.sql.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class TestPrePost {
private static String jdbcURL = "jdbc:h2:mem:dbtest";
@Test
public void test002verifyH2Works() {
try {
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
conn.close();
} catch(Exception e) {
fail(e.toString());
}
}
@Test
public void testPrePost() {
try (
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
) {
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
for(int i=0; i<10000; i++) {
inserter.setInt(1,i);
inserter.executeUpdate();
}
Job j = new Job(
src,
dest,
"jUnit Test Config",
"jUnit Test Job",
"SELECT -1 AS \"key\"",
"SELECT id,dat FROM src WHERE id > ?",
"INSERT INTO tmp_dest VALUES(?,?)",
"CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);",
"INSERT INTO dest SELECT * from tmp_dest;",
100,
100,
100,
"select 'PID:'||session_id()",
"select 'PID:'||session_id()"
);
j.start();
j.join();
// do stuff
} catch(Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
}

View file

@ -1,28 +0,0 @@
package com.rasilon.ujetl;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class TestTimeLimiter {
    /**
     * A TimeLimiter armed with a 1-second hard limit must interrupt this
     * thread long before the 10-second sleep completes.
     */
    @Test
    public void test001Limiter() {
        try {
            TimeLimiter hardLimit = new TimeLimiter(1,false);
            hardLimit.start();
            Thread.sleep(10000);
        } catch(java.lang.InterruptedException e) {
            // Interrupted as expected: the limiter fired.  Test passes.
            return;
        } catch(Exception e) {
            e.printStackTrace();
            fail("Unexpected exception.");
        }
        // Only reachable if the full sleep completed uninterrupted.
        fail("Sleep wasn't interrupted by the limiter!");
    }
}

View file

@ -3,12 +3,6 @@
<hardLimitSeconds>360000</hardLimitSeconds> <hardLimitSeconds>360000</hardLimitSeconds>
<nRowsToLog>10000</nRowsToLog> <nRowsToLog>10000</nRowsToLog>
<blockSize>1000</blockSize> <blockSize>1000</blockSize>
<pollTimeout>500</pollTimeout>
<drivers>
<driver>org.postgresql.Driver</driver>
<driver>org.h2.Driver</driver>
<driver>org.relique.jdbc.csv.CsvDriver</driver>
</drivers>
<source> <source>
<dsn>jdbc:postgresql://localhost:5432/test</dsn> <dsn>jdbc:postgresql://localhost:5432/test</dsn>
<username>test</username> <username>test</username>
@ -23,8 +17,6 @@
<jobs> <jobs>
<job> <job>
<name>test</name> <name>test</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select coalesce(-1,max(id),-1) as key from dest</key> <key>select coalesce(-1,max(id),-1) as key from dest</key>
<select> <select>
select select
@ -56,54 +48,5 @@
OR dest.test_ts = EXCLUDED.test_ts OR dest.test_ts = EXCLUDED.test_ts
</insert> </insert>
</job> </job>
<job>
<name>test upsert</name>
<key>select -1 as key</key>
<select>
select
id,
test_int,
test_text,
test_ts
from
public.source where id > ?::bigint</select>
<insert>
insert into public.dest(
id,
test_int,
test_text,
test_ts
)values(
?::bigint,
?::integer,
?::text,
?::timestamp with time zone
)ON CONFLICT(id) DO UPDATE
set
test_int = EXCLUDED.test_int,
test_text = EXCLUDED.test_text,
test_ts = EXCLUDED.test_ts
WHERE
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
</insert>
</job>
<job>
<name>denormalise</name>
<key>select -1 as key</key>
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid);</select>
<insert>
INSERT INTO denormalised_personalia(person_id,fname,lname)
values(?::integer,?::text,?::text)
ON CONFLICT (person_id) DO UPDATE
SET
fname = EXCLUDED.fname,
lname = EXCLUDED.lname
WHERE
denormalised_personalia.fname is distinct from EXCLUDED.fname
OR denormalised_personalia.lname is distinct from EXCLUDED.lname
</insert>
</job>
</jobs> </jobs>
</CopyingApp> </CopyingApp>

View file

@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>

33
uJETL.spec Normal file
View file

@ -0,0 +1,33 @@
# RPM packaging for uJETL, a very small Java ETL application.
Summary: Java app to facilitate moving data between databases.
Name: uJETL
Version: 2.0.2
Release: 1
Group: Applications/Database
License: All rights reserved.
Source: uJETL-%{version}.tar.gz
URL: https://github.com/rasilon/ujetl.git
Distribution: derryh
Vendor: derryh
Packager: Derry Hamilton <derryh@rasilon.net>
#BuildRoot: .
%description
A very small ETL app
%prep
# Default setup: unpack the source tarball, no patches applied.
%setup
%build
# The fat jar is expected to be built (mvn package) before rpmbuild is
# invoked, so the build stage is deliberately a no-op.
#mvn -Dmaven.test.skip=true clean package
true
%install
# Lay out the fat jar, the launcher script and the default log4j config
# under the build root.
mkdir -p $RPM_BUILD_ROOT/usr/share/ujetl/lib $RPM_BUILD_ROOT/etc/ujetl $RPM_BUILD_ROOT/usr/bin
cp target/CopyingApp-*-jar-with-dependencies.jar $RPM_BUILD_ROOT/usr/share/ujetl/lib/CopyingApp.jar
cp install_extra/run_copying_job $RPM_BUILD_ROOT/usr/bin
cp install_extra/copying_defaults_log4j.xml $RPM_BUILD_ROOT/etc/ujetl
%files
/usr/share/ujetl/lib/CopyingApp.jar
/usr/bin/run_copying_job
/etc/ujetl/copying_defaults_log4j.xml