mirror of
https://github.com/rasilon/ujetl.git
synced 2026-04-11 10:29:29 +00:00
Compare commits
88 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4f16842623 | |||
|
|
d2257f1b81 | ||
| 8ab5b1b79e | |||
|
|
26d47f16c9 | ||
| 9f0fe6bb3b | |||
| bf4ed02af5 | |||
| 5f275b93d8 | |||
|
|
d509513084 | ||
| 96e28a5abb | |||
| 1da5fabb05 | |||
| 8a4d892dfa | |||
|
|
07f68922b3 | ||
| 22af36b555 | |||
|
|
d0a5075191 | ||
| 1ce7e09751 | |||
| f651fd720e | |||
| 58b78b2021 | |||
| a88ac56848 | |||
| 9a8716f33e | |||
| 1b1ba551c8 | |||
| 866d02fb52 | |||
| 441b2f4191 | |||
| b81aedefb1 | |||
| 584f83de0d | |||
| 4d38679155 | |||
| b378189512 | |||
| 49487a83af | |||
| 9f4729bf1d | |||
|
|
2126553e5c | ||
| 64743f430b | |||
| 28c918dd6a | |||
|
|
17d33ec18b | ||
| 5a61bb357f | |||
| ffe33276ba | |||
|
|
67f6ec2dab | ||
|
|
9554d3df63 | ||
| f6268344d0 | |||
| 130b120515 | |||
|
|
54a8cd6312 | ||
|
|
d7bde365ed | ||
| 1c5e54acc9 | |||
| b17ca2479b | |||
| 5de1e80b8c | |||
|
|
36acc0ed23 | ||
|
|
3235db7f6a | ||
|
|
79a3dbf499 | ||
| 7a22b6ddae | |||
| f42fa6550e | |||
| 3c525bf006 | |||
| e405e372cd | |||
|
|
bc9849e3ee | ||
| 0730d1bf9d | |||
|
|
45c5900481 | ||
|
|
f4ded0857c | ||
| c34f8b54c4 | |||
| ce5a48c083 | |||
| e6ccc6f2af | |||
|
|
20c1c7695f | ||
| 5df6384abf | |||
| be6fbc73a1 | |||
| b0cc8f8ce7 | |||
| cc03ef6c1d | |||
| f2ef10be64 | |||
| 6a1ab875c7 | |||
| b3b02669f7 | |||
| f963d8bdcf | |||
| f3538a09f9 | |||
| af62d6cf96 | |||
| ead5a9e173 | |||
|
|
a77be8b696 | ||
| 3493e18630 | |||
|
|
cb3b3e3b63 | ||
| cbb44ce1d9 | |||
| 8dbe3e9505 | |||
|
|
be9e468354 | ||
| 66dd3bb37d | |||
| a7d1a6077b | |||
| 734dc8608f | |||
| 01310bfca4 | |||
| 06c64d499f | |||
| 892f7b4fa8 | |||
| 4f0db0a2df | |||
| 4f905dd47a | |||
| 8fdbc6a78e | |||
| ddc67f3a41 | |||
| 633fbe7391 | |||
| f669b1af9d | |||
| e71832f57a |
23 changed files with 532 additions and 146 deletions
|
|
@ -1,12 +0,0 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
cd /root
|
||||
cp -Rv build build2
|
||||
cd build2
|
||||
|
||||
SPEC=$(ls *.spec)
|
||||
VER=$(grep Version $SPEC | awk '{print $2}')
|
||||
tar cvf $HOME/rpmbuild/SOURCES/uJETL-${VER}.tar.gz --show-transformed --transform="s/^\./uJETL-${VER}/" .
|
||||
rpmbuild -ba $SPEC
|
||||
cp /root/rpmbuild/RPMS/x86_64/* /root/build/
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
docker build --rm -t local/c7-buildhost docker/build
|
||||
|
||||
docker run -it --rm -v `pwd`:/root/build local/c7-buildhost /root/build/build_util/build_rpm
|
||||
|
|
@ -1,2 +1,3 @@
|
|||
#!/bin/bash
|
||||
docker build --target deploy -t rasilon/ujetl docker/multistage
|
||||
docker tag rasilon/ujetl:latest rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)
|
||||
|
|
|
|||
4
build_util/push_docker_images
Executable file
4
build_util/push_docker_images
Executable file
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/bash
|
||||
docker push rasilon/ujetl:latest
|
||||
docker push rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)
|
||||
|
||||
|
|
@ -21,15 +21,15 @@ begin
|
|||
pg_attribute,
|
||||
pg_namespace
|
||||
WHERE
|
||||
pg_class.relname = tabname AND
|
||||
indrelid = pg_class.oid AND
|
||||
nspname = sch AND
|
||||
pg_class.relnamespace = pg_namespace.oid AND
|
||||
pg_attribute.attrelid = pg_class.oid AND
|
||||
pg_attribute.attnum = any(pg_index.indkey)
|
||||
pg_class.relname = tabname
|
||||
AND indrelid = pg_class.oid
|
||||
AND nspname = sch
|
||||
AND pg_class.relnamespace = pg_namespace.oid
|
||||
AND pg_attribute.attrelid = pg_class.oid
|
||||
AND pg_attribute.attnum = any(pg_index.indkey)
|
||||
AND indisprimary ;
|
||||
|
||||
header := 'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n ';
|
||||
header := E'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n ';
|
||||
for colinfo in
|
||||
select
|
||||
*
|
||||
|
|
@ -40,7 +40,6 @@ begin
|
|||
and table_name = tabname
|
||||
order by ordinal_position
|
||||
loop
|
||||
raise info 'Working on %.% (%)',sch,tabname,colinfo::text;
|
||||
if not is_first then
|
||||
col_list := col_list || E',\n ';
|
||||
vals := vals || E',\n ';
|
||||
|
|
@ -48,7 +47,7 @@ begin
|
|||
changes := changes || E'\n OR ';
|
||||
end if;
|
||||
col_list := col_list || quote_ident(colinfo.column_name);
|
||||
vals := vals || '?::' || quote_ident(colinfo.data_type);
|
||||
vals := vals || '?::' || colinfo.data_type;
|
||||
sets := sets || quote_ident(colinfo.column_name) ||
|
||||
E' = EXCLUDED.' || quote_ident(colinfo.column_name);
|
||||
changes := changes || E't.' || quote_ident(colinfo.column_name) ||
|
||||
|
|
|
|||
65
config_util/ujetl_select_generator.sql
Normal file
65
config_util/ujetl_select_generator.sql
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
-- Generates the skeleton of a SELECT statement for a uJETL job definition.
--
-- Parameters:
--   sch     - schema name, matched against information_schema.columns.table_schema
--   tabname - table name, matched against information_schema.columns.table_name
--
-- Returns the statement text: every column of the table in ordinal_position
-- order (each passed through quote_ident), the schema-qualified table aliased
-- as "t", and a placeholder WHERE clause that has to be edited by hand.
-- If no columns are found the column list is simply empty.
CREATE OR REPLACE FUNCTION pg_temp.ujetl_select(sch text, tabname text)
RETURNS text
LANGUAGE plpgsql
AS $function$
declare
    s text := '';
    header text := '';
    col_list text := '';
    is_first boolean := true;
    colinfo record;
begin
    header := E'SELECT\n ';

    -- Only column_name is needed; ordinal_position keeps the generated list
    -- in the same order as the table definition.
    for colinfo in
        select
            column_name
        from
            information_schema.columns
        where
            table_schema = sch
            and table_name = tabname
        order by ordinal_position
    loop
        -- Separator goes before every column except the first.
        if not is_first then
            col_list := col_list || E',\n ';
        end if;
        col_list := col_list || quote_ident(colinfo.column_name);

        is_first := false;
    end loop;

    s := header ||
        coalesce(col_list,'col_list failed') ||
        E'\nFROM\n ' ||
        quote_ident(sch)||'.'||quote_ident(tabname)||E' as t \n '||
        E'WHERE\n insert criteria here >= ?::datatype';
    return s;
end;
$function$
;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,17 +1,13 @@
|
|||
FROM centos:centos7 as builder
|
||||
RUN yum -y install epel-release java-1.8.0-openjdk-devel
|
||||
RUN yum -y groupinstall 'Development Tools'
|
||||
RUN yum -y install git maven
|
||||
RUN git clone https://github.com/rasilon/ujetl.git
|
||||
RUN cd ujetl && mvn package
|
||||
FROM ubuntu:22.04 as builder
|
||||
RUN apt-get update && apt-get -y upgrade
|
||||
RUN apt-get -y install openjdk-19-jdk-headless maven git
|
||||
RUN git clone --single-branch --branch main https://github.com/rasilon/ujetl.git
|
||||
RUN cd ujetl && mvn -e package
|
||||
|
||||
|
||||
|
||||
|
||||
FROM openjdk:8-alpine as runner
|
||||
FROM openjdk:11 as runner
|
||||
LABEL maintainer="Derry Hamilton <derryh@rasilon.net>"
|
||||
|
||||
RUN apk update && apk upgrade && apk add bash
|
||||
RUN apt update && apt upgrade -y && apt install -y bash
|
||||
|
||||
RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl
|
||||
|
||||
|
|
@ -24,7 +20,7 @@ CMD ["/ujetl_entrypoint"]
|
|||
FROM runner as tester
|
||||
COPY TEST_config_live.xml /var/ujetl/
|
||||
COPY wait_for_postgres /
|
||||
RUN apk add postgresql-client
|
||||
RUN apt-get install -y postgresql-client
|
||||
|
||||
|
||||
FROM runner as deploy
|
||||
|
|
|
|||
|
|
@ -4,20 +4,27 @@
|
|||
<nRowsToLog>10000</nRowsToLog>
|
||||
<blockSize>1000</blockSize>
|
||||
<pollTimeout>500</pollTimeout>
|
||||
<drivers>
|
||||
<driver>org.postgresql.Driver</driver>
|
||||
<driver>org.relique.jdbc.csv.CsvDriver</driver>
|
||||
</drivers>
|
||||
|
||||
<source>
|
||||
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
||||
<dsn>jdbc:postgresql://testdb:5432/test</dsn>
|
||||
<username>test</username>
|
||||
<password>test</password>
|
||||
<networkTimeout>600000</networkTimeout>
|
||||
</source>
|
||||
<dest>
|
||||
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
||||
<dsn>jdbc:postgresql://testdb:5432/test</dsn>
|
||||
<username>test</username>
|
||||
<password>test</password>
|
||||
</dest>
|
||||
<jobs>
|
||||
<job>
|
||||
<name>test</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
||||
<select>
|
||||
select
|
||||
|
|
@ -50,9 +57,46 @@
|
|||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>denormalise</name>
|
||||
<name>test upsert</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select -1 as key</key>
|
||||
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid);</select>
|
||||
<select>
|
||||
select
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
from
|
||||
public.source where id > ?::bigint</select>
|
||||
<insert>
|
||||
insert into public.dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)values(
|
||||
?::bigint,
|
||||
?::integer,
|
||||
?::text,
|
||||
?::timestamp with time zone
|
||||
)ON CONFLICT(id) DO UPDATE
|
||||
set
|
||||
test_int = EXCLUDED.test_int,
|
||||
test_text = EXCLUDED.test_text,
|
||||
test_ts = EXCLUDED.test_ts
|
||||
WHERE
|
||||
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>denormalise</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select -1 as key</key>
|
||||
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid) where ?::integer is not null;</select>
|
||||
<insert>
|
||||
INSERT INTO denormalised_personalia(person_id,fname,lname)
|
||||
values(?::integer,?::text,?::text)
|
||||
|
|
@ -65,5 +109,59 @@
|
|||
OR denormalised_personalia.lname is distinct from EXCLUDED.lname
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>test pre post</name>
|
||||
<key>select -1 as key</key>
|
||||
<select>
|
||||
select
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
from
|
||||
public.source where id > ?::bigint
|
||||
</select>
|
||||
<preTarget>
|
||||
drop table if exists tmp_dest;
|
||||
create temp table tmp_dest(
|
||||
id bigint,
|
||||
test_int integer,
|
||||
test_text text,
|
||||
test_ts timestamp with time zone
|
||||
);
|
||||
</preTarget>
|
||||
<insert>
|
||||
insert into tmp_dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)values(
|
||||
?::bigint,
|
||||
?::integer,
|
||||
?::text,
|
||||
?::timestamp with time zone
|
||||
)
|
||||
</insert>
|
||||
<insert>
|
||||
insert into public.dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)
|
||||
select id,test_int,test_text,test_ts
|
||||
from tmp_dest
|
||||
ON CONFLICT(id) DO UPDATE
|
||||
set
|
||||
test_int = EXCLUDED.test_int,
|
||||
test_text = EXCLUDED.test_text,
|
||||
test_ts = EXCLUDED.test_ts
|
||||
WHERE
|
||||
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
</jobs>
|
||||
</CopyingApp>
|
||||
|
|
|
|||
4
docker/multistage/small.csv
Normal file
4
docker/multistage/small.csv
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
id,dat
|
||||
1,banana
|
||||
2,potato
|
||||
3,nugget
|
||||
|
|
|
@ -10,10 +10,11 @@ ls
|
|||
echo Starting run loop
|
||||
for file in *.xml
|
||||
do
|
||||
/usr/bin/java \
|
||||
/usr/local/openjdk-11/bin/java \
|
||||
-Xms1g \
|
||||
-Xmx2g \
|
||||
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
||||
-Dlog4j.configurationFile="$LOG_PROPS" \
|
||||
com.rasilon.ujetl.CopyingApp \
|
||||
--log4j "$LOG_PROPS" \
|
||||
--config "$file"
|
||||
|
|
|
|||
|
|
@ -8,5 +8,14 @@ until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.contai
|
|||
sleep 1
|
||||
done
|
||||
|
||||
>&2 echo "Postgres is up - Waiting for the reboot"
|
||||
sleep 3 # Wait for the Postgres reboot at the end of setup
|
||||
|
||||
until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do
|
||||
>&2 echo "Postgres is unavailable - sleeping"
|
||||
sleep 1
|
||||
done
|
||||
|
||||
|
||||
>&2 echo "Postgres is up - executing command"
|
||||
exec $cmd
|
||||
|
|
|
|||
|
|
@ -44,6 +44,13 @@ CREATE TABLE denormalised_personalia(
|
|||
lname text
|
||||
);
|
||||
|
||||
CREATE TABLE test_csvjdbc(
|
||||
id integer not null primary key,
|
||||
dat text
|
||||
);
|
||||
|
||||
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test;
|
||||
GRANT SELECT,INSERT,UPDATE ON denormalised_personalia TO test;
|
||||
|
||||
\c postgres
|
||||
CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a);
|
||||
|
|
|
|||
|
|
@ -30,9 +30,9 @@ fi
|
|||
/usr/bin/java \
|
||||
-Xms1g \
|
||||
-Xmx2g \
|
||||
-Dlog4j.configurationFile="$LOG_PROPS" \
|
||||
-cp /usr/share/ujetl/lib/CopyingApp.jar \
|
||||
com.rasilon.ujetl.CopyingApp \
|
||||
--log4j "$LOG_PROPS" \
|
||||
--config "/etc/ujetl/${JOBNAME}_config_live.xml"
|
||||
|
||||
#rm -f $LOCKFILE
|
||||
|
|
|
|||
26
pom.xml
26
pom.xml
|
|
@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
<groupId>com.rasilon.ujetl</groupId>
|
||||
<artifactId>CopyingApp</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>2.0.4</version>
|
||||
<version>2.5.2</version>
|
||||
<name>uJETL</name>
|
||||
<url>https://github.com/rasilon/ujetl</url>
|
||||
<properties>
|
||||
|
|
@ -35,13 +35,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>1.4.199</version>
|
||||
<version>2.2.220</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.9</version>
|
||||
<version>3.18.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
|
|
@ -51,12 +51,12 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-configuration2</artifactId>
|
||||
<version>2.4</version>
|
||||
<version>2.10.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>commons-beanutils</artifactId>
|
||||
<version>1.9.3</version>
|
||||
<version>1.11.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.beust</groupId>
|
||||
|
|
@ -66,27 +66,31 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
|
|||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<version>2.11.2</version>
|
||||
<version>2.17.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>2.11.2</version>
|
||||
<version>2.25.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<version>42.2.5</version>
|
||||
<version>42.7.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.sourceforge.csvjdbc</groupId>
|
||||
<artifactId>csvjdbc</artifactId>
|
||||
<version>1.0.40</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>2.3.2</version>
|
||||
<version>3.8.0</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<release>11</release>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
|
|
|
|||
|
|
@ -34,10 +34,6 @@ public class CopyingApp {
|
|||
public static void main(String[] args) {
|
||||
CopyingAppCommandParser cli = new CopyingAppCommandParser(args);
|
||||
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
|
||||
String log4jConfigLocation = cli.getLog4jConfigFile();
|
||||
File file = new File(log4jConfigLocation);
|
||||
context.setConfigLocation(file.toURI());
|
||||
System.out.println("Config set from "+file.toURI());
|
||||
|
||||
CopyingApp app = new CopyingApp(cli);
|
||||
try {
|
||||
|
|
@ -79,6 +75,7 @@ public class CopyingApp {
|
|||
|
||||
Configuration config = configs.xml(cli.getConfigFile());
|
||||
|
||||
loadDrivers(config);
|
||||
String hardLimitSeconds = config.getString("hardLimitSeconds");
|
||||
if(hardLimitSeconds != null) {
|
||||
TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true);
|
||||
|
|
@ -131,7 +128,27 @@ public class CopyingApp {
|
|||
String tabKey = config.getString("jobs.job("+i+").key");
|
||||
String tabSelect = config.getString("jobs.job("+i+").select");
|
||||
String tabInsert = config.getString("jobs.job("+i+").insert");
|
||||
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
|
||||
String preTarget = config.getString("jobs.job("+i+").preTarget");
|
||||
String postTarget = config.getString("jobs.job("+i+").postTarget");
|
||||
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
|
||||
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
|
||||
|
||||
Job j = new Job(
|
||||
sConn,
|
||||
dConn,
|
||||
tabName,
|
||||
jobName,
|
||||
tabKey,
|
||||
tabSelect,
|
||||
tabInsert,
|
||||
preTarget,
|
||||
postTarget,
|
||||
nRowsToLog,
|
||||
blockSize,
|
||||
pollTimeout,
|
||||
identifySourceSQL,
|
||||
identifyDestinationSQL
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
|
||||
|
|
@ -141,7 +158,28 @@ public class CopyingApp {
|
|||
String tabKey = config.getString("jobs.job.key");
|
||||
String tabSelect = config.getString("jobs.job.select");
|
||||
String tabInsert = config.getString("jobs.job.insert");
|
||||
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
|
||||
String preTarget = config.getString("jobs.job.preTarget");
|
||||
String postTarget = config.getString("jobs.job.postTarget");
|
||||
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
|
||||
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
|
||||
|
||||
|
||||
Job j = new Job(
|
||||
sConn,
|
||||
dConn,
|
||||
tabName,
|
||||
jobName,
|
||||
tabKey,
|
||||
tabSelect,
|
||||
tabInsert,
|
||||
preTarget,
|
||||
postTarget,
|
||||
nRowsToLog,
|
||||
blockSize,
|
||||
pollTimeout,
|
||||
identifySourceSQL,
|
||||
identifyDestinationSQL
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
} else {
|
||||
|
|
@ -203,4 +241,21 @@ public class CopyingApp {
|
|||
|
||||
return c;
|
||||
}
|
||||
|
||||
// Even with JDBC 4, some drivers don't play nicely with whatever
|
||||
// the classloaders are up to. So this allows us to force it the
|
||||
// old fashioned way, and works around the
|
||||
// "But it works fine when it's the /only/ driver!"
|
||||
// cross-database problem
|
||||
private void loadDrivers(Configuration config) {
|
||||
String[] drivers = config.get(String[].class, "drivers.driver");
|
||||
for(String d:drivers) {
|
||||
try {
|
||||
Class.forName(d);
|
||||
log.info("Preloaded driver "+d);
|
||||
} catch(ClassNotFoundException e) {
|
||||
log.error("Could not preload driver "+d,e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ public class CopyingAppCommandParser {
|
|||
private String configFile;
|
||||
|
||||
@Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run")
|
||||
private String log4jConfigFile = "/etc/ppl/default_log4j_config.properties";
|
||||
private String log4jConfigFile = "/etc/ujetl/default_log4j_config.properties";
|
||||
|
||||
public CopyingAppCommandParser(String[] args) {
|
||||
super();
|
||||
|
|
@ -23,8 +23,4 @@ public class CopyingAppCommandParser {
|
|||
return configFile;
|
||||
}
|
||||
|
||||
public String getLog4jConfigFile() {
|
||||
return log4jConfigFile;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,23 +22,29 @@ import org.apache.logging.log4j.Logger;
|
|||
public class Job extends Thread {
|
||||
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
|
||||
|
||||
Connection sConn;
|
||||
Connection dConn;
|
||||
String name;
|
||||
String jobName;
|
||||
String key;
|
||||
String select;
|
||||
String insert;
|
||||
Integer nRowsToLog;
|
||||
Integer blockSize;
|
||||
Integer pollTimeout;
|
||||
private Connection sConn;
|
||||
private Connection dConn;
|
||||
private String name;
|
||||
private String jobName;
|
||||
private String key;
|
||||
private String select;
|
||||
private String insert;
|
||||
private String preTarget;
|
||||
private String postTarget;
|
||||
private Integer nRowsToLog;
|
||||
private Integer blockSize;
|
||||
private Integer pollTimeout;
|
||||
private String identifySourceSQL;
|
||||
private String identifyDestinationSQL;
|
||||
|
||||
BlockingQueue<List<String>> resultBuffer;
|
||||
AtomicBoolean producerLive;
|
||||
AtomicBoolean threadsExit = new AtomicBoolean(false);;
|
||||
private BlockingQueue<List<String>> resultBuffer;
|
||||
private AtomicBoolean producerLive;
|
||||
private AtomicBoolean threadsExit = new AtomicBoolean(false);;
|
||||
private String sourceID;
|
||||
private String destID;
|
||||
|
||||
|
||||
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
|
||||
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) {
|
||||
this.sConn = sConn;
|
||||
this.dConn = dConn;
|
||||
this.name = name;
|
||||
|
|
@ -46,9 +52,13 @@ public class Job extends Thread {
|
|||
this.key = key;
|
||||
this.select = select;
|
||||
this.insert = insert;
|
||||
this.preTarget = preTarget;
|
||||
this.postTarget = postTarget;
|
||||
this.nRowsToLog = nRowsToLog;
|
||||
this.blockSize = blockSize;
|
||||
this.pollTimeout = pollTimeout;
|
||||
this.identifySourceSQL = identifySourceSQL;
|
||||
this.identifyDestinationSQL = identifyDestinationSQL;
|
||||
|
||||
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
|
||||
producerLive = new AtomicBoolean(true);
|
||||
|
|
@ -72,12 +82,12 @@ public class Job extends Thread {
|
|||
public Producer(ResultSet src,BlockingQueue q) {
|
||||
this.src = src;
|
||||
this.q = q;
|
||||
this.setName(String.format("%s-%s-Consumer",jobName,name));
|
||||
this.setName(String.format("%s-%s-Producer",jobName,name));
|
||||
}
|
||||
public void run() {
|
||||
try {
|
||||
long rowsInserted = 0;
|
||||
long rowNum = 0;
|
||||
long rowsAttempted = 0;
|
||||
long stamp = System.nanoTime();
|
||||
long nstamp;
|
||||
int columnCount = src.getMetaData().getColumnCount();
|
||||
|
|
@ -95,13 +105,13 @@ public class Job extends Thread {
|
|||
}
|
||||
log.trace("Producer queue full.");
|
||||
}
|
||||
rowNum++;
|
||||
if(rowNum % nRowsToLog == 0) {
|
||||
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name));
|
||||
rowsAttempted++;
|
||||
if(rowsAttempted % nRowsToLog == 0) {
|
||||
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowsAttempted,name));
|
||||
}
|
||||
}
|
||||
producerLive.set(false);
|
||||
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name));
|
||||
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowsAttempted,name));
|
||||
} catch(Exception e) {
|
||||
producerLive.set(false); // Signal we've exited.
|
||||
threadsExit.set(true); // Signal we've exited.
|
||||
|
|
@ -122,7 +132,7 @@ public class Job extends Thread {
|
|||
}
|
||||
public void run() {
|
||||
try {
|
||||
long rowNum = 0;
|
||||
long rowsAttempted = 0;
|
||||
long rowsInserted = 0;
|
||||
|
||||
while(true) {
|
||||
|
|
@ -133,7 +143,7 @@ public class Job extends Thread {
|
|||
if(row == null && producerLive.get() == false) {
|
||||
rowsInserted += arraySum(insertStatement.executeBatch());
|
||||
dConn.commit();
|
||||
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowNum,name));
|
||||
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowsAttempted,name));
|
||||
return;
|
||||
}
|
||||
if(threadsExit.get()) {
|
||||
|
|
@ -150,14 +160,14 @@ public class Job extends Thread {
|
|||
}
|
||||
insertStatement.addBatch();
|
||||
|
||||
rowNum++;
|
||||
if(rowNum % nRowsToLog == 0) {
|
||||
rowsAttempted++;
|
||||
if(rowsAttempted % nRowsToLog == 0) {
|
||||
rowsInserted += arraySum(insertStatement.executeBatch());
|
||||
dConn.commit();
|
||||
log.info(String.format("%s - Inserted %s of %s notified rows into %s",
|
||||
jobName,
|
||||
rowsInserted,
|
||||
rowNum,
|
||||
rowsAttempted,
|
||||
name));
|
||||
}
|
||||
}
|
||||
|
|
@ -169,11 +179,34 @@ public class Job extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
// Outer run
|
||||
public void run() {
|
||||
try {
|
||||
ResultSet rs;
|
||||
|
||||
if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn);
|
||||
if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn);
|
||||
|
||||
if(sourceID != null || destID != null){
|
||||
log.info(String.format(
|
||||
"%s - Processing table: %s with source: %s, dest: %s",
|
||||
jobName,
|
||||
name,
|
||||
sourceID==null?"":sourceID,
|
||||
destID==null?"":destID
|
||||
));
|
||||
}else{
|
||||
log.info(String.format("%s - Processing table: %s",jobName,name));
|
||||
}
|
||||
if(preTarget != null){
|
||||
log.info(String.format("%s - Trying to execute preTarget SQL",jobName));
|
||||
PreparedStatement s = dConn.prepareStatement(preTarget);
|
||||
s.executeUpdate();
|
||||
s.close();
|
||||
dConn.commit();
|
||||
}else{
|
||||
log.info(String.format("%s - No preTarget; skipping.",jobName));
|
||||
}
|
||||
|
||||
log.debug("Trying to execute: "+key);
|
||||
PreparedStatement keyStatement = dConn.prepareStatement(key);
|
||||
|
|
@ -211,10 +244,33 @@ public class Job extends Thread {
|
|||
p.join();
|
||||
c.join();
|
||||
|
||||
if(postTarget != null){
|
||||
log.info(String.format("%s - Trying to execute postTarget SQL",jobName));
|
||||
PreparedStatement s = dConn.prepareStatement(postTarget);
|
||||
s.executeUpdate();
|
||||
s.close();
|
||||
dConn.commit();
|
||||
}else{
|
||||
log.info(String.format("%s - No postTarget; skipping.",jobName));
|
||||
}
|
||||
|
||||
|
||||
} catch(InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch(SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getSingleString(String sql, Connection conn){
|
||||
try{
|
||||
PreparedStatement s = conn.prepareStatement(sql);
|
||||
ResultSet r = s.executeQuery();
|
||||
r.next();
|
||||
return r.getString(1);
|
||||
} catch(SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
37
src/test/java/com/rasilon/ujetl/TestConfig.java
Normal file
37
src/test/java/com/rasilon/ujetl/TestConfig.java
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
package com.rasilon.ujetl;
|
||||
|
||||
import org.apache.commons.configuration2.Configuration;
|
||||
import org.apache.commons.configuration2.builder.fluent.Configurations;
|
||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
|
||||
import org.apache.commons.beanutils.PropertyUtils; // Why does config need this?
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
|
||||
/**
|
||||
* @author derryh
|
||||
*
|
||||
*/
|
||||
public class TestConfig {
|
||||
|
||||
@Test
|
||||
public void test001VerifyArrayOfDrivers() {
|
||||
try {
|
||||
Configurations configs = new Configurations();
|
||||
Configuration config = configs.xml("TEST_config_live.xml");
|
||||
String[] drivers = config.get(String[].class, "drivers.driver");
|
||||
int ndrivers =drivers.length;
|
||||
if(ndrivers != 3){
|
||||
fail("Expected 3 drivers, but found "+ndrivers);
|
||||
}
|
||||
} catch(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -42,12 +42,16 @@ public class TestJob {
|
|||
dest,
|
||||
"jUnit Test Config",
|
||||
"jUnit Test Job",
|
||||
"SELECT -1 AS key",
|
||||
"SELECT -1 AS \"key\"",
|
||||
"SELECT id,dat FROM src WHERE id > ?",
|
||||
"INSERT INTO dest VALUES(?,?)",
|
||||
null,
|
||||
null,
|
||||
100,
|
||||
100,
|
||||
100
|
||||
100,
|
||||
"select 'PID:'||session_id()",
|
||||
"select 'PID:'||session_id()"
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
|
|
|
|||
|
|
@ -13,15 +13,12 @@ public class TestParser {
|
|||
public void test001Parset() {
|
||||
try {
|
||||
String[] args = {
|
||||
"--log4j",
|
||||
"log4j_test_banana.xml",
|
||||
"--config",
|
||||
"config_test_banana.xml"
|
||||
};
|
||||
CopyingAppCommandParser p = new CopyingAppCommandParser(args);
|
||||
|
||||
assertEquals(p.getConfigFile(),"config_test_banana.xml");
|
||||
assertEquals(p.getLog4jConfigFile(),"log4j_test_banana.xml");
|
||||
|
||||
} catch(Exception e) {
|
||||
fail(e.toString());
|
||||
|
|
|
|||
64
src/test/java/com/rasilon/ujetl/TestPrePost.java
Normal file
64
src/test/java/com/rasilon/ujetl/TestPrePost.java
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package com.rasilon.ujetl;
|
||||
|
||||
import java.sql.*;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
|
||||
public class TestPrePost {
|
||||
|
||||
private static String jdbcURL = "jdbc:h2:mem:dbtest";
|
||||
@Test
|
||||
public void test002verifyH2Works() {
|
||||
try {
|
||||
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
conn.close();
|
||||
} catch(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrePost() {
|
||||
try (
|
||||
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
|
||||
|
||||
) {
|
||||
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
|
||||
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
|
||||
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
|
||||
for(int i=0; i<10000; i++) {
|
||||
inserter.setInt(1,i);
|
||||
inserter.executeUpdate();
|
||||
}
|
||||
|
||||
Job j = new Job(
|
||||
src,
|
||||
dest,
|
||||
"jUnit Test Config",
|
||||
"jUnit Test Job",
|
||||
"SELECT -1 AS \"key\"",
|
||||
"SELECT id,dat FROM src WHERE id > ?",
|
||||
"INSERT INTO tmp_dest VALUES(?,?)",
|
||||
"CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);",
|
||||
"INSERT INTO dest SELECT * from tmp_dest;",
|
||||
100,
|
||||
100,
|
||||
100,
|
||||
"select 'PID:'||session_id()",
|
||||
"select 'PID:'||session_id()"
|
||||
);
|
||||
j.start();
|
||||
j.join();
|
||||
// do stuff
|
||||
} catch(Exception e) {
|
||||
e.printStackTrace();
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4,6 +4,11 @@
|
|||
<nRowsToLog>10000</nRowsToLog>
|
||||
<blockSize>1000</blockSize>
|
||||
<pollTimeout>500</pollTimeout>
|
||||
<drivers>
|
||||
<driver>org.postgresql.Driver</driver>
|
||||
<driver>org.h2.Driver</driver>
|
||||
<driver>org.relique.jdbc.csv.CsvDriver</driver>
|
||||
</drivers>
|
||||
<source>
|
||||
<dsn>jdbc:postgresql://localhost:5432/test</dsn>
|
||||
<username>test</username>
|
||||
|
|
@ -18,6 +23,8 @@
|
|||
<jobs>
|
||||
<job>
|
||||
<name>test</name>
|
||||
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
|
||||
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
|
||||
<key>select coalesce(-1,max(id),-1) as key from dest</key>
|
||||
<select>
|
||||
select
|
||||
|
|
@ -49,6 +56,39 @@
|
|||
OR dest.test_ts = EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>test upsert</name>
|
||||
<key>select -1 as key</key>
|
||||
<select>
|
||||
select
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
from
|
||||
public.source where id > ?::bigint</select>
|
||||
<insert>
|
||||
insert into public.dest(
|
||||
id,
|
||||
test_int,
|
||||
test_text,
|
||||
test_ts
|
||||
)values(
|
||||
?::bigint,
|
||||
?::integer,
|
||||
?::text,
|
||||
?::timestamp with time zone
|
||||
)ON CONFLICT(id) DO UPDATE
|
||||
set
|
||||
test_int = EXCLUDED.test_int,
|
||||
test_text = EXCLUDED.test_text,
|
||||
test_ts = EXCLUDED.test_ts
|
||||
WHERE
|
||||
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
|
||||
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
|
||||
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
|
||||
</insert>
|
||||
</job>
|
||||
<job>
|
||||
<name>denormalise</name>
|
||||
<key>select -1 as key</key>
|
||||
|
|
|
|||
33
uJETL.spec
33
uJETL.spec
|
|
@ -1,33 +0,0 @@
|
|||
Summary: Java app to facilitate moving data between databases.
|
||||
Name: uJETL
|
||||
Version: 2.0.4
|
||||
Release: 1
|
||||
Group: Applications/Database
|
||||
License: All rights reserved.
|
||||
Source: uJETL-%{version}.tar.gz
|
||||
URL: https://github.com/rasilon/ujetl.git
|
||||
Distribution: derryh
|
||||
Vendor: derryh
|
||||
Packager: Derry Hamilton <derryh@rasilon.net>
|
||||
#BuildRoot: .
|
||||
|
||||
%description
|
||||
A very small ETL app
|
||||
|
||||
%prep
|
||||
%setup
|
||||
|
||||
%build
|
||||
#mvn -Dmaven.test.skip=true clean package
|
||||
true
|
||||
|
||||
%install
|
||||
mkdir -p $RPM_BUILD_ROOT/usr/share/ujetl/lib $RPM_BUILD_ROOT/etc/ujetl $RPM_BUILD_ROOT/usr/bin
|
||||
cp target/CopyingApp-*-jar-with-dependencies.jar $RPM_BUILD_ROOT/usr/share/ujetl/lib/CopyingApp.jar
|
||||
cp install_extra/run_copying_job $RPM_BUILD_ROOT/usr/bin
|
||||
cp install_extra/copying_defaults_log4j.xml $RPM_BUILD_ROOT/etc/ujetl
|
||||
|
||||
%files
|
||||
/usr/share/ujetl/lib/CopyingApp.jar
|
||||
/usr/bin/run_copying_job
|
||||
/etc/ujetl/copying_defaults_log4j.xml
|
||||
Loading…
Add table
Add a link
Reference in a new issue