mirror of
https://github.com/rasilon/ujetl.git
synced 2026-04-11 18:39:30 +00:00
Compare commits
106 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4f16842623 | |||
|
|
d2257f1b81 | ||
| 8ab5b1b79e | |||
|
|
26d47f16c9 | ||
| 9f0fe6bb3b | |||
| bf4ed02af5 | |||
| 5f275b93d8 | |||
|
|
d509513084 | ||
| 96e28a5abb | |||
| 1da5fabb05 | |||
| 8a4d892dfa | |||
|
|
07f68922b3 | ||
| 22af36b555 | |||
|
|
d0a5075191 | ||
| 1ce7e09751 | |||
| f651fd720e | |||
| 58b78b2021 | |||
| a88ac56848 | |||
| 9a8716f33e | |||
| 1b1ba551c8 | |||
| 866d02fb52 | |||
| 441b2f4191 | |||
| b81aedefb1 | |||
| 584f83de0d | |||
| 4d38679155 | |||
| b378189512 | |||
| 49487a83af | |||
| 9f4729bf1d | |||
|
|
2126553e5c | ||
| 64743f430b | |||
| 28c918dd6a | |||
|
|
17d33ec18b | ||
| 5a61bb357f | |||
| ffe33276ba | |||
|
|
67f6ec2dab | ||
|
|
9554d3df63 | ||
| f6268344d0 | |||
| 130b120515 | |||
|
|
54a8cd6312 | ||
|
|
d7bde365ed | ||
| 1c5e54acc9 | |||
| b17ca2479b | |||
| 5de1e80b8c | |||
|
|
36acc0ed23 | ||
|
|
3235db7f6a | ||
|
|
79a3dbf499 | ||
| 7a22b6ddae | |||
| f42fa6550e | |||
| 3c525bf006 | |||
| e405e372cd | |||
|
|
bc9849e3ee | ||
| 0730d1bf9d | |||
|
|
45c5900481 | ||
|
|
f4ded0857c | ||
| c34f8b54c4 | |||
| ce5a48c083 | |||
| e6ccc6f2af | |||
|
|
20c1c7695f | ||
| 5df6384abf | |||
| be6fbc73a1 | |||
| b0cc8f8ce7 | |||
| cc03ef6c1d | |||
| f2ef10be64 | |||
| 6a1ab875c7 | |||
| b3b02669f7 | |||
| f963d8bdcf | |||
| f3538a09f9 | |||
| af62d6cf96 | |||
| ead5a9e173 | |||
|
|
a77be8b696 | ||
| 3493e18630 | |||
|
|
cb3b3e3b63 | ||
| cbb44ce1d9 | |||
| 8dbe3e9505 | |||
|
|
be9e468354 | ||
| 66dd3bb37d | |||
| a7d1a6077b | |||
| 734dc8608f | |||
| 01310bfca4 | |||
| 06c64d499f | |||
| 892f7b4fa8 | |||
| 4f0db0a2df | |||
| 4f905dd47a | |||
| 8fdbc6a78e | |||
| ddc67f3a41 | |||
| 633fbe7391 | |||
| f669b1af9d | |||
| e71832f57a | |||
| d34a0902d1 | |||
| cde891ee16 | |||
| 2b5f4f1dbe | |||
| 97baf0d11a | |||
| 9d82c9f279 | |||
| 31a2968f6a | |||
| 981b59225b | |||
| ff157e6a59 | |||
| 4fe30a196c | |||
| 7a858ec64a | |||
| 6737ae6492 | |||
| b8883d6f23 | |||
| 61ddbfd817 | |||
| af46f83799 | |||
| 5571bd1b4d | |||
| 3badb97d49 | |||
| ea629b3098 | |||
| 3bfc9d6a1b |
31 changed files with 960 additions and 135 deletions
|
|
@ -2,3 +2,9 @@
|
|||
Originally written in the days of trying to do something, with no budget, I wrote this out of necessity. I subsequently got permission to open-source it so long as there were no references to the company in it. So this is the cleaned up version, with a few additional features added and the things that turned out to be pointless removed.
|
||||
|
||||
It's probably the smallest functional ETL application with decent performance. Since I only use it on Postgres nowadays, it only officially supports Postgres at the moment. But in the near past it's worked pulling data from "several commercial databases" that don't like being named in benchmarks etc. and if you have the JDBC jars in your classpath then it should just work.
|
||||
|
||||
For an example config file, please see [TEST_config_live.xml](https://github.com/rasilon/ujetl/blob/master/src/test/resources/TEST_config_live.xml)
|
||||
|
||||
To run the dockerised integration tests, use `build_util/run_docker_tests` in this repo.
|
||||
|
||||
A runnable docker image is available at [rasilon/ujetl](https://cloud.docker.com/repository/docker/rasilon/ujetl). This expects config files copied into, or mounted into `/var/ujetl/`. RPMs can be built using `build_util/build_rpms_in_docker`. As the name suggests, you need docker for that.
|
||||
|
|
|
|||
|
|
@ -1,6 +0,0 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
docker build --rm -t local/c7-buildhost docker/build
|
||||
|
||||
docker run -it -v `pwd`:/root/build local/c7-buildhost /root/build/build_util/build_rpm
|
||||
|
|
@ -1,12 +0,0 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
cd /root
|
||||
cp -Rv build build2
|
||||
cd build2
|
||||
|
||||
SPEC=$(ls *.spec)
|
||||
VER=$(grep Version $SPEC | awk '{print $2}')
|
||||
tar cvf $HOME/rpmbuild/SOURCES/uJETL-${VER}.tar.gz --show-transformed --transform="s/^\./uJETL-${VER}/" .
|
||||
rpmbuild -ba $SPEC
|
||||
cp /root/rpmbuild/RPMS/x86_64/* /root/build/
|
||||
|
|
@ -1,4 +1,3 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
cp *.rpm docker/run/
|
||||
docker build --rm -t local/c7-runhost docker/run
|
||||
docker build --target deploy -t rasilon/ujetl docker/multistage
|
||||
docker tag rasilon/ujetl:latest rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)
|
||||
|
|
|
|||
4
build_util/push_docker_images
Executable file
4
build_util/push_docker_images
Executable file
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/bash
|
||||
docker push rasilon/ujetl:latest
|
||||
docker push rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)
|
||||
|
||||
6
build_util/run_docker_tests
Executable file
6
build_util/run_docker_tests
Executable file
|
|
@ -0,0 +1,6 @@
|
|||
#!/bin/bash
|
||||
|
||||
docker build --target tester -t rasilon/ujetl_tester docker/multistage
|
||||
docker-compose -f docker/test_compose/docker-compose.yml run --rm tests
|
||||
docker-compose -f docker/test_compose/docker-compose.yml down
|
||||
|
||||
76
config_util/ujetl_insert_generator.sql
Normal file
76
config_util/ujetl_insert_generator.sql
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
CREATE OR REPLACE FUNCTION pg_temp.ujetl_insert(sch text, tabname text)
|
||||
RETURNS text
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
declare
|
||||
s text := '';
|
||||
header text := '';
|
||||
col_list text := '';
|
||||
vals text := '';
|
||||
sets text := '';
|
||||
changes text := '';
|
||||
is_first boolean := true;
|
||||
colinfo record;
|
||||
pks text;
|
||||
begin
|
||||
SELECT
|
||||
array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks
|
||||
FROM
|
||||
pg_index,
|
||||
pg_class,
|
||||
pg_attribute,
|
||||
pg_namespace
|
||||
WHERE
|
||||
pg_class.relname = tabname
|
||||
AND indrelid = pg_class.oid
|
||||
AND nspname = sch
|
||||
AND pg_class.relnamespace = pg_namespace.oid
|
||||
AND pg_attribute.attrelid = pg_class.oid
|
||||
AND pg_attribute.attnum = any(pg_index.indkey)
|
||||
AND indisprimary ;
|
||||
|
||||
header := E'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n ';
|
||||
for colinfo in
|
||||
select
|
||||
*
|
||||
from
|
||||
information_schema.columns
|
||||
where
|
||||
table_schema = sch
|
||||
and table_name = tabname
|
||||
order by ordinal_position
|
||||
loop
|
||||
if not is_first then
|
||||
col_list := col_list || E',\n ';
|
||||
vals := vals || E',\n ';
|
||||
sets := sets || E',\n ';
|
||||
changes := changes || E'\n OR ';
|
||||
end if;
|
||||
col_list := col_list || quote_ident(colinfo.column_name);
|
||||
vals := vals || '?::' || colinfo.data_type;
|
||||
sets := sets || quote_ident(colinfo.column_name) ||
|
||||
E' = EXCLUDED.' || quote_ident(colinfo.column_name);
|
||||
changes := changes || E't.' || quote_ident(colinfo.column_name) ||
|
||||
E' IS DISTINCT FROM EXCLUDED.' || quote_ident(colinfo.column_name);
|
||||
|
||||
is_first = false;
|
||||
end loop;
|
||||
|
||||
s := coalesce(header,'header failed') ||
|
||||
coalesce(col_list,'col_list failed') ||
|
||||
E'\n)VALUES(\n ' ||
|
||||
coalesce(vals,'vals failed') ||
|
||||
E')\nON CONFLICT(' || coalesce(pks,'No primary keys found') || E') DO UPDATE\nSET\n ' ||
|
||||
coalesce(sets,'sets failed') ||
|
||||
E'\nWHERE\n '||
|
||||
coalesce(changes,'changes failed');
|
||||
return s;
|
||||
end;
|
||||
$function$
|
||||
;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
65
config_util/ujetl_select_generator.sql
Normal file
65
config_util/ujetl_select_generator.sql
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
CREATE OR REPLACE FUNCTION pg_temp.ujetl_select(sch text, tabname text)
|
||||
RETURNS text
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
declare
|
||||
s text := '';
|
||||
header text := '';
|
||||
col_list text := '';
|
||||
vals text := '';
|
||||
sets text := '';
|
||||
changes text := '';
|
||||
is_first boolean := true;
|
||||
colinfo record;
|
||||
pks text;
|
||||
begin
|
||||
SELECT
|
||||
array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks
|
||||
FROM
|
||||
pg_index,
|
||||
pg_class,
|
||||
pg_attribute,
|
||||
pg_namespace
|
||||
WHERE
|
||||
pg_class.relname = tabname
|
||||
AND indrelid = pg_class.oid
|
||||
AND nspname = sch
|
||||
AND pg_class.relnamespace = pg_namespace.oid
|
||||
AND pg_attribute.attrelid = pg_class.oid
|
||||
AND pg_attribute.attnum = any(pg_index.indkey)
|
||||
AND indisprimary ;
|
||||
|
||||
header := E'SELECT\n ';
|
||||
for colinfo in
|
||||
select
|
||||
*
|
||||
from
|
||||
information_schema.columns
|
||||
where
|
||||
table_schema = sch
|
||||
and table_name = tabname
|
||||
order by ordinal_position
|
||||
loop
|
||||
if not is_first then
|
||||
col_list := col_list || E',\n ';
|
||||
end if;
|
||||
col_list := col_list || quote_ident(colinfo.column_name);
|
||||
|
||||
is_first = false;
|
||||
end loop;
|
||||
|
||||
s := header ||
|
||||
coalesce(col_list,'col_list failed') ||
|
||||
E'\nFROM\n ' ||
|
||||
quote_ident(sch)||'.'||quote_ident(tabname)||E' as t \n '||
|
||||
E'WHERE\n insert criteria here >= ?::datatype';
|
||||
return s;
|
||||
end;
|
||||
$function$
|
||||
;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
27
docker/multistage/Dockerfile
Normal file
27
docker/multistage/Dockerfile
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
FROM ubuntu:22.04 as builder
|
||||
RUN apt-get update && apt-get -y upgrade
|
||||
RUN apt-get -y install openjdk-19-jdk-headless maven git
|
||||
RUN git clone --single-branch --branch main https://github.com/rasilon/ujetl.git
|
||||
RUN cd ujetl && mvn -e package
|
||||
|
||||
FROM openjdk:11 as runner
|
||||
LABEL maintainer="Derry Hamilton <derryh@rasilon.net>"
|
||||
|
||||
RUN apt update && apt upgrade -y && apt install -y bash
|
||||
|
||||
RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl
|
||||
|
||||
COPY --from=builder /ujetl/target/CopyingApp-2.*-jar-with-dependencies.jar /usr/share/ujetl/lib/CopyingApp.jar
|
||||
COPY --from=builder /ujetl/install_extra/copying_defaults_log4j.xml /etc/ujetl/
|
||||
COPY ujetl_entrypoint /
|
||||
CMD ["/ujetl_entrypoint"]
|
||||
|
||||
|
||||
FROM runner as tester
|
||||
COPY TEST_config_live.xml /var/ujetl/
|
||||
COPY wait_for_postgres /
|
||||
RUN apt-get install -y postgresql-client
|
||||
|
||||
|
||||
FROM runner as deploy
|
||||
# Convice docker cloud to build the deploy image
|
||||
167
docker/multistage/TEST_config_live.xml
Normal file
167
docker/multistage/TEST_config_live.xml
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
<?xml version="1.0" encoding="iso-8859-1"?>
|
||||
<CopyingApp>
|
||||
<hardLimitSeconds>360000</hardLimitSeconds>
|
||||
<nRowsToLog>10000</nRowsToLog>
|
||||
<blockSize>1000</blockSize>
|
||||
<pollTimeout>500</pollTimeout>
|
||||
<drivers>
|
||||
<driver>org.postgresql.Driver</driver>
|
||||
<driver>org.relique.jdbc.csv.CsvDriver</driver>
|
||||
</drivers>
|
||||
|
||||
<source>
|
||||
<dsn>jdbc:postgresql://testdb:5432/test</dsn>
|
||||
<username>test</username>
|
||||
<password>test</password>
|
||||
<networkTimeout>600000</networkTimeout>
|
||||
</source>
|
||||
<dest>
|
||||
<dsn>jdbc:postgresql://testdb:5432/test</dsn>
|
||||
<username>test</username>
|
||||
<password>test</password>
|
||||
</dest>
|
||||
<jobs>
|
||||
<job>
|
||||
<name>test</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
||||
<select>
|
||||
select
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
from
|
||||
public.source where id > ?::bigint</select>
|
||||
<insert>
|
||||
insert into public.dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)values(
|
||||
?::bigint,
|
||||
?::integer,
|
||||
?::text,
|
||||
?::timestamp with time zone
|
||||
)ON CONFLICT(id) DO UPDATE
|
||||
set
|
||||
test_int = EXCLUDED.test_int,
|
||||
test_text = EXCLUDED.test_text,
|
||||
test_ts = EXCLUDED.test_ts
|
||||
WHERE
|
||||
dest.test_int = EXCLUDED.test_int
|
||||
OR dest.test_text = EXCLUDED.test_text
|
||||
OR dest.test_ts = EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>test upsert</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select -1 as key</key>
|
||||
<select>
|
||||
select
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
from
|
||||
public.source where id > ?::bigint</select>
|
||||
<insert>
|
||||
insert into public.dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)values(
|
||||
?::bigint,
|
||||
?::integer,
|
||||
?::text,
|
||||
?::timestamp with time zone
|
||||
)ON CONFLICT(id) DO UPDATE
|
||||
set
|
||||
test_int = EXCLUDED.test_int,
|
||||
test_text = EXCLUDED.test_text,
|
||||
test_ts = EXCLUDED.test_ts
|
||||
WHERE
|
||||
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>denormalise</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select -1 as key</key>
|
||||
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid) where ?::integer is not null;</select>
|
||||
<insert>
|
||||
INSERT INTO denormalised_personalia(person_id,fname,lname)
|
||||
values(?::integer,?::text,?::text)
|
||||
ON CONFLICT (person_id) DO UPDATE
|
||||
SET
|
||||
fname = EXCLUDED.fname,
|
||||
lname = EXCLUDED.lname
|
||||
WHERE
|
||||
denormalised_personalia.fname is distinct from EXCLUDED.fname
|
||||
OR denormalised_personalia.lname is distinct from EXCLUDED.lname
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>test pre post</name>
|
||||
<key>select -1 as key</key>
|
||||
<select>
|
||||
select
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
from
|
||||
public.source where id > ?::bigint
|
||||
</select>
|
||||
<preTarget>
|
||||
drop table if exists tmp_dest;
|
||||
create temp table tmp_dest(
|
||||
id bigint,
|
||||
test_int integer,
|
||||
test_text text,
|
||||
test_ts timestamp with time zone
|
||||
);
|
||||
</preTarget>
|
||||
<insert>
|
||||
insert into tmp_dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)values(
|
||||
?::bigint,
|
||||
?::integer,
|
||||
?::text,
|
||||
?::timestamp with time zone
|
||||
)
|
||||
</insert>
|
||||
<insert>
|
||||
insert into public.dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)
|
||||
select id,test_int,test_text,test_ts
|
||||
from tmp_dest
|
||||
ON CONFLICT(id) DO UPDATE
|
||||
set
|
||||
test_int = EXCLUDED.test_int,
|
||||
test_text = EXCLUDED.test_text,
|
||||
test_ts = EXCLUDED.test_ts
|
||||
WHERE
|
||||
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
</jobs>
|
||||
</CopyingApp>
|
||||
4
docker/multistage/small.csv
Normal file
4
docker/multistage/small.csv
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
id,dat
|
||||
1,banana
|
||||
2,potato
|
||||
3,nugget
|
||||
|
7
docker/run/ujetl_entrypoint → docker/multistage/ujetl_entrypoint
Normal file → Executable file
7
docker/run/ujetl_entrypoint → docker/multistage/ujetl_entrypoint
Normal file → Executable file
|
|
@ -4,12 +4,17 @@ set -e
|
|||
LOG_PROPS=/etc/ujetl/copying_defaults_log4j.xml
|
||||
|
||||
cd /var/ujetl
|
||||
echo Currently in `pwd`
|
||||
echo processing files:
|
||||
ls
|
||||
echo Starting run loop
|
||||
for file in *.xml
|
||||
do
|
||||
/usr/bin/java \
|
||||
/usr/local/openjdk-11/bin/java \
|
||||
-Xms1g \
|
||||
-Xmx2g \
|
||||
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
||||
-Dlog4j.configurationFile="$LOG_PROPS" \
|
||||
com.rasilon.ujetl.CopyingApp \
|
||||
--log4j "$LOG_PROPS" \
|
||||
--config "$file"
|
||||
21
docker/multistage/wait_for_postgres
Executable file
21
docker/multistage/wait_for_postgres
Executable file
|
|
@ -0,0 +1,21 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
cmd="$@"
|
||||
|
||||
until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do
|
||||
>&2 echo "Postgres is unavailable - sleeping"
|
||||
sleep 1
|
||||
done
|
||||
|
||||
>&2 echo "Postgres is up - Waiting for the reboot"
|
||||
sleep 3 # Wait for the Postgres reboot at the end of setup
|
||||
|
||||
until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do
|
||||
>&2 echo "Postgres is unavailable - sleeping"
|
||||
sleep 1
|
||||
done
|
||||
|
||||
|
||||
>&2 echo "Postgres is up - executing command"
|
||||
exec $cmd
|
||||
|
|
@ -1,18 +0,0 @@
|
|||
# General build RPM environment for CentOS 6.x
|
||||
#
|
||||
# VERSION 0.0.1
|
||||
|
||||
FROM centos:centos7
|
||||
MAINTAINER Derry Hamilton <derryh@rasilon.net>
|
||||
|
||||
# Install up-to-date epel rpm repository
|
||||
RUN yum -y install epel-release
|
||||
|
||||
# Install java first, to get a sensible one.
|
||||
RUN yum -y install java-1.8.0-openjdk-devel
|
||||
|
||||
COPY uJETL-2.*.x86_64.rpm /tmp/
|
||||
COPY ujetl_entrypoint /
|
||||
RUN rpm -i /tmp/uJETL-2.*.x86_64.rpm
|
||||
ENTRYPOINT ["/ujetl_entrypoint"]
|
||||
|
||||
23
docker/test_compose/docker-compose.yml
Normal file
23
docker/test_compose/docker-compose.yml
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
# This is a sample to help put the full application together
|
||||
|
||||
version: '3.3'
|
||||
|
||||
services:
|
||||
testdb:
|
||||
image: rasilon/ujetl_testdb:latest
|
||||
build:
|
||||
context: ../test_db
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_PASSWORD: password
|
||||
POSTGRES_DB: postgres
|
||||
tests:
|
||||
image: rasilon/ujetl_tester:latest
|
||||
build:
|
||||
context: ../multistage
|
||||
links:
|
||||
- "testdb"
|
||||
command: ["/wait_for_postgres", "/ujetl_entrypoint"]
|
||||
|
||||
|
||||
|
||||
3
docker/test_db/Dockerfile
Normal file
3
docker/test_db/Dockerfile
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
FROM postgres:11
|
||||
COPY setup.sql /docker-entrypoint-initdb.d/
|
||||
COPY is_ready /
|
||||
3
docker/test_db/is_ready
Normal file
3
docker/test_db/is_ready
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
#!/bin/bash
|
||||
/usr/lib/postgresql/9.6/bin/psql -U postgres -c "SELECT 1 FROM public.container_ready" postgres
|
||||
|
||||
58
docker/test_db/setup.sql
Normal file
58
docker/test_db/setup.sql
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
CREATE DATABASE test;
|
||||
\c test
|
||||
CREATE ROLE test login password 'test';
|
||||
CREATE UNLOGGED TABLE source (
|
||||
id bigserial primary key,
|
||||
test_int integer,
|
||||
test_text text,
|
||||
test_ts timestamp with time zone
|
||||
);
|
||||
CREATE UNLOGGED TABLE dest (
|
||||
id bigint primary key,
|
||||
test_int integer,
|
||||
test_text text,
|
||||
test_ts timestamp with time zone
|
||||
);
|
||||
|
||||
|
||||
GRANT SELECT ON source to test;
|
||||
GRANT SELECT,INSERT,UPDATE,DELETE ON dest TO test;
|
||||
|
||||
INSERT INTO source(test_int,test_text,test_ts) SELECT 1,'banana',now() FROM generate_series(1,100000);
|
||||
|
||||
CREATE TABLE normalised_first_names(
|
||||
fid smallserial not null primary key,
|
||||
fname text not null unique
|
||||
);
|
||||
CREATE TABLE normalised_last_names(
|
||||
lid smallserial not null primary key,
|
||||
lname text not null unique
|
||||
);
|
||||
INSERT INTO normalised_first_names (fname) values ('Abigail'), ('Adam'), ('Beatrice'), ('Bruce'), ('Claire'), ('Clive'), ('Deborah'), ('Dave');
|
||||
INSERT INTO normalised_last_names (lname) values ('Adams'), ('Bellamy'), ('Clark'), ('Dabrowski');
|
||||
|
||||
CREATE TABLE normalised_personalia (
|
||||
person_id serial not null primary key,
|
||||
fid smallint not null references normalised_first_names(fid),
|
||||
lid smallint not null references normalised_last_names(lid)
|
||||
);
|
||||
insert into normalised_personalia(fid,lid) values (1,1), (1,2), (1,3), (1,4), (2,1), (2,2), (2,3), (2,4), (3,1), (3,2), (3,3), (3,4), (4,1), (4,2), (4,3), (4,4);
|
||||
|
||||
CREATE TABLE denormalised_personalia(
|
||||
person_id integer not null primary key,
|
||||
fname text,
|
||||
lname text
|
||||
);
|
||||
|
||||
CREATE TABLE test_csvjdbc(
|
||||
id integer not null primary key,
|
||||
dat text
|
||||
);
|
||||
|
||||
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test;
|
||||
GRANT SELECT,INSERT,UPDATE ON denormalised_personalia TO test;
|
||||
|
||||
\c postgres
|
||||
CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a);
|
||||
GRANT SELECT ON public.container_ready TO TEST;
|
||||
|
||||
|
|
@ -30,9 +30,9 @@ fi
|
|||
/usr/bin/java \
|
||||
-Xms1g \
|
||||
-Xmx2g \
|
||||
-Dlog4j.configurationFile="$LOG_PROPS" \
|
||||
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
||||
com.rasilon.ujetl.CopyingApp \
|
||||
--log4j "$LOG_PROPS" \
|
||||
--config "/etc/ujetl/${JOBNAME}_config_live.xml"
|
||||
|
||||
#rm -f $LOCKFILE
|
||||
|
|
|
|||
60
pom.xml
60
pom.xml
|
|
@ -6,19 +6,42 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
<groupId>com.rasilon.ujetl</groupId>
|
||||
<artifactId>CopyingApp</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>2.0.1</version>
|
||||
<version>2.5.2</version>
|
||||
<name>uJETL</name>
|
||||
<url></url>
|
||||
<url>https://github.com/rasilon/ujetl</url>
|
||||
<properties>
|
||||
<project.build.sourceEncoding>
|
||||
UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>5.4.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>5.4.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.vintage</groupId>
|
||||
<artifactId>junit-vintage-engine</artifactId>
|
||||
<version>5.4.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>2.2.220</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.9</version>
|
||||
<version>3.18.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
|
|
@ -28,13 +51,12 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-configuration2</artifactId>
|
||||
<version>2.4</version>
|
||||
<version>2.10.1</version>
|
||||
</dependency>
|
||||
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
|
||||
<dependency>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>commons-beanutils</artifactId>
|
||||
<version>1.9.3</version>
|
||||
<version>1.11.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.beust</groupId>
|
||||
|
|
@ -44,27 +66,31 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<version>2.11.2</version>
|
||||
<version>2.17.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>2.11.2</version>
|
||||
<version>2.25.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<version>42.2.5</version>
|
||||
<version>42.7.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.sourceforge.csvjdbc</groupId>
|
||||
<artifactId>csvjdbc</artifactId>
|
||||
<version>1.0.40</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>2.3.2</version>
|
||||
<version>3.8.0</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<release>11</release>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
|
|
@ -88,6 +114,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.22.0</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -34,10 +34,6 @@ public class CopyingApp {
|
|||
public static void main(String[] args) {
|
||||
CopyingAppCommandParser cli = new CopyingAppCommandParser(args);
|
||||
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
|
||||
String log4jConfigLocation = cli.getLog4jConfigFile();
|
||||
File file = new File(log4jConfigLocation);
|
||||
context.setConfigLocation(file.toURI());
|
||||
System.out.println("Config set from "+file.toURI());
|
||||
|
||||
CopyingApp app = new CopyingApp(cli);
|
||||
try {
|
||||
|
|
@ -79,6 +75,7 @@ public class CopyingApp {
|
|||
|
||||
Configuration config = configs.xml(cli.getConfigFile());
|
||||
|
||||
loadDrivers(config);
|
||||
String hardLimitSeconds = config.getString("hardLimitSeconds");
|
||||
if(hardLimitSeconds != null) {
|
||||
TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true);
|
||||
|
|
@ -110,7 +107,7 @@ public class CopyingApp {
|
|||
|
||||
Integer pollTimeout = null;
|
||||
try {
|
||||
pollTimeout = new Integer(config.getString("nRowsToLog"));
|
||||
pollTimeout = new Integer(config.getString("pollTimeout"));
|
||||
log.info(String.format("%s - Setting Poll timeout to %s milliseconds", jobName, pollTimeout));
|
||||
} catch(Exception e) {
|
||||
pollTimeout = new Integer(1000); // If we don't have a new setting, use the old default
|
||||
|
|
@ -131,7 +128,27 @@ public class CopyingApp {
|
|||
String tabKey = config.getString("jobs.job("+i+").key");
|
||||
String tabSelect = config.getString("jobs.job("+i+").select");
|
||||
String tabInsert = config.getString("jobs.job("+i+").insert");
|
||||
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
|
||||
String preTarget = config.getString("jobs.job("+i+").preTarget");
|
||||
String postTarget = config.getString("jobs.job("+i+").postTarget");
|
||||
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
|
||||
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
|
||||
|
||||
Job j = new Job(
|
||||
sConn,
|
||||
dConn,
|
||||
tabName,
|
||||
jobName,
|
||||
tabKey,
|
||||
tabSelect,
|
||||
tabInsert,
|
||||
preTarget,
|
||||
postTarget,
|
||||
nRowsToLog,
|
||||
blockSize,
|
||||
pollTimeout,
|
||||
identifySourceSQL,
|
||||
identifyDestinationSQL
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
|
||||
|
|
@ -141,7 +158,28 @@ public class CopyingApp {
|
|||
String tabKey = config.getString("jobs.job.key");
|
||||
String tabSelect = config.getString("jobs.job.select");
|
||||
String tabInsert = config.getString("jobs.job.insert");
|
||||
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
|
||||
String preTarget = config.getString("jobs.job.preTarget");
|
||||
String postTarget = config.getString("jobs.job.postTarget");
|
||||
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
|
||||
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
|
||||
|
||||
|
||||
Job j = new Job(
|
||||
sConn,
|
||||
dConn,
|
||||
tabName,
|
||||
jobName,
|
||||
tabKey,
|
||||
tabSelect,
|
||||
tabInsert,
|
||||
preTarget,
|
||||
postTarget,
|
||||
nRowsToLog,
|
||||
blockSize,
|
||||
pollTimeout,
|
||||
identifySourceSQL,
|
||||
identifyDestinationSQL
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
} else {
|
||||
|
|
@ -203,4 +241,21 @@ public class CopyingApp {
|
|||
|
||||
return c;
|
||||
}
|
||||
|
||||
// Even with JDBC 4, some drivers don't play nicely with whatever
|
||||
// the classloaders are up to. So this allows us to force it the
|
||||
// old fashioned way, and works around the
|
||||
// "But it works fine when it's the /only/ driver!"
|
||||
// cross-database problem
|
||||
private void loadDrivers(Configuration config) {
|
||||
String[] drivers = config.get(String[].class, "drivers.driver");
|
||||
for(String d:drivers) {
|
||||
try {
|
||||
Class.forName(d);
|
||||
log.info("Preloaded driver "+d);
|
||||
} catch(ClassNotFoundException e) {
|
||||
log.error("Could not preload driver "+d,e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ public class CopyingAppCommandParser {
|
|||
private String configFile;
|
||||
|
||||
@Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run")
|
||||
private String log4jConfigFile = "/etc/ppl/default_log4j_config.properties";
|
||||
private String log4jConfigFile = "/etc/ujetl/default_log4j_config.properties";
|
||||
|
||||
public CopyingAppCommandParser(String[] args) {
|
||||
super();
|
||||
|
|
@ -23,8 +23,4 @@ public class CopyingAppCommandParser {
|
|||
return configFile;
|
||||
}
|
||||
|
||||
public String getLog4jConfigFile() {
|
||||
return log4jConfigFile;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,23 +22,29 @@ import org.apache.logging.log4j.Logger;
|
|||
public class Job extends Thread {
|
||||
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
|
||||
|
||||
Connection sConn;
|
||||
Connection dConn;
|
||||
String name;
|
||||
String jobName;
|
||||
String key;
|
||||
String select;
|
||||
String insert;
|
||||
Integer nRowsToLog;
|
||||
Integer blockSize;
|
||||
Integer pollTimeout;
|
||||
private Connection sConn;
|
||||
private Connection dConn;
|
||||
private String name;
|
||||
private String jobName;
|
||||
private String key;
|
||||
private String select;
|
||||
private String insert;
|
||||
private String preTarget;
|
||||
private String postTarget;
|
||||
private Integer nRowsToLog;
|
||||
private Integer blockSize;
|
||||
private Integer pollTimeout;
|
||||
private String identifySourceSQL;
|
||||
private String identifyDestinationSQL;
|
||||
|
||||
BlockingQueue<List<String>> resultBuffer;
|
||||
AtomicBoolean producerLive;
|
||||
AtomicBoolean threadsExit = new AtomicBoolean(false);;
|
||||
private BlockingQueue<List<String>> resultBuffer;
|
||||
private AtomicBoolean producerLive;
|
||||
private AtomicBoolean threadsExit = new AtomicBoolean(false);;
|
||||
private String sourceID;
|
||||
private String destID;
|
||||
|
||||
|
||||
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
|
||||
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) {
|
||||
this.sConn = sConn;
|
||||
this.dConn = dConn;
|
||||
this.name = name;
|
||||
|
|
@ -46,9 +52,13 @@ public class Job extends Thread {
|
|||
this.key = key;
|
||||
this.select = select;
|
||||
this.insert = insert;
|
||||
this.preTarget = preTarget;
|
||||
this.postTarget = postTarget;
|
||||
this.nRowsToLog = nRowsToLog;
|
||||
this.blockSize = blockSize;
|
||||
this.pollTimeout = pollTimeout;
|
||||
this.identifySourceSQL = identifySourceSQL;
|
||||
this.identifyDestinationSQL = identifyDestinationSQL;
|
||||
|
||||
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
|
||||
producerLive = new AtomicBoolean(true);
|
||||
|
|
@ -72,12 +82,12 @@ public class Job extends Thread {
|
|||
public Producer(ResultSet src,BlockingQueue q) {
|
||||
this.src = src;
|
||||
this.q = q;
|
||||
this.setName(String.format("%s-%s-Consumer",jobName,name));
|
||||
this.setName(String.format("%s-%s-Producer",jobName,name));
|
||||
}
|
||||
public void run() {
|
||||
try {
|
||||
long rowsInserted = 0;
|
||||
long rowNum = 0;
|
||||
long rowsAttempted = 0;
|
||||
long stamp = System.nanoTime();
|
||||
long nstamp;
|
||||
int columnCount = src.getMetaData().getColumnCount();
|
||||
|
|
@ -95,13 +105,13 @@ public class Job extends Thread {
|
|||
}
|
||||
log.trace("Producer queue full.");
|
||||
}
|
||||
rowNum++;
|
||||
if(rowNum % nRowsToLog == 0) {
|
||||
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name));
|
||||
rowsAttempted++;
|
||||
if(rowsAttempted % nRowsToLog == 0) {
|
||||
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowsAttempted,name));
|
||||
}
|
||||
}
|
||||
producerLive.set(false);
|
||||
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name));
|
||||
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowsAttempted,name));
|
||||
} catch(Exception e) {
|
||||
producerLive.set(false); // Signal we've exited.
|
||||
threadsExit.set(true); // Signal we've exited.
|
||||
|
|
@ -122,7 +132,7 @@ public class Job extends Thread {
|
|||
}
|
||||
public void run() {
|
||||
try {
|
||||
long rowNum = 0;
|
||||
long rowsAttempted = 0;
|
||||
long rowsInserted = 0;
|
||||
|
||||
while(true) {
|
||||
|
|
@ -133,7 +143,7 @@ public class Job extends Thread {
|
|||
if(row == null && producerLive.get() == false) {
|
||||
rowsInserted += arraySum(insertStatement.executeBatch());
|
||||
dConn.commit();
|
||||
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name));
|
||||
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowsAttempted,name));
|
||||
return;
|
||||
}
|
||||
if(threadsExit.get()) {
|
||||
|
|
@ -150,14 +160,14 @@ public class Job extends Thread {
|
|||
}
|
||||
insertStatement.addBatch();
|
||||
|
||||
rowNum++;
|
||||
if(rowNum % nRowsToLog == 0) {
|
||||
rowsAttempted++;
|
||||
if(rowsAttempted % nRowsToLog == 0) {
|
||||
rowsInserted += arraySum(insertStatement.executeBatch());
|
||||
dConn.commit();
|
||||
log.info(String.format("%s - Inserted %s of %s notified rows into %s so far",
|
||||
log.info(String.format("%s - Inserted %s of %s notified rows into %s",
|
||||
jobName,
|
||||
rowNum,
|
||||
rowsInserted,
|
||||
rowsAttempted,
|
||||
name));
|
||||
}
|
||||
}
|
||||
|
|
@ -169,11 +179,34 @@ public class Job extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
// Outer run
|
||||
public void run() {
|
||||
try {
|
||||
ResultSet rs;
|
||||
|
||||
if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn);
|
||||
if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn);
|
||||
|
||||
if(sourceID != null || destID != null){
|
||||
log.info(String.format(
|
||||
"%s - Processing table: %s with source: %s, dest: %s",
|
||||
jobName,
|
||||
name,
|
||||
sourceID==null?"":sourceID,
|
||||
destID==null?"":destID
|
||||
));
|
||||
}else{
|
||||
log.info(String.format("%s - Processing table: %s",jobName,name));
|
||||
}
|
||||
if(preTarget != null){
|
||||
log.info(String.format("%s - Trying to execute preTarget SQL",jobName));
|
||||
PreparedStatement s = dConn.prepareStatement(preTarget);
|
||||
s.executeUpdate();
|
||||
s.close();
|
||||
dConn.commit();
|
||||
}else{
|
||||
log.info(String.format("%s - No preTarget; skipping.",jobName));
|
||||
}
|
||||
|
||||
log.debug("Trying to execute: "+key);
|
||||
PreparedStatement keyStatement = dConn.prepareStatement(key);
|
||||
|
|
@ -211,10 +244,33 @@ public class Job extends Thread {
|
|||
p.join();
|
||||
c.join();
|
||||
|
||||
if(postTarget != null){
|
||||
log.info(String.format("%s - Trying to execute postTarget SQL",jobName));
|
||||
PreparedStatement s = dConn.prepareStatement(postTarget);
|
||||
s.executeUpdate();
|
||||
s.close();
|
||||
dConn.commit();
|
||||
}else{
|
||||
log.info(String.format("%s - No postTarget; skipping.",jobName));
|
||||
}
|
||||
|
||||
|
||||
} catch(InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch(SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getSingleString(String sql, Connection conn){
|
||||
try{
|
||||
PreparedStatement s = conn.prepareStatement(sql);
|
||||
ResultSet r = s.executeQuery();
|
||||
r.next();
|
||||
return r.getString(1);
|
||||
} catch(SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
37
src/test/java/com/rasilon/ujetl/TestConfig.java
Normal file
37
src/test/java/com/rasilon/ujetl/TestConfig.java
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
package com.rasilon.ujetl;
|
||||
|
||||
import org.apache.commons.configuration2.Configuration;
|
||||
import org.apache.commons.configuration2.builder.fluent.Configurations;
|
||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
|
||||
import org.apache.commons.beanutils.PropertyUtils; // Why does config need this?
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
|
||||
/**
|
||||
* @author derryh
|
||||
*
|
||||
*/
|
||||
public class TestConfig {
|
||||
|
||||
@Test
|
||||
public void test001VerifyArrayOfDrivers() {
|
||||
try {
|
||||
Configurations configs = new Configurations();
|
||||
Configuration config = configs.xml("TEST_config_live.xml");
|
||||
String[] drivers = config.get(String[].class, "drivers.driver");
|
||||
int ndrivers =drivers.length;
|
||||
if(ndrivers != 3){
|
||||
fail("Expected 3 drivers, but found "+ndrivers);
|
||||
}
|
||||
} catch(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
64
src/test/java/com/rasilon/ujetl/TestJob.java
Normal file
64
src/test/java/com/rasilon/ujetl/TestJob.java
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package com.rasilon.ujetl;
|
||||
|
||||
import java.sql.*;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
|
||||
public class TestJob {
|
||||
|
||||
private static String jdbcURL = "jdbc:h2:mem:dbtest";
|
||||
@Test
|
||||
public void test002verifyH2Works() {
|
||||
try {
|
||||
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
conn.close();
|
||||
} catch(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJob() {
|
||||
try (
|
||||
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
|
||||
) {
|
||||
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
|
||||
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
|
||||
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
|
||||
for(int i=0; i<10000; i++) {
|
||||
inserter.setInt(1,i);
|
||||
inserter.executeUpdate();
|
||||
}
|
||||
|
||||
Job j = new Job(
|
||||
src,
|
||||
dest,
|
||||
"jUnit Test Config",
|
||||
"jUnit Test Job",
|
||||
"SELECT -1 AS \"key\"",
|
||||
"SELECT id,dat FROM src WHERE id > ?",
|
||||
"INSERT INTO dest VALUES(?,?)",
|
||||
null,
|
||||
null,
|
||||
100,
|
||||
100,
|
||||
100,
|
||||
"select 'PID:'||session_id()",
|
||||
"select 'PID:'||session_id()"
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
// do stuff
|
||||
} catch(Exception e) {
|
||||
e.printStackTrace();
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
27
src/test/java/com/rasilon/ujetl/TestParser.java
Normal file
27
src/test/java/com/rasilon/ujetl/TestParser.java
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
package com.rasilon.ujetl;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
|
||||
public class TestParser {
|
||||
|
||||
@Test
|
||||
public void test001Parset() {
|
||||
try {
|
||||
String[] args = {
|
||||
"--config",
|
||||
"config_test_banana.xml"
|
||||
};
|
||||
CopyingAppCommandParser p = new CopyingAppCommandParser(args);
|
||||
|
||||
assertEquals(p.getConfigFile(),"config_test_banana.xml");
|
||||
|
||||
} catch(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
64
src/test/java/com/rasilon/ujetl/TestPrePost.java
Normal file
64
src/test/java/com/rasilon/ujetl/TestPrePost.java
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package com.rasilon.ujetl;
|
||||
|
||||
import java.sql.*;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
|
||||
public class TestPrePost {
|
||||
|
||||
private static String jdbcURL = "jdbc:h2:mem:dbtest";
|
||||
@Test
|
||||
public void test002verifyH2Works() {
|
||||
try {
|
||||
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
conn.close();
|
||||
} catch(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrePost() {
|
||||
try (
|
||||
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
|
||||
) {
|
||||
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
|
||||
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
|
||||
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
|
||||
for(int i=0; i<10000; i++) {
|
||||
inserter.setInt(1,i);
|
||||
inserter.executeUpdate();
|
||||
}
|
||||
|
||||
Job j = new Job(
|
||||
src,
|
||||
dest,
|
||||
"jUnit Test Config",
|
||||
"jUnit Test Job",
|
||||
"SELECT -1 AS \"key\"",
|
||||
"SELECT id,dat FROM src WHERE id > ?",
|
||||
"INSERT INTO tmp_dest VALUES(?,?)",
|
||||
"CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);",
|
||||
"INSERT INTO dest SELECT * from tmp_dest;",
|
||||
100,
|
||||
100,
|
||||
100,
|
||||
"select 'PID:'||session_id()",
|
||||
"select 'PID:'||session_id()"
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
// do stuff
|
||||
} catch(Exception e) {
|
||||
e.printStackTrace();
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
28
src/test/java/com/rasilon/ujetl/TestTimeLimiter.java
Normal file
28
src/test/java/com/rasilon/ujetl/TestTimeLimiter.java
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
package com.rasilon.ujetl;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
|
||||
public class TestTimeLimiter {
|
||||
|
||||
@Test
|
||||
public void test001Limiter() {
|
||||
try {
|
||||
TimeLimiter hardLimit = new TimeLimiter(1,false);
|
||||
hardLimit.start();
|
||||
|
||||
Thread.sleep(10000);
|
||||
|
||||
fail("Sleep wasn't interrupted by the limiter!");
|
||||
} catch(java.lang.InterruptedException e) {
|
||||
// Pass
|
||||
} catch(Exception e) {
|
||||
e.printStackTrace();
|
||||
fail("Unexpected exception.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3,6 +3,12 @@
|
|||
<hardLimitSeconds>360000</hardLimitSeconds>
|
||||
<nRowsToLog>10000</nRowsToLog>
|
||||
<blockSize>1000</blockSize>
|
||||
<pollTimeout>500</pollTimeout>
|
||||
<drivers>
|
||||
<driver>org.postgresql.Driver</driver>
|
||||
<driver>org.h2.Driver</driver>
|
||||
<driver>org.relique.jdbc.csv.CsvDriver</driver>
|
||||
</drivers>
|
||||
<source>
|
||||
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
||||
<username>test</username>
|
||||
|
|
@ -17,6 +23,8 @@
|
|||
<jobs>
|
||||
<job>
|
||||
<name>test</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
||||
<select>
|
||||
select
|
||||
|
|
@ -48,5 +56,54 @@
|
|||
OR dest.test_ts = EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>test upsert</name>
|
||||
<key>select -1 as key</key>
|
||||
<select>
|
||||
select
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
from
|
||||
public.source where id > ?::bigint</select>
|
||||
<insert>
|
||||
insert into public.dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)values(
|
||||
?::bigint,
|
||||
?::integer,
|
||||
?::text,
|
||||
?::timestamp with time zone
|
||||
)ON CONFLICT(id) DO UPDATE
|
||||
set
|
||||
test_int = EXCLUDED.test_int,
|
||||
test_text = EXCLUDED.test_text,
|
||||
test_ts = EXCLUDED.test_ts
|
||||
WHERE
|
||||
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>denormalise</name>
|
||||
<key>select -1 as key</key>
|
||||
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid);</select>
|
||||
<insert>
|
||||
INSERT INTO denormalised_personalia(person_id,fname,lname)
|
||||
values(?::integer,?::text,?::text)
|
||||
ON CONFLICT (person_id) DO UPDATE
|
||||
SET
|
||||
fname = EXCLUDED.fname,
|
||||
lname = EXCLUDED.lname
|
||||
WHERE
|
||||
denormalised_personalia.fname is distinct from EXCLUDED.fname
|
||||
OR denormalised_personalia.lname is distinct from EXCLUDED.lname
|
||||
</insert>
|
||||
</job>
|
||||
</jobs>
|
||||
</CopyingApp>
|
||||
|
|
|
|||
13
src/test/resources/log4j2.xml
Normal file
13
src/test/resources/log4j2.xml
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Configuration status="INFO">
|
||||
<Appenders>
|
||||
<Console name="Console" target="SYSTEM_OUT">
|
||||
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
|
||||
</Console>
|
||||
</Appenders>
|
||||
<Loggers>
|
||||
<Root level="INFO">
|
||||
<AppenderRef ref="Console"/>
|
||||
</Root>
|
||||
</Loggers>
|
||||
</Configuration>
|
||||
33
uJETL.spec
33
uJETL.spec
|
|
@ -1,33 +0,0 @@
|
|||
Summary: Java app to facilitate moving data between databases.
|
||||
Name: uJETL
|
||||
Version: 2.0.1
|
||||
Release: 1
|
||||
Group: Applications/Database
|
||||
License: All rights reserved.
|
||||
Source: uJETL-%{version}.tar.gz
|
||||
URL: https://github.com/rasilon/ujetl.git
|
||||
Distribution: derryh
|
||||
Vendor: derryh
|
||||
Packager: Derry Hamilton <derryh@rasilon.net>
|
||||
#BuildRoot: .
|
||||
|
||||
%description
|
||||
A very small ETL app
|
||||
|
||||
%prep
|
||||
%setup
|
||||
|
||||
%build
|
||||
#mvn -Dmaven.test.skip=true clean package
|
||||
true
|
||||
|
||||
%install
|
||||
mkdir -p $RPM_BUILD_ROOT/usr/share/ujetl/lib $RPM_BUILD_ROOT/etc/ujetl $RPM_BUILD_ROOT/usr/bin
|
||||
cp target/CopyingApp-*-jar-with-dependencies.jar $RPM_BUILD_ROOT/usr/share/ujetl/lib/CopyingApp.jar
|
||||
cp install_extra/run_copying_job $RPM_BUILD_ROOT/usr/bin
|
||||
cp install_extra/copying_defaults_log4j.xml $RPM_BUILD_ROOT/etc/ujetl
|
||||
|
||||
%files
|
||||
/usr/share/ujetl/lib/CopyingApp.jar
|
||||
/usr/bin/run_copying_job
|
||||
/etc/ujetl/copying_defaults_log4j.xml
|
||||
Loading…
Add table
Add a link
Reference in a new issue