mirror of
https://github.com/rasilon/ujetl.git
synced 2026-04-11 18:39:30 +00:00
Compare commits
89 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4f16842623 | |||
|
|
d2257f1b81 | ||
| 8ab5b1b79e | |||
|
|
26d47f16c9 | ||
| 9f0fe6bb3b | |||
| bf4ed02af5 | |||
| 5f275b93d8 | |||
|
|
d509513084 | ||
| 96e28a5abb | |||
| 1da5fabb05 | |||
| 8a4d892dfa | |||
|
|
07f68922b3 | ||
| 22af36b555 | |||
|
|
d0a5075191 | ||
| 1ce7e09751 | |||
| f651fd720e | |||
| 58b78b2021 | |||
| a88ac56848 | |||
| 9a8716f33e | |||
| 1b1ba551c8 | |||
| 866d02fb52 | |||
| 441b2f4191 | |||
| b81aedefb1 | |||
| 584f83de0d | |||
| 4d38679155 | |||
| b378189512 | |||
| 49487a83af | |||
| 9f4729bf1d | |||
|
|
2126553e5c | ||
| 64743f430b | |||
| 28c918dd6a | |||
|
|
17d33ec18b | ||
| 5a61bb357f | |||
| ffe33276ba | |||
|
|
67f6ec2dab | ||
|
|
9554d3df63 | ||
| f6268344d0 | |||
| 130b120515 | |||
|
|
54a8cd6312 | ||
|
|
d7bde365ed | ||
| 1c5e54acc9 | |||
| b17ca2479b | |||
| 5de1e80b8c | |||
|
|
36acc0ed23 | ||
|
|
3235db7f6a | ||
|
|
79a3dbf499 | ||
| 7a22b6ddae | |||
| f42fa6550e | |||
| 3c525bf006 | |||
| e405e372cd | |||
|
|
bc9849e3ee | ||
| 0730d1bf9d | |||
|
|
45c5900481 | ||
|
|
f4ded0857c | ||
| c34f8b54c4 | |||
| ce5a48c083 | |||
| e6ccc6f2af | |||
|
|
20c1c7695f | ||
| 5df6384abf | |||
| be6fbc73a1 | |||
| b0cc8f8ce7 | |||
| cc03ef6c1d | |||
| f2ef10be64 | |||
| 6a1ab875c7 | |||
| b3b02669f7 | |||
| f963d8bdcf | |||
| f3538a09f9 | |||
| af62d6cf96 | |||
| ead5a9e173 | |||
|
|
a77be8b696 | ||
| 3493e18630 | |||
|
|
cb3b3e3b63 | ||
| cbb44ce1d9 | |||
| 8dbe3e9505 | |||
|
|
be9e468354 | ||
| 66dd3bb37d | |||
| a7d1a6077b | |||
| 734dc8608f | |||
| 01310bfca4 | |||
| 06c64d499f | |||
| 892f7b4fa8 | |||
| 4f0db0a2df | |||
| 4f905dd47a | |||
| 8fdbc6a78e | |||
| ddc67f3a41 | |||
| 633fbe7391 | |||
| f669b1af9d | |||
| e71832f57a | |||
| d34a0902d1 |
23 changed files with 532 additions and 146 deletions
|
|
@ -1,12 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
set -e
|
|
||||||
|
|
||||||
cd /root
|
|
||||||
cp -Rv build build2
|
|
||||||
cd build2
|
|
||||||
|
|
||||||
SPEC=$(ls *.spec)
|
|
||||||
VER=$(grep Version $SPEC | awk '{print $2}')
|
|
||||||
tar cvf $HOME/rpmbuild/SOURCES/uJETL-${VER}.tar.gz --show-transformed --transform="s/^\./uJETL-${VER}/" .
|
|
||||||
rpmbuild -ba $SPEC
|
|
||||||
cp /root/rpmbuild/RPMS/x86_64/* /root/build/
|
|
||||||
|
|
@ -1,6 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
set -e
|
|
||||||
|
|
||||||
docker build --rm -t local/c7-buildhost docker/build
|
|
||||||
|
|
||||||
docker run -it --rm -v `pwd`:/root/build local/c7-buildhost /root/build/build_util/build_rpm
|
|
||||||
|
|
@ -1,2 +1,3 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
docker build --target deploy -t rasilon/ujetl docker/multistage
|
docker build --target deploy -t rasilon/ujetl docker/multistage
|
||||||
|
docker tag rasilon/ujetl:latest rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)
|
||||||
|
|
|
||||||
4
build_util/push_docker_images
Executable file
4
build_util/push_docker_images
Executable file
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/bin/bash
|
||||||
|
docker push rasilon/ujetl:latest
|
||||||
|
docker push rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)
|
||||||
|
|
||||||
|
|
@ -21,15 +21,15 @@ begin
|
||||||
pg_attribute,
|
pg_attribute,
|
||||||
pg_namespace
|
pg_namespace
|
||||||
WHERE
|
WHERE
|
||||||
pg_class.relname = tabname AND
|
pg_class.relname = tabname
|
||||||
indrelid = pg_class.oid AND
|
AND indrelid = pg_class.oid
|
||||||
nspname = sch AND
|
AND nspname = sch
|
||||||
pg_class.relnamespace = pg_namespace.oid AND
|
AND pg_class.relnamespace = pg_namespace.oid
|
||||||
pg_attribute.attrelid = pg_class.oid AND
|
AND pg_attribute.attrelid = pg_class.oid
|
||||||
pg_attribute.attnum = any(pg_index.indkey)
|
AND pg_attribute.attnum = any(pg_index.indkey)
|
||||||
AND indisprimary ;
|
AND indisprimary ;
|
||||||
|
|
||||||
header := 'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n ';
|
header := E'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n ';
|
||||||
for colinfo in
|
for colinfo in
|
||||||
select
|
select
|
||||||
*
|
*
|
||||||
|
|
@ -40,7 +40,6 @@ begin
|
||||||
and table_name = tabname
|
and table_name = tabname
|
||||||
order by ordinal_position
|
order by ordinal_position
|
||||||
loop
|
loop
|
||||||
raise info 'Working on %.% (%)',sch,tabname,colinfo::text;
|
|
||||||
if not is_first then
|
if not is_first then
|
||||||
col_list := col_list || E',\n ';
|
col_list := col_list || E',\n ';
|
||||||
vals := vals || E',\n ';
|
vals := vals || E',\n ';
|
||||||
|
|
@ -48,7 +47,7 @@ begin
|
||||||
changes := changes || E'\n OR ';
|
changes := changes || E'\n OR ';
|
||||||
end if;
|
end if;
|
||||||
col_list := col_list || quote_ident(colinfo.column_name);
|
col_list := col_list || quote_ident(colinfo.column_name);
|
||||||
vals := vals || '?::' || quote_ident(colinfo.data_type);
|
vals := vals || '?::' || colinfo.data_type;
|
||||||
sets := sets || quote_ident(colinfo.column_name) ||
|
sets := sets || quote_ident(colinfo.column_name) ||
|
||||||
E' = EXCLUDED.' || quote_ident(colinfo.column_name);
|
E' = EXCLUDED.' || quote_ident(colinfo.column_name);
|
||||||
changes := changes || E't.' || quote_ident(colinfo.column_name) ||
|
changes := changes || E't.' || quote_ident(colinfo.column_name) ||
|
||||||
|
|
|
||||||
65
config_util/ujetl_select_generator.sql
Normal file
65
config_util/ujetl_select_generator.sql
Normal file
|
|
@ -0,0 +1,65 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_temp.ujetl_select(sch text, tabname text)
|
||||||
|
RETURNS text
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
AS $function$
|
||||||
|
declare
|
||||||
|
s text := '';
|
||||||
|
header text := '';
|
||||||
|
col_list text := '';
|
||||||
|
vals text := '';
|
||||||
|
sets text := '';
|
||||||
|
changes text := '';
|
||||||
|
is_first boolean := true;
|
||||||
|
colinfo record;
|
||||||
|
pks text;
|
||||||
|
begin
|
||||||
|
SELECT
|
||||||
|
array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks
|
||||||
|
FROM
|
||||||
|
pg_index,
|
||||||
|
pg_class,
|
||||||
|
pg_attribute,
|
||||||
|
pg_namespace
|
||||||
|
WHERE
|
||||||
|
pg_class.relname = tabname
|
||||||
|
AND indrelid = pg_class.oid
|
||||||
|
AND nspname = sch
|
||||||
|
AND pg_class.relnamespace = pg_namespace.oid
|
||||||
|
AND pg_attribute.attrelid = pg_class.oid
|
||||||
|
AND pg_attribute.attnum = any(pg_index.indkey)
|
||||||
|
AND indisprimary ;
|
||||||
|
|
||||||
|
header := E'SELECT\n ';
|
||||||
|
for colinfo in
|
||||||
|
select
|
||||||
|
*
|
||||||
|
from
|
||||||
|
information_schema.columns
|
||||||
|
where
|
||||||
|
table_schema = sch
|
||||||
|
and table_name = tabname
|
||||||
|
order by ordinal_position
|
||||||
|
loop
|
||||||
|
if not is_first then
|
||||||
|
col_list := col_list || E',\n ';
|
||||||
|
end if;
|
||||||
|
col_list := col_list || quote_ident(colinfo.column_name);
|
||||||
|
|
||||||
|
is_first = false;
|
||||||
|
end loop;
|
||||||
|
|
||||||
|
s := header ||
|
||||||
|
coalesce(col_list,'col_list failed') ||
|
||||||
|
E'\nFROM\n ' ||
|
||||||
|
quote_ident(sch)||'.'||quote_ident(tabname)||E' as t \n '||
|
||||||
|
E'WHERE\n insert criteria here >= ?::datatype';
|
||||||
|
return s;
|
||||||
|
end;
|
||||||
|
$function$
|
||||||
|
;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1,17 +1,13 @@
|
||||||
FROM centos:centos7 as builder
|
FROM ubuntu:22.04 as builder
|
||||||
RUN yum -y install epel-release java-1.8.0-openjdk-devel
|
RUN apt-get update && apt-get -y upgrade
|
||||||
RUN yum -y groupinstall 'Development Tools'
|
RUN apt-get -y install openjdk-19-jdk-headless maven git
|
||||||
RUN yum -y install git maven
|
RUN git clone --single-branch --branch main https://github.com/rasilon/ujetl.git
|
||||||
RUN git clone https://github.com/rasilon/ujetl.git
|
RUN cd ujetl && mvn -e package
|
||||||
RUN cd ujetl && mvn package
|
|
||||||
|
|
||||||
|
FROM openjdk:11 as runner
|
||||||
|
|
||||||
|
|
||||||
FROM openjdk:8-alpine as runner
|
|
||||||
LABEL maintainer="Derry Hamilton <derryh@rasilon.net>"
|
LABEL maintainer="Derry Hamilton <derryh@rasilon.net>"
|
||||||
|
|
||||||
RUN apk update && apk upgrade && apk add bash
|
RUN apt update && apt upgrade -y && apt install -y bash
|
||||||
|
|
||||||
RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl
|
RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl
|
||||||
|
|
||||||
|
|
@ -24,7 +20,7 @@ CMD ["/ujetl_entrypoint"]
|
||||||
FROM runner as tester
|
FROM runner as tester
|
||||||
COPY TEST_config_live.xml /var/ujetl/
|
COPY TEST_config_live.xml /var/ujetl/
|
||||||
COPY wait_for_postgres /
|
COPY wait_for_postgres /
|
||||||
RUN apk add postgresql-client
|
RUN apt-get install -y postgresql-client
|
||||||
|
|
||||||
|
|
||||||
FROM runner as deploy
|
FROM runner as deploy
|
||||||
|
|
|
||||||
|
|
@ -4,20 +4,27 @@
|
||||||
<nRowsToLog>10000</nRowsToLog>
|
<nRowsToLog>10000</nRowsToLog>
|
||||||
<blockSize>1000</blockSize>
|
<blockSize>1000</blockSize>
|
||||||
<pollTimeout>500</pollTimeout>
|
<pollTimeout>500</pollTimeout>
|
||||||
|
<drivers>
|
||||||
|
<driver>org.postgresql.Driver</driver>
|
||||||
|
<driver>org.relique.jdbc.csv.CsvDriver</driver>
|
||||||
|
</drivers>
|
||||||
|
|
||||||
<source>
|
<source>
|
||||||
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
<dsn>jdbc:postgresql://testdb:5432/test</dsn>
|
||||||
<username>test</username>
|
<username>test</username>
|
||||||
<password>test</password>
|
<password>test</password>
|
||||||
<networkTimeout>600000</networkTimeout>
|
<networkTimeout>600000</networkTimeout>
|
||||||
</source>
|
</source>
|
||||||
<dest>
|
<dest>
|
||||||
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
<dsn>jdbc:postgresql://testdb:5432/test</dsn>
|
||||||
<username>test</username>
|
<username>test</username>
|
||||||
<password>test</password>
|
<password>test</password>
|
||||||
</dest>
|
</dest>
|
||||||
<jobs>
|
<jobs>
|
||||||
<job>
|
<job>
|
||||||
<name>test</name>
|
<name>test</name>
|
||||||
|
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||||
|
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||||
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
||||||
<select>
|
<select>
|
||||||
select
|
select
|
||||||
|
|
@ -50,9 +57,46 @@
|
||||||
</insert>
|
</insert>
|
||||||
</job>
|
</job>
|
||||||
<job>
|
<job>
|
||||||
<name>denormalise</name>
|
<name>test upsert</name>
|
||||||
|
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||||
|
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||||
<key>select -1 as key</key>
|
<key>select -1 as key</key>
|
||||||
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid);</select>
|
<select>
|
||||||
|
select
|
||||||
|
id,
|
||||||
|
test_int,
|
||||||
|
test_text,
|
||||||
|
test_ts
|
||||||
|
from
|
||||||
|
public.source where id > ?::bigint</select>
|
||||||
|
<insert>
|
||||||
|
insert into public.dest(
|
||||||
|
id,
|
||||||
|
test_int,
|
||||||
|
test_text,
|
||||||
|
test_ts
|
||||||
|
)values(
|
||||||
|
?::bigint,
|
||||||
|
?::integer,
|
||||||
|
?::text,
|
||||||
|
?::timestamp with time zone
|
||||||
|
)ON CONFLICT(id) DO UPDATE
|
||||||
|
set
|
||||||
|
test_int = EXCLUDED.test_int,
|
||||||
|
test_text = EXCLUDED.test_text,
|
||||||
|
test_ts = EXCLUDED.test_ts
|
||||||
|
WHERE
|
||||||
|
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||||
|
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||||
|
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||||
|
</insert>
|
||||||
|
</job>
|
||||||
|
<job>
|
||||||
|
<name>denormalise</name>
|
||||||
|
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||||
|
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||||
|
<key>select -1 as key</key>
|
||||||
|
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid) where ?::integer is not null;</select>
|
||||||
<insert>
|
<insert>
|
||||||
INSERT INTO denormalised_personalia(person_id,fname,lname)
|
INSERT INTO denormalised_personalia(person_id,fname,lname)
|
||||||
values(?::integer,?::text,?::text)
|
values(?::integer,?::text,?::text)
|
||||||
|
|
@ -65,5 +109,59 @@
|
||||||
OR denormalised_personalia.lname is distinct from EXCLUDED.lname
|
OR denormalised_personalia.lname is distinct from EXCLUDED.lname
|
||||||
</insert>
|
</insert>
|
||||||
</job>
|
</job>
|
||||||
|
<job>
|
||||||
|
<name>test pre post</name>
|
||||||
|
<key>select -1 as key</key>
|
||||||
|
<select>
|
||||||
|
select
|
||||||
|
id,
|
||||||
|
test_int,
|
||||||
|
test_text,
|
||||||
|
test_ts
|
||||||
|
from
|
||||||
|
public.source where id > ?::bigint
|
||||||
|
</select>
|
||||||
|
<preTarget>
|
||||||
|
drop table if exists tmp_dest;
|
||||||
|
create temp table tmp_dest(
|
||||||
|
id bigint,
|
||||||
|
test_int integer,
|
||||||
|
test_text text,
|
||||||
|
test_ts timestamp with time zone
|
||||||
|
);
|
||||||
|
</preTarget>
|
||||||
|
<insert>
|
||||||
|
insert into tmp_dest(
|
||||||
|
id,
|
||||||
|
test_int,
|
||||||
|
test_text,
|
||||||
|
test_ts
|
||||||
|
)values(
|
||||||
|
?::bigint,
|
||||||
|
?::integer,
|
||||||
|
?::text,
|
||||||
|
?::timestamp with time zone
|
||||||
|
)
|
||||||
|
</insert>
|
||||||
|
<insert>
|
||||||
|
insert into public.dest(
|
||||||
|
id,
|
||||||
|
test_int,
|
||||||
|
test_text,
|
||||||
|
test_ts
|
||||||
|
)
|
||||||
|
select id,test_int,test_text,test_ts
|
||||||
|
from tmp_dest
|
||||||
|
ON CONFLICT(id) DO UPDATE
|
||||||
|
set
|
||||||
|
test_int = EXCLUDED.test_int,
|
||||||
|
test_text = EXCLUDED.test_text,
|
||||||
|
test_ts = EXCLUDED.test_ts
|
||||||
|
WHERE
|
||||||
|
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||||
|
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||||
|
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||||
|
</insert>
|
||||||
|
</job>
|
||||||
</jobs>
|
</jobs>
|
||||||
</CopyingApp>
|
</CopyingApp>
|
||||||
|
|
|
||||||
4
docker/multistage/small.csv
Normal file
4
docker/multistage/small.csv
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
id,dat
|
||||||
|
1,banana
|
||||||
|
2,potato
|
||||||
|
3,nugget
|
||||||
|
|
|
@ -10,10 +10,11 @@ ls
|
||||||
echo Starting run loop
|
echo Starting run loop
|
||||||
for file in *.xml
|
for file in *.xml
|
||||||
do
|
do
|
||||||
/usr/bin/java \
|
/usr/local/openjdk-11/bin/java \
|
||||||
-Xms1g \
|
-Xms1g \
|
||||||
-Xmx2g \
|
-Xmx2g \
|
||||||
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
||||||
|
-Dlog4j.configurationFile="$LOG_PROPS" \
|
||||||
com.rasilon.ujetl.CopyingApp \
|
com.rasilon.ujetl.CopyingApp \
|
||||||
--log4j "$LOG_PROPS" \
|
--log4j "$LOG_PROPS" \
|
||||||
--config "$file"
|
--config "$file"
|
||||||
|
|
|
||||||
|
|
@ -8,5 +8,14 @@ until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.contai
|
||||||
sleep 1
|
sleep 1
|
||||||
done
|
done
|
||||||
|
|
||||||
|
>&2 echo "Postgres is up - Waiting for the reboot"
|
||||||
|
sleep 3 # Wait for the Postgres reboot at the end of setup
|
||||||
|
|
||||||
|
until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do
|
||||||
|
>&2 echo "Postgres is unavailable - sleeping"
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
|
||||||
>&2 echo "Postgres is up - executing command"
|
>&2 echo "Postgres is up - executing command"
|
||||||
exec $cmd
|
exec $cmd
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,13 @@ CREATE TABLE denormalised_personalia(
|
||||||
lname text
|
lname text
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE test_csvjdbc(
|
||||||
|
id integer not null primary key,
|
||||||
|
dat text
|
||||||
|
);
|
||||||
|
|
||||||
|
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test;
|
||||||
|
GRANT SELECT,INSERT,UPDATE ON denormalised_personalia TO test;
|
||||||
|
|
||||||
\c postgres
|
\c postgres
|
||||||
CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a);
|
CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a);
|
||||||
|
|
|
||||||
|
|
@ -30,9 +30,9 @@ fi
|
||||||
/usr/bin/java \
|
/usr/bin/java \
|
||||||
-Xms1g \
|
-Xms1g \
|
||||||
-Xmx2g \
|
-Xmx2g \
|
||||||
|
-Dlog4j.configurationFile="$LOG_PROPS" \
|
||||||
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
||||||
com.rasilon.ujetl.CopyingApp \
|
com.rasilon.ujetl.CopyingApp \
|
||||||
--log4j "$LOG_PROPS" \
|
|
||||||
--config "/etc/ujetl/${JOBNAME}_config_live.xml"
|
--config "/etc/ujetl/${JOBNAME}_config_live.xml"
|
||||||
|
|
||||||
#rm -f $LOCKFILE
|
#rm -f $LOCKFILE
|
||||||
|
|
|
||||||
26
pom.xml
26
pom.xml
|
|
@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
||||||
<groupId>com.rasilon.ujetl</groupId>
|
<groupId>com.rasilon.ujetl</groupId>
|
||||||
<artifactId>CopyingApp</artifactId>
|
<artifactId>CopyingApp</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<version>2.0.3</version>
|
<version>2.5.2</version>
|
||||||
<name>uJETL</name>
|
<name>uJETL</name>
|
||||||
<url>https://github.com/rasilon/ujetl</url>
|
<url>https://github.com/rasilon/ujetl</url>
|
||||||
<properties>
|
<properties>
|
||||||
|
|
@ -35,13 +35,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.h2database</groupId>
|
<groupId>com.h2database</groupId>
|
||||||
<artifactId>h2</artifactId>
|
<artifactId>h2</artifactId>
|
||||||
<version>1.4.199</version>
|
<version>2.2.220</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
<artifactId>commons-lang3</artifactId>
|
<artifactId>commons-lang3</artifactId>
|
||||||
<version>3.9</version>
|
<version>3.18.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-logging</groupId>
|
<groupId>commons-logging</groupId>
|
||||||
|
|
@ -51,12 +51,12 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
<artifactId>commons-configuration2</artifactId>
|
<artifactId>commons-configuration2</artifactId>
|
||||||
<version>2.4</version>
|
<version>2.10.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-beanutils</groupId>
|
<groupId>commons-beanutils</groupId>
|
||||||
<artifactId>commons-beanutils</artifactId>
|
<artifactId>commons-beanutils</artifactId>
|
||||||
<version>1.9.3</version>
|
<version>1.11.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.beust</groupId>
|
<groupId>com.beust</groupId>
|
||||||
|
|
@ -66,27 +66,31 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.logging.log4j</groupId>
|
<groupId>org.apache.logging.log4j</groupId>
|
||||||
<artifactId>log4j-api</artifactId>
|
<artifactId>log4j-api</artifactId>
|
||||||
<version>2.11.2</version>
|
<version>2.17.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.logging.log4j</groupId>
|
<groupId>org.apache.logging.log4j</groupId>
|
||||||
<artifactId>log4j-core</artifactId>
|
<artifactId>log4j-core</artifactId>
|
||||||
<version>2.11.2</version>
|
<version>2.25.3</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.postgresql</groupId>
|
<groupId>org.postgresql</groupId>
|
||||||
<artifactId>postgresql</artifactId>
|
<artifactId>postgresql</artifactId>
|
||||||
<version>42.2.5</version>
|
<version>42.7.2</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>net.sourceforge.csvjdbc</groupId>
|
||||||
|
<artifactId>csvjdbc</artifactId>
|
||||||
|
<version>1.0.40</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<plugin>
|
||||||
<artifactId>maven-compiler-plugin</artifactId>
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
<version>2.3.2</version>
|
<version>3.8.0</version>
|
||||||
<configuration>
|
<configuration>
|
||||||
<source>1.8</source>
|
<release>11</release>
|
||||||
<target>1.8</target>
|
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
<plugin>
|
<plugin>
|
||||||
|
|
|
||||||
|
|
@ -34,10 +34,6 @@ public class CopyingApp {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
CopyingAppCommandParser cli = new CopyingAppCommandParser(args);
|
CopyingAppCommandParser cli = new CopyingAppCommandParser(args);
|
||||||
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
|
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
|
||||||
String log4jConfigLocation = cli.getLog4jConfigFile();
|
|
||||||
File file = new File(log4jConfigLocation);
|
|
||||||
context.setConfigLocation(file.toURI());
|
|
||||||
System.out.println("Config set from "+file.toURI());
|
|
||||||
|
|
||||||
CopyingApp app = new CopyingApp(cli);
|
CopyingApp app = new CopyingApp(cli);
|
||||||
try {
|
try {
|
||||||
|
|
@ -79,6 +75,7 @@ public class CopyingApp {
|
||||||
|
|
||||||
Configuration config = configs.xml(cli.getConfigFile());
|
Configuration config = configs.xml(cli.getConfigFile());
|
||||||
|
|
||||||
|
loadDrivers(config);
|
||||||
String hardLimitSeconds = config.getString("hardLimitSeconds");
|
String hardLimitSeconds = config.getString("hardLimitSeconds");
|
||||||
if(hardLimitSeconds != null) {
|
if(hardLimitSeconds != null) {
|
||||||
TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true);
|
TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true);
|
||||||
|
|
@ -131,7 +128,27 @@ public class CopyingApp {
|
||||||
String tabKey = config.getString("jobs.job("+i+").key");
|
String tabKey = config.getString("jobs.job("+i+").key");
|
||||||
String tabSelect = config.getString("jobs.job("+i+").select");
|
String tabSelect = config.getString("jobs.job("+i+").select");
|
||||||
String tabInsert = config.getString("jobs.job("+i+").insert");
|
String tabInsert = config.getString("jobs.job("+i+").insert");
|
||||||
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
|
String preTarget = config.getString("jobs.job("+i+").preTarget");
|
||||||
|
String postTarget = config.getString("jobs.job("+i+").postTarget");
|
||||||
|
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
|
||||||
|
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
|
||||||
|
|
||||||
|
Job j = new Job(
|
||||||
|
sConn,
|
||||||
|
dConn,
|
||||||
|
tabName,
|
||||||
|
jobName,
|
||||||
|
tabKey,
|
||||||
|
tabSelect,
|
||||||
|
tabInsert,
|
||||||
|
preTarget,
|
||||||
|
postTarget,
|
||||||
|
nRowsToLog,
|
||||||
|
blockSize,
|
||||||
|
pollTimeout,
|
||||||
|
identifySourceSQL,
|
||||||
|
identifyDestinationSQL
|
||||||
|
);
|
||||||
j.start();
|
j.start();
|
||||||
j.join();
|
j.join();
|
||||||
|
|
||||||
|
|
@ -141,7 +158,28 @@ public class CopyingApp {
|
||||||
String tabKey = config.getString("jobs.job.key");
|
String tabKey = config.getString("jobs.job.key");
|
||||||
String tabSelect = config.getString("jobs.job.select");
|
String tabSelect = config.getString("jobs.job.select");
|
||||||
String tabInsert = config.getString("jobs.job.insert");
|
String tabInsert = config.getString("jobs.job.insert");
|
||||||
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
|
String preTarget = config.getString("jobs.job.preTarget");
|
||||||
|
String postTarget = config.getString("jobs.job.postTarget");
|
||||||
|
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
|
||||||
|
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
|
||||||
|
|
||||||
|
|
||||||
|
Job j = new Job(
|
||||||
|
sConn,
|
||||||
|
dConn,
|
||||||
|
tabName,
|
||||||
|
jobName,
|
||||||
|
tabKey,
|
||||||
|
tabSelect,
|
||||||
|
tabInsert,
|
||||||
|
preTarget,
|
||||||
|
postTarget,
|
||||||
|
nRowsToLog,
|
||||||
|
blockSize,
|
||||||
|
pollTimeout,
|
||||||
|
identifySourceSQL,
|
||||||
|
identifyDestinationSQL
|
||||||
|
);
|
||||||
j.start();
|
j.start();
|
||||||
j.join();
|
j.join();
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -203,4 +241,21 @@ public class CopyingApp {
|
||||||
|
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Even with JDBC 4, some drivers don't play nicely with whatever
|
||||||
|
// the classloaders are up to. So this allows us to force it the
|
||||||
|
// old fashioned way, and works around the
|
||||||
|
// "But it works fine when it's the /only/ driver!"
|
||||||
|
// cross-database problem
|
||||||
|
private void loadDrivers(Configuration config) {
|
||||||
|
String[] drivers = config.get(String[].class, "drivers.driver");
|
||||||
|
for(String d:drivers) {
|
||||||
|
try {
|
||||||
|
Class.forName(d);
|
||||||
|
log.info("Preloaded driver "+d);
|
||||||
|
} catch(ClassNotFoundException e) {
|
||||||
|
log.error("Could not preload driver "+d,e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ public class CopyingAppCommandParser {
|
||||||
private String configFile;
|
private String configFile;
|
||||||
|
|
||||||
@Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run")
|
@Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run")
|
||||||
private String log4jConfigFile = "/etc/ppl/default_log4j_config.properties";
|
private String log4jConfigFile = "/etc/ujetl/default_log4j_config.properties";
|
||||||
|
|
||||||
public CopyingAppCommandParser(String[] args) {
|
public CopyingAppCommandParser(String[] args) {
|
||||||
super();
|
super();
|
||||||
|
|
@ -23,8 +23,4 @@ public class CopyingAppCommandParser {
|
||||||
return configFile;
|
return configFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLog4jConfigFile() {
|
|
||||||
return log4jConfigFile;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,23 +22,29 @@ import org.apache.logging.log4j.Logger;
|
||||||
public class Job extends Thread {
|
public class Job extends Thread {
|
||||||
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
|
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
|
||||||
|
|
||||||
Connection sConn;
|
private Connection sConn;
|
||||||
Connection dConn;
|
private Connection dConn;
|
||||||
String name;
|
private String name;
|
||||||
String jobName;
|
private String jobName;
|
||||||
String key;
|
private String key;
|
||||||
String select;
|
private String select;
|
||||||
String insert;
|
private String insert;
|
||||||
Integer nRowsToLog;
|
private String preTarget;
|
||||||
Integer blockSize;
|
private String postTarget;
|
||||||
Integer pollTimeout;
|
private Integer nRowsToLog;
|
||||||
|
private Integer blockSize;
|
||||||
|
private Integer pollTimeout;
|
||||||
|
private String identifySourceSQL;
|
||||||
|
private String identifyDestinationSQL;
|
||||||
|
|
||||||
BlockingQueue<List<String>> resultBuffer;
|
private BlockingQueue<List<String>> resultBuffer;
|
||||||
AtomicBoolean producerLive;
|
private AtomicBoolean producerLive;
|
||||||
AtomicBoolean threadsExit = new AtomicBoolean(false);;
|
private AtomicBoolean threadsExit = new AtomicBoolean(false);;
|
||||||
|
private String sourceID;
|
||||||
|
private String destID;
|
||||||
|
|
||||||
|
|
||||||
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
|
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) {
|
||||||
this.sConn = sConn;
|
this.sConn = sConn;
|
||||||
this.dConn = dConn;
|
this.dConn = dConn;
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
|
@ -46,9 +52,13 @@ public class Job extends Thread {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.select = select;
|
this.select = select;
|
||||||
this.insert = insert;
|
this.insert = insert;
|
||||||
|
this.preTarget = preTarget;
|
||||||
|
this.postTarget = postTarget;
|
||||||
this.nRowsToLog = nRowsToLog;
|
this.nRowsToLog = nRowsToLog;
|
||||||
this.blockSize = blockSize;
|
this.blockSize = blockSize;
|
||||||
this.pollTimeout = pollTimeout;
|
this.pollTimeout = pollTimeout;
|
||||||
|
this.identifySourceSQL = identifySourceSQL;
|
||||||
|
this.identifyDestinationSQL = identifyDestinationSQL;
|
||||||
|
|
||||||
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
|
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
|
||||||
producerLive = new AtomicBoolean(true);
|
producerLive = new AtomicBoolean(true);
|
||||||
|
|
@ -72,12 +82,12 @@ public class Job extends Thread {
|
||||||
public Producer(ResultSet src,BlockingQueue q) {
|
public Producer(ResultSet src,BlockingQueue q) {
|
||||||
this.src = src;
|
this.src = src;
|
||||||
this.q = q;
|
this.q = q;
|
||||||
this.setName(String.format("%s-%s-Consumer",jobName,name));
|
this.setName(String.format("%s-%s-Producer",jobName,name));
|
||||||
}
|
}
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
long rowsInserted = 0;
|
long rowsInserted = 0;
|
||||||
long rowNum = 0;
|
long rowsAttempted = 0;
|
||||||
long stamp = System.nanoTime();
|
long stamp = System.nanoTime();
|
||||||
long nstamp;
|
long nstamp;
|
||||||
int columnCount = src.getMetaData().getColumnCount();
|
int columnCount = src.getMetaData().getColumnCount();
|
||||||
|
|
@ -95,13 +105,13 @@ public class Job extends Thread {
|
||||||
}
|
}
|
||||||
log.trace("Producer queue full.");
|
log.trace("Producer queue full.");
|
||||||
}
|
}
|
||||||
rowNum++;
|
rowsAttempted++;
|
||||||
if(rowNum % nRowsToLog == 0) {
|
if(rowsAttempted % nRowsToLog == 0) {
|
||||||
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name));
|
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowsAttempted,name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
producerLive.set(false);
|
producerLive.set(false);
|
||||||
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name));
|
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowsAttempted,name));
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
producerLive.set(false); // Signal we've exited.
|
producerLive.set(false); // Signal we've exited.
|
||||||
threadsExit.set(true); // Signal we've exited.
|
threadsExit.set(true); // Signal we've exited.
|
||||||
|
|
@ -122,7 +132,7 @@ public class Job extends Thread {
|
||||||
}
|
}
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
long rowNum = 0;
|
long rowsAttempted = 0;
|
||||||
long rowsInserted = 0;
|
long rowsInserted = 0;
|
||||||
|
|
||||||
while(true) {
|
while(true) {
|
||||||
|
|
@ -133,7 +143,7 @@ public class Job extends Thread {
|
||||||
if(row == null && producerLive.get() == false) {
|
if(row == null && producerLive.get() == false) {
|
||||||
rowsInserted += arraySum(insertStatement.executeBatch());
|
rowsInserted += arraySum(insertStatement.executeBatch());
|
||||||
dConn.commit();
|
dConn.commit();
|
||||||
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name));
|
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowsAttempted,name));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(threadsExit.get()) {
|
if(threadsExit.get()) {
|
||||||
|
|
@ -150,14 +160,14 @@ public class Job extends Thread {
|
||||||
}
|
}
|
||||||
insertStatement.addBatch();
|
insertStatement.addBatch();
|
||||||
|
|
||||||
rowNum++;
|
rowsAttempted++;
|
||||||
if(rowNum % nRowsToLog == 0) {
|
if(rowsAttempted % nRowsToLog == 0) {
|
||||||
rowsInserted += arraySum(insertStatement.executeBatch());
|
rowsInserted += arraySum(insertStatement.executeBatch());
|
||||||
dConn.commit();
|
dConn.commit();
|
||||||
log.info(String.format("%s - Inserted %s of %s notified rows into %s",
|
log.info(String.format("%s - Inserted %s of %s notified rows into %s",
|
||||||
jobName,
|
jobName,
|
||||||
rowsInserted,
|
rowsInserted,
|
||||||
rowNum,
|
rowsAttempted,
|
||||||
name));
|
name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -169,11 +179,34 @@ public class Job extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Outer run
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
ResultSet rs;
|
ResultSet rs;
|
||||||
|
|
||||||
|
if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn);
|
||||||
|
if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn);
|
||||||
|
|
||||||
|
if(sourceID != null || destID != null){
|
||||||
|
log.info(String.format(
|
||||||
|
"%s - Processing table: %s with source: %s, dest: %s",
|
||||||
|
jobName,
|
||||||
|
name,
|
||||||
|
sourceID==null?"":sourceID,
|
||||||
|
destID==null?"":destID
|
||||||
|
));
|
||||||
|
}else{
|
||||||
log.info(String.format("%s - Processing table: %s",jobName,name));
|
log.info(String.format("%s - Processing table: %s",jobName,name));
|
||||||
|
}
|
||||||
|
if(preTarget != null){
|
||||||
|
log.info(String.format("%s - Trying to execute preTarget SQL",jobName));
|
||||||
|
PreparedStatement s = dConn.prepareStatement(preTarget);
|
||||||
|
s.executeUpdate();
|
||||||
|
s.close();
|
||||||
|
dConn.commit();
|
||||||
|
}else{
|
||||||
|
log.info(String.format("%s - No preTarget; skipping.",jobName));
|
||||||
|
}
|
||||||
|
|
||||||
log.debug("Trying to execute: "+key);
|
log.debug("Trying to execute: "+key);
|
||||||
PreparedStatement keyStatement = dConn.prepareStatement(key);
|
PreparedStatement keyStatement = dConn.prepareStatement(key);
|
||||||
|
|
@ -211,10 +244,33 @@ public class Job extends Thread {
|
||||||
p.join();
|
p.join();
|
||||||
c.join();
|
c.join();
|
||||||
|
|
||||||
|
if(postTarget != null){
|
||||||
|
log.info(String.format("%s - Trying to execute postTarget SQL",jobName));
|
||||||
|
PreparedStatement s = dConn.prepareStatement(postTarget);
|
||||||
|
s.executeUpdate();
|
||||||
|
s.close();
|
||||||
|
dConn.commit();
|
||||||
|
}else{
|
||||||
|
log.info(String.format("%s - No postTarget; skipping.",jobName));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
} catch(InterruptedException e) {
|
} catch(InterruptedException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} catch(SQLException e) {
|
} catch(SQLException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getSingleString(String sql, Connection conn){
|
||||||
|
try{
|
||||||
|
PreparedStatement s = conn.prepareStatement(sql);
|
||||||
|
ResultSet r = s.executeQuery();
|
||||||
|
r.next();
|
||||||
|
return r.getString(1);
|
||||||
|
} catch(SQLException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
37
src/test/java/com/rasilon/ujetl/TestConfig.java
Normal file
37
src/test/java/com/rasilon/ujetl/TestConfig.java
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
package com.rasilon.ujetl;
|
||||||
|
|
||||||
|
import org.apache.commons.configuration2.Configuration;
|
||||||
|
import org.apache.commons.configuration2.builder.fluent.Configurations;
|
||||||
|
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||||
|
|
||||||
|
import org.apache.commons.beanutils.PropertyUtils; // Why does config need this?
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author derryh
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TestConfig {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test001VerifyArrayOfDrivers() {
|
||||||
|
try {
|
||||||
|
Configurations configs = new Configurations();
|
||||||
|
Configuration config = configs.xml("TEST_config_live.xml");
|
||||||
|
String[] drivers = config.get(String[].class, "drivers.driver");
|
||||||
|
int ndrivers =drivers.length;
|
||||||
|
if(ndrivers != 3){
|
||||||
|
fail("Expected 3 drivers, but found "+ndrivers);
|
||||||
|
}
|
||||||
|
} catch(Exception e) {
|
||||||
|
fail(e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -42,12 +42,16 @@ public class TestJob {
|
||||||
dest,
|
dest,
|
||||||
"jUnit Test Config",
|
"jUnit Test Config",
|
||||||
"jUnit Test Job",
|
"jUnit Test Job",
|
||||||
"SELECT -1 AS key",
|
"SELECT -1 AS \"key\"",
|
||||||
"SELECT id,dat FROM src WHERE id > ?",
|
"SELECT id,dat FROM src WHERE id > ?",
|
||||||
"INSERT INTO dest VALUES(?,?)",
|
"INSERT INTO dest VALUES(?,?)",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
100,
|
100,
|
||||||
100,
|
100,
|
||||||
100
|
100,
|
||||||
|
"select 'PID:'||session_id()",
|
||||||
|
"select 'PID:'||session_id()"
|
||||||
);
|
);
|
||||||
j.start();
|
j.start();
|
||||||
j.join();
|
j.join();
|
||||||
|
|
|
||||||
|
|
@ -13,15 +13,12 @@ public class TestParser {
|
||||||
public void test001Parset() {
|
public void test001Parset() {
|
||||||
try {
|
try {
|
||||||
String[] args = {
|
String[] args = {
|
||||||
"--log4j",
|
|
||||||
"log4j_test_banana.xml",
|
|
||||||
"--config",
|
"--config",
|
||||||
"config_test_banana.xml"
|
"config_test_banana.xml"
|
||||||
};
|
};
|
||||||
CopyingAppCommandParser p = new CopyingAppCommandParser(args);
|
CopyingAppCommandParser p = new CopyingAppCommandParser(args);
|
||||||
|
|
||||||
assertEquals(p.getConfigFile(),"config_test_banana.xml");
|
assertEquals(p.getConfigFile(),"config_test_banana.xml");
|
||||||
assertEquals(p.getLog4jConfigFile(),"log4j_test_banana.xml");
|
|
||||||
|
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
fail(e.toString());
|
fail(e.toString());
|
||||||
|
|
|
||||||
64
src/test/java/com/rasilon/ujetl/TestPrePost.java
Normal file
64
src/test/java/com/rasilon/ujetl/TestPrePost.java
Normal file
|
|
@ -0,0 +1,64 @@
|
||||||
|
package com.rasilon.ujetl;
|
||||||
|
|
||||||
|
import java.sql.*;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestPrePost {
|
||||||
|
|
||||||
|
private static String jdbcURL = "jdbc:h2:mem:dbtest";
|
||||||
|
@Test
|
||||||
|
public void test002verifyH2Works() {
|
||||||
|
try {
|
||||||
|
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||||
|
conn.close();
|
||||||
|
} catch(Exception e) {
|
||||||
|
fail(e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrePost() {
|
||||||
|
try (
|
||||||
|
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||||
|
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||||
|
|
||||||
|
) {
|
||||||
|
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
|
||||||
|
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
|
||||||
|
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
|
||||||
|
for(int i=0; i<10000; i++) {
|
||||||
|
inserter.setInt(1,i);
|
||||||
|
inserter.executeUpdate();
|
||||||
|
}
|
||||||
|
|
||||||
|
Job j = new Job(
|
||||||
|
src,
|
||||||
|
dest,
|
||||||
|
"jUnit Test Config",
|
||||||
|
"jUnit Test Job",
|
||||||
|
"SELECT -1 AS \"key\"",
|
||||||
|
"SELECT id,dat FROM src WHERE id > ?",
|
||||||
|
"INSERT INTO tmp_dest VALUES(?,?)",
|
||||||
|
"CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);",
|
||||||
|
"INSERT INTO dest SELECT * from tmp_dest;",
|
||||||
|
100,
|
||||||
|
100,
|
||||||
|
100,
|
||||||
|
"select 'PID:'||session_id()",
|
||||||
|
"select 'PID:'||session_id()"
|
||||||
|
);
|
||||||
|
j.start();
|
||||||
|
j.join();
|
||||||
|
// do stuff
|
||||||
|
} catch(Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
fail(e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,6 +4,11 @@
|
||||||
<nRowsToLog>10000</nRowsToLog>
|
<nRowsToLog>10000</nRowsToLog>
|
||||||
<blockSize>1000</blockSize>
|
<blockSize>1000</blockSize>
|
||||||
<pollTimeout>500</pollTimeout>
|
<pollTimeout>500</pollTimeout>
|
||||||
|
<drivers>
|
||||||
|
<driver>org.postgresql.Driver</driver>
|
||||||
|
<driver>org.h2.Driver</driver>
|
||||||
|
<driver>org.relique.jdbc.csv.CsvDriver</driver>
|
||||||
|
</drivers>
|
||||||
<source>
|
<source>
|
||||||
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
||||||
<username>test</username>
|
<username>test</username>
|
||||||
|
|
@ -18,6 +23,8 @@
|
||||||
<jobs>
|
<jobs>
|
||||||
<job>
|
<job>
|
||||||
<name>test</name>
|
<name>test</name>
|
||||||
|
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||||
|
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||||
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
||||||
<select>
|
<select>
|
||||||
select
|
select
|
||||||
|
|
@ -49,6 +56,39 @@
|
||||||
OR dest.test_ts = EXCLUDED.test_ts
|
OR dest.test_ts = EXCLUDED.test_ts
|
||||||
</insert>
|
</insert>
|
||||||
</job>
|
</job>
|
||||||
|
<job>
|
||||||
|
<name>test upsert</name>
|
||||||
|
<key>select -1 as key</key>
|
||||||
|
<select>
|
||||||
|
select
|
||||||
|
id,
|
||||||
|
test_int,
|
||||||
|
test_text,
|
||||||
|
test_ts
|
||||||
|
from
|
||||||
|
public.source where id > ?::bigint</select>
|
||||||
|
<insert>
|
||||||
|
insert into public.dest(
|
||||||
|
id,
|
||||||
|
test_int,
|
||||||
|
test_text,
|
||||||
|
test_ts
|
||||||
|
)values(
|
||||||
|
?::bigint,
|
||||||
|
?::integer,
|
||||||
|
?::text,
|
||||||
|
?::timestamp with time zone
|
||||||
|
)ON CONFLICT(id) DO UPDATE
|
||||||
|
set
|
||||||
|
test_int = EXCLUDED.test_int,
|
||||||
|
test_text = EXCLUDED.test_text,
|
||||||
|
test_ts = EXCLUDED.test_ts
|
||||||
|
WHERE
|
||||||
|
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||||
|
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||||
|
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||||
|
</insert>
|
||||||
|
</job>
|
||||||
<job>
|
<job>
|
||||||
<name>denormalise</name>
|
<name>denormalise</name>
|
||||||
<key>select -1 as key</key>
|
<key>select -1 as key</key>
|
||||||
|
|
|
||||||
33
uJETL.spec
33
uJETL.spec
|
|
@ -1,33 +0,0 @@
|
||||||
Summary: Java app to facilitate moving data between databases.
|
|
||||||
Name: uJETL
|
|
||||||
Version: 2.0.3
|
|
||||||
Release: 1
|
|
||||||
Group: Applications/Database
|
|
||||||
License: All rights reserved.
|
|
||||||
Source: uJETL-%{version}.tar.gz
|
|
||||||
URL: https://github.com/rasilon/ujetl.git
|
|
||||||
Distribution: derryh
|
|
||||||
Vendor: derryh
|
|
||||||
Packager: Derry Hamilton <derryh@rasilon.net>
|
|
||||||
#BuildRoot: .
|
|
||||||
|
|
||||||
%description
|
|
||||||
A very small ETL app
|
|
||||||
|
|
||||||
%prep
|
|
||||||
%setup
|
|
||||||
|
|
||||||
%build
|
|
||||||
#mvn -Dmaven.test.skip=true clean package
|
|
||||||
true
|
|
||||||
|
|
||||||
%install
|
|
||||||
mkdir -p $RPM_BUILD_ROOT/usr/share/ujetl/lib $RPM_BUILD_ROOT/etc/ujetl $RPM_BUILD_ROOT/usr/bin
|
|
||||||
cp target/CopyingApp-*-jar-with-dependencies.jar $RPM_BUILD_ROOT/usr/share/ujetl/lib/CopyingApp.jar
|
|
||||||
cp install_extra/run_copying_job $RPM_BUILD_ROOT/usr/bin
|
|
||||||
cp install_extra/copying_defaults_log4j.xml $RPM_BUILD_ROOT/etc/ujetl
|
|
||||||
|
|
||||||
%files
|
|
||||||
/usr/share/ujetl/lib/CopyingApp.jar
|
|
||||||
/usr/bin/run_copying_job
|
|
||||||
/etc/ujetl/copying_defaults_log4j.xml
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue