Compare commits

..

No commits in common. "main" and "v2.0.4" have entirely different histories.
main ... v2.0.4

23 changed files with 146 additions and 532 deletions

12
build_util/build_rpm Executable file
View file

@ -0,0 +1,12 @@
#!/bin/bash
# Build the uJETL RPM from the tree mounted at /root/build.
#
# Steps:
#   1. Copy /root/build to a scratch directory so the mounted tree is not
#      modified by the build.
#   2. Pack the scratch copy into the source tarball under rpmbuild/SOURCES,
#      with every path re-rooted under uJETL-<version>/.
#   3. Run rpmbuild against the spec file found in the tree.
#   4. Copy the resulting x86_64 RPMs back into /root/build.
set -e

RPM_TOP="$HOME/rpmbuild"

cd /root
cp -Rv build build2
cd build2

# Exactly one spec file is expected; anything else is a packaging error.
specs=( *.spec )
if [ "${#specs[@]}" -ne 1 ] || [ ! -f "${specs[0]}" ]; then
    echo "Expected exactly one .spec file in $(pwd), found: ${specs[*]}" >&2
    exit 1
fi
SPEC="${specs[0]}"

# Anchor on the Version: tag so other lines mentioning "Version" cannot leak in.
VER=$(awk '/^Version:/ {print $2; exit}' "$SPEC")
if [ -z "$VER" ]; then
    echo "Could not read a Version: tag from $SPEC" >&2
    exit 1
fi

# The archive is named .tar.gz, so actually gzip it (z).
mkdir -p "$RPM_TOP/SOURCES"
tar czvf "$RPM_TOP/SOURCES/uJETL-${VER}.tar.gz" --show-transformed --transform="s/^\./uJETL-${VER}/" .

rpmbuild -ba "$SPEC"
cp "$RPM_TOP"/RPMS/x86_64/* /root/build/

View file

@ -0,0 +1,6 @@
#!/bin/bash
# Build the local/c7-buildhost image from docker/build, then run build_rpm
# inside it with the current directory mounted at /root/build.
#
# Run from the repository root: both docker/build and build_util/build_rpm are
# resolved relative to the current directory.
set -e
docker build --rm -t local/c7-buildhost docker/build
# Quote the mount source so a checkout path containing spaces still works.
docker run -it --rm -v "$(pwd)":/root/build local/c7-buildhost /root/build/build_util/build_rpm

View file

@ -1,3 +1,2 @@
#!/bin/bash #!/bin/bash
docker build --target deploy -t rasilon/ujetl docker/multistage docker build --target deploy -t rasilon/ujetl docker/multistage
docker tag rasilon/ujetl:latest rasilon/ujetl:$(xpath -q -e '/project/version/text()' pom.xml)

View file

@ -1,4 +0,0 @@
#!/bin/bash
# Push the rasilon/ujetl image under two tags: "latest" and the project
# version read from pom.xml.
#
# Run from the directory containing pom.xml.
set -e

# Read the version once and refuse to continue with an empty tag, which would
# otherwise produce an invalid image reference.
VERSION=$(xpath -q -e '/project/version/text()' pom.xml)
if [ -z "$VERSION" ]; then
    echo "Could not read /project/version from pom.xml" >&2
    exit 1
fi

docker push rasilon/ujetl:latest
docker push "rasilon/ujetl:${VERSION}"

View file

@ -14,22 +14,22 @@ declare
pks text; pks text;
begin begin
SELECT SELECT
array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks array_to_string(array_agg(quote_ident(pg_attribute.attname::text) ),', ') into pks
FROM FROM
pg_index, pg_index,
pg_class, pg_class,
pg_attribute, pg_attribute,
pg_namespace pg_namespace
WHERE WHERE
pg_class.relname = tabname pg_class.relname = tabname AND
AND indrelid = pg_class.oid indrelid = pg_class.oid AND
AND nspname = sch nspname = sch AND
AND pg_class.relnamespace = pg_namespace.oid pg_class.relnamespace = pg_namespace.oid AND
AND pg_attribute.attrelid = pg_class.oid pg_attribute.attrelid = pg_class.oid AND
AND pg_attribute.attnum = any(pg_index.indkey) pg_attribute.attnum = any(pg_index.indkey)
AND indisprimary ; AND indisprimary ;
header := E'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n '; header := 'INSERT INTO '||quote_ident(sch)||'.'||quote_ident(tabname)||E' as t (\n ';
for colinfo in for colinfo in
select select
* *
@ -40,6 +40,7 @@ begin
and table_name = tabname and table_name = tabname
order by ordinal_position order by ordinal_position
loop loop
raise info 'Working on %.% (%)',sch,tabname,colinfo::text;
if not is_first then if not is_first then
col_list := col_list || E',\n '; col_list := col_list || E',\n ';
vals := vals || E',\n '; vals := vals || E',\n ';
@ -47,7 +48,7 @@ begin
changes := changes || E'\n OR '; changes := changes || E'\n OR ';
end if; end if;
col_list := col_list || quote_ident(colinfo.column_name); col_list := col_list || quote_ident(colinfo.column_name);
vals := vals || '?::' || colinfo.data_type; vals := vals || '?::' || quote_ident(colinfo.data_type);
sets := sets || quote_ident(colinfo.column_name) || sets := sets || quote_ident(colinfo.column_name) ||
E' = EXCLUDED.' || quote_ident(colinfo.column_name); E' = EXCLUDED.' || quote_ident(colinfo.column_name);
changes := changes || E't.' || quote_ident(colinfo.column_name) || changes := changes || E't.' || quote_ident(colinfo.column_name) ||

View file

@ -1,65 +0,0 @@
-- Generate a template SELECT statement for the table sch.tabname.
--
-- Parameters:
--   sch      schema name, matched against information_schema.columns.table_schema
--   tabname  table name, matched against information_schema.columns.table_name
--
-- Returns the statement text: every column of the table (quoted, in
-- ordinal_position order), the quoted schema-qualified table aliased as t, and
-- a placeholder WHERE clause that the caller is expected to edit by hand.
-- When no columns are visible for the given schema/table the column list is
-- replaced by the marker text 'col_list failed', so a mistyped table name is
-- obvious in the generated output.
CREATE OR REPLACE FUNCTION pg_temp.ujetl_select(sch text, tabname text)
RETURNS text
LANGUAGE plpgsql
AS $function$
declare
    col_list text;
begin
    -- string_agg yields NULL when no row matches, which is what lets the
    -- coalesce fallback below actually fire.
    select
        string_agg(
            quote_ident(c.column_name::text),
            E',\n '
            order by c.ordinal_position
        )
    into col_list
    from
        information_schema.columns as c
    where
        c.table_schema = sch
        and c.table_name = tabname;

    return E'SELECT\n ' ||
        coalesce(col_list,'col_list failed') ||
        E'\nFROM\n ' ||
        quote_ident(sch)||'.'||quote_ident(tabname)||E' as t \n '||
        E'WHERE\n insert criteria here >= ?::datatype';
end;
$function$
;

View file

@ -1,13 +1,17 @@
FROM ubuntu:22.04 as builder FROM centos:centos7 as builder
RUN apt-get update && apt-get -y upgrade RUN yum -y install epel-release java-1.8.0-openjdk-devel
RUN apt-get -y install openjdk-19-jdk-headless maven git RUN yum -y groupinstall 'Development Tools'
RUN git clone --single-branch --branch main https://github.com/rasilon/ujetl.git RUN yum -y install git maven
RUN cd ujetl && mvn -e package RUN git clone https://github.com/rasilon/ujetl.git
RUN cd ujetl && mvn package
FROM openjdk:11 as runner
FROM openjdk:8-alpine as runner
LABEL maintainer="Derry Hamilton <derryh@rasilon.net>" LABEL maintainer="Derry Hamilton <derryh@rasilon.net>"
RUN apt update && apt upgrade -y && apt install -y bash RUN apk update && apk upgrade && apk add bash
RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl RUN mkdir -p /usr/share/ujetl/lib/ /var/ujetl /etc/ujetl
@ -20,7 +24,7 @@ CMD ["/ujetl_entrypoint"]
FROM runner as tester FROM runner as tester
COPY TEST_config_live.xml /var/ujetl/ COPY TEST_config_live.xml /var/ujetl/
COPY wait_for_postgres / COPY wait_for_postgres /
RUN apt-get install -y postgresql-client RUN apk add postgresql-client
FROM runner as deploy FROM runner as deploy

View file

@ -4,27 +4,20 @@
<nRowsToLog>10000</nRowsToLog> <nRowsToLog>10000</nRowsToLog>
<blockSize>1000</blockSize> <blockSize>1000</blockSize>
<pollTimeout>500</pollTimeout> <pollTimeout>500</pollTimeout>
<drivers>
<driver>org.postgresql.Driver</driver>
<driver>org.relique.jdbc.csv.CsvDriver</driver>
</drivers>
<source> <source>
<dsn>jdbc:postgresql://testdb:5432/test</dsn> <dsn>jdbc:postgresql://localhost:5432/test</dsn>
<username>test</username> <username>test</username>
<password>test</password> <password>test</password>
<networkTimeout>600000</networkTimeout> <networkTimeout>600000</networkTimeout>
</source> </source>
<dest> <dest>
<dsn>jdbc:postgresql://testdb:5432/test</dsn> <dsn>jdbc:postgresql://localhost:5432/test</dsn>
<username>test</username> <username>test</username>
<password>test</password> <password>test</password>
</dest> </dest>
<jobs> <jobs>
<job> <job>
<name>test</name> <name>test</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select coalesce(-1,max(id),-1) as key from dest</key> <key>select coalesce(-1,max(id),-1) as key from dest</key>
<select> <select>
select select
@ -56,47 +49,10 @@
OR dest.test_ts = EXCLUDED.test_ts OR dest.test_ts = EXCLUDED.test_ts
</insert> </insert>
</job> </job>
<job>
<name>test upsert</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select -1 as key</key>
<select>
select
id,
test_int,
test_text,
test_ts
from
public.source where id > ?::bigint</select>
<insert>
insert into public.dest(
id,
test_int,
test_text,
test_ts
)values(
?::bigint,
?::integer,
?::text,
?::timestamp with time zone
)ON CONFLICT(id) DO UPDATE
set
test_int = EXCLUDED.test_int,
test_text = EXCLUDED.test_text,
test_ts = EXCLUDED.test_ts
WHERE
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
</insert>
</job>
<job> <job>
<name>denormalise</name> <name>denormalise</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select -1 as key</key> <key>select -1 as key</key>
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid) where ?::integer is not null;</select> <select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid);</select>
<insert> <insert>
INSERT INTO denormalised_personalia(person_id,fname,lname) INSERT INTO denormalised_personalia(person_id,fname,lname)
values(?::integer,?::text,?::text) values(?::integer,?::text,?::text)
@ -109,59 +65,5 @@
OR denormalised_personalia.lname is distinct from EXCLUDED.lname OR denormalised_personalia.lname is distinct from EXCLUDED.lname
</insert> </insert>
</job> </job>
<job>
<name>test pre post</name>
<key>select -1 as key</key>
<select>
select
id,
test_int,
test_text,
test_ts
from
public.source where id > ?::bigint
</select>
<preTarget>
drop table if exists tmp_dest;
create temp table tmp_dest(
id bigint,
test_int integer,
test_text text,
test_ts timestamp with time zone
);
</preTarget>
<insert>
insert into tmp_dest(
id,
test_int,
test_text,
test_ts
)values(
?::bigint,
?::integer,
?::text,
?::timestamp with time zone
)
</insert>
<insert>
insert into public.dest(
id,
test_int,
test_text,
test_ts
)
select id,test_int,test_text,test_ts
from tmp_dest
ON CONFLICT(id) DO UPDATE
set
test_int = EXCLUDED.test_int,
test_text = EXCLUDED.test_text,
test_ts = EXCLUDED.test_ts
WHERE
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
</insert>
</job>
</jobs> </jobs>
</CopyingApp> </CopyingApp>

View file

@ -1,4 +0,0 @@
id,dat
1,banana
2,potato
3,nugget
1 id dat
2 1 banana
3 2 potato
4 3 nugget

View file

@ -10,11 +10,10 @@ ls
echo Starting run loop echo Starting run loop
for file in *.xml for file in *.xml
do do
/usr/local/openjdk-11/bin/java \ /usr/bin/java \
-Xms1g \ -Xms1g \
-Xmx2g \ -Xmx2g \
-cp /usr/share/ujetl/lib/CopyingApp.jar \ -cp /usr/share/ujetl/lib/CopyingApp.jar \
-Dlog4j.configurationFile="$LOG_PROPS" \
com.rasilon.ujetl.CopyingApp \ com.rasilon.ujetl.CopyingApp \
--log4j "$LOG_PROPS" \ --log4j "$LOG_PROPS" \
--config "$file" --config "$file"

View file

@ -8,14 +8,5 @@ until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.contai
sleep 1 sleep 1
done done
>&2 echo "Postgres is up - Waiting for the reboot"
sleep 3 # Wait for the Postgres reboot at the end of setup
until PGPASSWORD=test psql -h "testdb" -U "test" -c 'SELECT 1 FROM public.container_ready' postgres; do
>&2 echo "Postgres is unavailable - sleeping"
sleep 1
done
>&2 echo "Postgres is up - executing command" >&2 echo "Postgres is up - executing command"
exec $cmd exec $cmd

View file

@ -44,13 +44,6 @@ CREATE TABLE denormalised_personalia(
lname text lname text
); );
-- Two-column table: an integer primary key and a free-text payload.
CREATE TABLE test_csvjdbc (
    id  INTEGER NOT NULL PRIMARY KEY,
    dat TEXT
);

-- Role "test" may read every table that exists in schema public at this point,
-- and may additionally insert into and update denormalised_personalia.
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test;
GRANT SELECT, INSERT, UPDATE ON denormalised_personalia TO test;
\c postgres \c postgres
CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a); CREATE TABLE public.container_ready AS SELECT 1 FROM(VALUES(1)) AS a(a);

View file

@ -30,9 +30,9 @@ fi
/usr/bin/java \ /usr/bin/java \
-Xms1g \ -Xms1g \
-Xmx2g \ -Xmx2g \
-Dlog4j.configurationFile="$LOG_PROPS" \
-cp /usr/share/ujetl/lib/CopyingApp.jar \ -cp /usr/share/ujetl/lib/CopyingApp.jar \
com.rasilon.ujetl.CopyingApp \ com.rasilon.ujetl.CopyingApp \
--log4j "$LOG_PROPS" \
--config "/etc/ujetl/${JOBNAME}_config_live.xml" --config "/etc/ujetl/${JOBNAME}_config_live.xml"
#rm -f $LOCKFILE #rm -f $LOCKFILE

26
pom.xml
View file

@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<groupId>com.rasilon.ujetl</groupId> <groupId>com.rasilon.ujetl</groupId>
<artifactId>CopyingApp</artifactId> <artifactId>CopyingApp</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<version>2.5.2</version> <version>2.0.4</version>
<name>uJETL</name> <name>uJETL</name>
<url>https://github.com/rasilon/ujetl</url> <url>https://github.com/rasilon/ujetl</url>
<properties> <properties>
@ -35,13 +35,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<dependency> <dependency>
<groupId>com.h2database</groupId> <groupId>com.h2database</groupId>
<artifactId>h2</artifactId> <artifactId>h2</artifactId>
<version>2.2.220</version> <version>1.4.199</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.18.0</version> <version>3.9</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>
@ -51,12 +51,12 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId> <artifactId>commons-configuration2</artifactId>
<version>2.10.1</version> <version>2.4</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-beanutils</groupId> <groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId> <artifactId>commons-beanutils</artifactId>
<version>1.11.0</version> <version>1.9.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.beust</groupId> <groupId>com.beust</groupId>
@ -66,31 +66,27 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId> <artifactId>log4j-api</artifactId>
<version>2.17.1</version> <version>2.11.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId> <artifactId>log4j-core</artifactId>
<version>2.25.3</version> <version>2.11.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.postgresql</groupId> <groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId> <artifactId>postgresql</artifactId>
<version>42.7.2</version> <version>42.2.5</version>
</dependency>
<dependency>
<groupId>net.sourceforge.csvjdbc</groupId>
<artifactId>csvjdbc</artifactId>
<version>1.0.40</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version> <version>2.3.2</version>
<configuration> <configuration>
<release>11</release> <source>1.8</source>
<target>1.8</target>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>

View file

@ -34,6 +34,10 @@ public class CopyingApp {
public static void main(String[] args) { public static void main(String[] args) {
CopyingAppCommandParser cli = new CopyingAppCommandParser(args); CopyingAppCommandParser cli = new CopyingAppCommandParser(args);
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
String log4jConfigLocation = cli.getLog4jConfigFile();
File file = new File(log4jConfigLocation);
context.setConfigLocation(file.toURI());
System.out.println("Config set from "+file.toURI());
CopyingApp app = new CopyingApp(cli); CopyingApp app = new CopyingApp(cli);
try { try {
@ -75,7 +79,6 @@ public class CopyingApp {
Configuration config = configs.xml(cli.getConfigFile()); Configuration config = configs.xml(cli.getConfigFile());
loadDrivers(config);
String hardLimitSeconds = config.getString("hardLimitSeconds"); String hardLimitSeconds = config.getString("hardLimitSeconds");
if(hardLimitSeconds != null) { if(hardLimitSeconds != null) {
TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true); TimeLimiter hardLimit = new TimeLimiter(Integer.decode(hardLimitSeconds).intValue(),true);
@ -105,14 +108,14 @@ public class CopyingApp {
log.info(String.format("%s - Setting Row count interval to default of 100 rows.",jobName)); log.info(String.format("%s - Setting Row count interval to default of 100 rows.",jobName));
} }
Integer pollTimeout = null; Integer pollTimeout = null;
try { try {
pollTimeout = new Integer(config.getString("pollTimeout")); pollTimeout = new Integer(config.getString("pollTimeout"));
log.info(String.format("%s - Setting Poll timeout to %s milliseconds", jobName, pollTimeout)); log.info(String.format("%s - Setting Poll timeout to %s milliseconds", jobName, pollTimeout));
} catch(Exception e) { } catch(Exception e) {
pollTimeout = new Integer(1000); // If we don't have a new setting, use the old default pollTimeout = new Integer(1000); // If we don't have a new setting, use the old default
log.info(String.format("%s - Setting poll timeout to default of 1 second.",jobName)); log.info(String.format("%s - Setting poll timeout to default of 1 second.",jobName));
} }
@ -128,27 +131,7 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job("+i+").key"); String tabKey = config.getString("jobs.job("+i+").key");
String tabSelect = config.getString("jobs.job("+i+").select"); String tabSelect = config.getString("jobs.job("+i+").select");
String tabInsert = config.getString("jobs.job("+i+").insert"); String tabInsert = config.getString("jobs.job("+i+").insert");
String preTarget = config.getString("jobs.job("+i+").preTarget"); Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
String postTarget = config.getString("jobs.job("+i+").postTarget");
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
Job j = new Job(
sConn,
dConn,
tabName,
jobName,
tabKey,
tabSelect,
tabInsert,
preTarget,
postTarget,
nRowsToLog,
blockSize,
pollTimeout,
identifySourceSQL,
identifyDestinationSQL
);
j.start(); j.start();
j.join(); j.join();
@ -158,28 +141,7 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job.key"); String tabKey = config.getString("jobs.job.key");
String tabSelect = config.getString("jobs.job.select"); String tabSelect = config.getString("jobs.job.select");
String tabInsert = config.getString("jobs.job.insert"); String tabInsert = config.getString("jobs.job.insert");
String preTarget = config.getString("jobs.job.preTarget"); Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
String postTarget = config.getString("jobs.job.postTarget");
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
Job j = new Job(
sConn,
dConn,
tabName,
jobName,
tabKey,
tabSelect,
tabInsert,
preTarget,
postTarget,
nRowsToLog,
blockSize,
pollTimeout,
identifySourceSQL,
identifyDestinationSQL
);
j.start(); j.start();
j.join(); j.join();
} else { } else {
@ -241,21 +203,4 @@ public class CopyingApp {
return c; return c;
} }
// Even with JDBC 4, some drivers don't play nicely with whatever
// the classloaders are up to. So this allows us to force it the
// old fashioned way, and works around the
// "But it works fine when it's the /only/ driver!"
// cross-database problem
//
// Reads every "drivers.driver" entry from the configuration and loads the
// named class with Class.forName. A class that cannot be found is logged as
// an error and skipped; the remaining entries are still attempted.
// A configuration without any "drivers.driver" entry is treated as
// "nothing to preload" rather than failing with a NullPointerException.
private void loadDrivers(Configuration config) {
    String[] drivers = config.get(String[].class, "drivers.driver");
    if(drivers == null) {
        log.info("No drivers configured for preloading.");
        return;
    }
    for(String d:drivers) {
        try {
            Class.forName(d);
            log.info("Preloaded driver "+d);
        } catch(ClassNotFoundException e) {
            log.error("Could not preload driver "+d,e);
        }
    }
}
} }

View file

@ -12,7 +12,7 @@ public class CopyingAppCommandParser {
private String configFile; private String configFile;
@Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run") @Parameter(names = {"-log4j","--log4j"}, description = "Log4J config file for this run")
private String log4jConfigFile = "/etc/ujetl/default_log4j_config.properties"; private String log4jConfigFile = "/etc/ppl/default_log4j_config.properties";
public CopyingAppCommandParser(String[] args) { public CopyingAppCommandParser(String[] args) {
super(); super();
@ -23,4 +23,8 @@ public class CopyingAppCommandParser {
return configFile; return configFile;
} }
public String getLog4jConfigFile() {
return log4jConfigFile;
}
} }

View file

@ -22,29 +22,23 @@ import org.apache.logging.log4j.Logger;
public class Job extends Thread { public class Job extends Thread {
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class); static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
private Connection sConn; Connection sConn;
private Connection dConn; Connection dConn;
private String name; String name;
private String jobName; String jobName;
private String key; String key;
private String select; String select;
private String insert; String insert;
private String preTarget; Integer nRowsToLog;
private String postTarget; Integer blockSize;
private Integer nRowsToLog; Integer pollTimeout;
private Integer blockSize;
private Integer pollTimeout;
private String identifySourceSQL;
private String identifyDestinationSQL;
private BlockingQueue<List<String>> resultBuffer; BlockingQueue<List<String>> resultBuffer;
private AtomicBoolean producerLive; AtomicBoolean producerLive;
private AtomicBoolean threadsExit = new AtomicBoolean(false);; AtomicBoolean threadsExit = new AtomicBoolean(false);;
private String sourceID;
private String destID;
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) { public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
this.sConn = sConn; this.sConn = sConn;
this.dConn = dConn; this.dConn = dConn;
this.name = name; this.name = name;
@ -52,13 +46,9 @@ public class Job extends Thread {
this.key = key; this.key = key;
this.select = select; this.select = select;
this.insert = insert; this.insert = insert;
this.preTarget = preTarget;
this.postTarget = postTarget;
this.nRowsToLog = nRowsToLog; this.nRowsToLog = nRowsToLog;
this.blockSize = blockSize; this.blockSize = blockSize;
this.pollTimeout = pollTimeout; this.pollTimeout = pollTimeout;
this.identifySourceSQL = identifySourceSQL;
this.identifyDestinationSQL = identifyDestinationSQL;
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize); resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
producerLive = new AtomicBoolean(true); producerLive = new AtomicBoolean(true);
@ -82,12 +72,12 @@ public class Job extends Thread {
public Producer(ResultSet src,BlockingQueue q) { public Producer(ResultSet src,BlockingQueue q) {
this.src = src; this.src = src;
this.q = q; this.q = q;
this.setName(String.format("%s-%s-Producer",jobName,name)); this.setName(String.format("%s-%s-Consumer",jobName,name));
} }
public void run() { public void run() {
try { try {
long rowsInserted = 0; long rowsInserted = 0;
long rowsAttempted = 0; long rowNum = 0;
long stamp = System.nanoTime(); long stamp = System.nanoTime();
long nstamp; long nstamp;
int columnCount = src.getMetaData().getColumnCount(); int columnCount = src.getMetaData().getColumnCount();
@ -105,13 +95,13 @@ public class Job extends Thread {
} }
log.trace("Producer queue full."); log.trace("Producer queue full.");
} }
rowsAttempted++; rowNum++;
if(rowsAttempted % nRowsToLog == 0) { if(rowNum % nRowsToLog == 0) {
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowsAttempted,name)); log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name));
} }
} }
producerLive.set(false); producerLive.set(false);
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowsAttempted,name)); log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name));
} catch(Exception e) { } catch(Exception e) {
producerLive.set(false); // Signal we've exited. producerLive.set(false); // Signal we've exited.
threadsExit.set(true); // Signal we've exited. threadsExit.set(true); // Signal we've exited.
@ -132,7 +122,7 @@ public class Job extends Thread {
} }
public void run() { public void run() {
try { try {
long rowsAttempted = 0; long rowNum = 0;
long rowsInserted = 0; long rowsInserted = 0;
while(true) { while(true) {
@ -143,7 +133,7 @@ public class Job extends Thread {
if(row == null && producerLive.get() == false) { if(row == null && producerLive.get() == false) {
rowsInserted += arraySum(insertStatement.executeBatch()); rowsInserted += arraySum(insertStatement.executeBatch());
dConn.commit(); dConn.commit();
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowsAttempted,name)); log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowsInserted,rowNum,name));
return; return;
} }
if(threadsExit.get()) { if(threadsExit.get()) {
@ -160,14 +150,14 @@ public class Job extends Thread {
} }
insertStatement.addBatch(); insertStatement.addBatch();
rowsAttempted++; rowNum++;
if(rowsAttempted % nRowsToLog == 0) { if(rowNum % nRowsToLog == 0) {
rowsInserted += arraySum(insertStatement.executeBatch()); rowsInserted += arraySum(insertStatement.executeBatch());
dConn.commit(); dConn.commit();
log.info(String.format("%s - Inserted %s of %s notified rows into %s", log.info(String.format("%s - Inserted %s of %s notified rows into %s",
jobName, jobName,
rowsInserted, rowsInserted,
rowsAttempted, rowNum,
name)); name));
} }
} }
@ -179,34 +169,11 @@ public class Job extends Thread {
} }
} }
// Outer run
public void run() { public void run() {
try { try {
ResultSet rs; ResultSet rs;
if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn); log.info(String.format("%s - Processing table: %s",jobName,name));
if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn);
if(sourceID != null || destID != null){
log.info(String.format(
"%s - Processing table: %s with source: %s, dest: %s",
jobName,
name,
sourceID==null?"":sourceID,
destID==null?"":destID
));
}else{
log.info(String.format("%s - Processing table: %s",jobName,name));
}
if(preTarget != null){
log.info(String.format("%s - Trying to execute preTarget SQL",jobName));
PreparedStatement s = dConn.prepareStatement(preTarget);
s.executeUpdate();
s.close();
dConn.commit();
}else{
log.info(String.format("%s - No preTarget; skipping.",jobName));
}
log.debug("Trying to execute: "+key); log.debug("Trying to execute: "+key);
PreparedStatement keyStatement = dConn.prepareStatement(key); PreparedStatement keyStatement = dConn.prepareStatement(key);
@ -244,33 +211,10 @@ public class Job extends Thread {
p.join(); p.join();
c.join(); c.join();
if(postTarget != null){
log.info(String.format("%s - Trying to execute postTarget SQL",jobName));
PreparedStatement s = dConn.prepareStatement(postTarget);
s.executeUpdate();
s.close();
dConn.commit();
}else{
log.info(String.format("%s - No postTarget; skipping.",jobName));
}
} catch(InterruptedException e) { } catch(InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch(SQLException e) { } catch(SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
/**
 * Runs a query and returns the first column of its first row as a string.
 *
 * The statement and result set are closed before returning, whether the
 * query succeeds or fails.
 *
 * @param sql  query to execute; only the first row and first column are read
 * @param conn connection to run the query on; it is left open
 * @return the first column of the first row, or null when the query returns
 *         no rows
 * @throws RuntimeException wrapping any SQLException raised while preparing,
 *         executing or reading the query
 */
private String getSingleString(String sql, Connection conn){
    try(
        PreparedStatement s = conn.prepareStatement(sql);
        ResultSet r = s.executeQuery()
    ) {
        // An empty result is reported as "no value" instead of reading from
        // a result set that is not positioned on a row.
        if(!r.next()) {
            return null;
        }
        return r.getString(1);
    } catch(SQLException e) {
        throw new RuntimeException(e);
    }
}
} }

View file

@ -1,37 +0,0 @@
package com.rasilon.ujetl;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.beanutils.PropertyUtils; // Why does config need this?
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
/**
* @author derryh
*
*/
public class TestConfig {
@Test
public void test001VerifyArrayOfDrivers() {
try {
Configurations configs = new Configurations();
Configuration config = configs.xml("TEST_config_live.xml");
String[] drivers = config.get(String[].class, "drivers.driver");
int ndrivers =drivers.length;
if(ndrivers != 3){
fail("Expected 3 drivers, but found "+ndrivers);
}
} catch(Exception e) {
fail(e.toString());
}
}
}

View file

@ -42,16 +42,12 @@ public class TestJob {
dest, dest,
"jUnit Test Config", "jUnit Test Config",
"jUnit Test Job", "jUnit Test Job",
"SELECT -1 AS \"key\"", "SELECT -1 AS key",
"SELECT id,dat FROM src WHERE id > ?", "SELECT id,dat FROM src WHERE id > ?",
"INSERT INTO dest VALUES(?,?)", "INSERT INTO dest VALUES(?,?)",
null,
null,
100, 100,
100, 100,
100, 100
"select 'PID:'||session_id()",
"select 'PID:'||session_id()"
); );
j.start(); j.start();
j.join(); j.join();

View file

@ -13,12 +13,15 @@ public class TestParser {
public void test001Parset() { public void test001Parset() {
try { try {
String[] args = { String[] args = {
"--log4j",
"log4j_test_banana.xml",
"--config", "--config",
"config_test_banana.xml" "config_test_banana.xml"
}; };
CopyingAppCommandParser p = new CopyingAppCommandParser(args); CopyingAppCommandParser p = new CopyingAppCommandParser(args);
assertEquals(p.getConfigFile(),"config_test_banana.xml"); assertEquals(p.getConfigFile(),"config_test_banana.xml");
assertEquals(p.getLog4jConfigFile(),"log4j_test_banana.xml");
} catch(Exception e) { } catch(Exception e) {
fail(e.toString()); fail(e.toString());

View file

@ -1,64 +0,0 @@
package com.rasilon.ujetl;
import java.sql.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class TestPrePost {
private static String jdbcURL = "jdbc:h2:mem:dbtest";
@Test
public void test002verifyH2Works() {
try {
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
conn.close();
} catch(Exception e) {
fail(e.toString());
}
}
@Test
public void testPrePost() {
try (
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
) {
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
for(int i=0; i<10000; i++) {
inserter.setInt(1,i);
inserter.executeUpdate();
}
Job j = new Job(
src,
dest,
"jUnit Test Config",
"jUnit Test Job",
"SELECT -1 AS \"key\"",
"SELECT id,dat FROM src WHERE id > ?",
"INSERT INTO tmp_dest VALUES(?,?)",
"CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);",
"INSERT INTO dest SELECT * from tmp_dest;",
100,
100,
100,
"select 'PID:'||session_id()",
"select 'PID:'||session_id()"
);
j.start();
j.join();
// do stuff
} catch(Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
}

View file

@ -4,11 +4,6 @@
<nRowsToLog>10000</nRowsToLog> <nRowsToLog>10000</nRowsToLog>
<blockSize>1000</blockSize> <blockSize>1000</blockSize>
<pollTimeout>500</pollTimeout> <pollTimeout>500</pollTimeout>
<drivers>
<driver>org.postgresql.Driver</driver>
<driver>org.h2.Driver</driver>
<driver>org.relique.jdbc.csv.CsvDriver</driver>
</drivers>
<source> <source>
<dsn>jdbc:postgresql://localhost:5432/test</dsn> <dsn>jdbc:postgresql://localhost:5432/test</dsn>
<username>test</username> <username>test</username>
@ -23,8 +18,6 @@
<jobs> <jobs>
<job> <job>
<name>test</name> <name>test</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select coalesce(-1,max(id),-1) as key from dest</key> <key>select coalesce(-1,max(id),-1) as key from dest</key>
<select> <select>
select select
@ -56,39 +49,6 @@
OR dest.test_ts = EXCLUDED.test_ts OR dest.test_ts = EXCLUDED.test_ts
</insert> </insert>
</job> </job>
<job>
<name>test upsert</name>
<key>select -1 as key</key>
<select>
select
id,
test_int,
test_text,
test_ts
from
public.source where id > ?::bigint</select>
<insert>
insert into public.dest(
id,
test_int,
test_text,
test_ts
)values(
?::bigint,
?::integer,
?::text,
?::timestamp with time zone
)ON CONFLICT(id) DO UPDATE
set
test_int = EXCLUDED.test_int,
test_text = EXCLUDED.test_text,
test_ts = EXCLUDED.test_ts
WHERE
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
</insert>
</job>
<job> <job>
<name>denormalise</name> <name>denormalise</name>
<key>select -1 as key</key> <key>select -1 as key</key>

33
uJETL.spec Normal file
View file

@ -0,0 +1,33 @@
# RPM spec for uJETL.
#
# The package does not compile anything itself: the build section is a no-op
# and the install section copies a jar that must already exist under target/
# in the source tarball, together with the helper files from install_extra/.
Summary: Java app to facilitate moving data between databases.
Name: uJETL
Version: 2.0.4
Release: 1
Group: Applications/Database
License: All rights reserved.
Source: uJETL-%{version}.tar.gz
URL: https://github.com/rasilon/ujetl.git
Distribution: derryh
Vendor: derryh
Packager: Derry Hamilton <derryh@rasilon.net>
#BuildRoot: .

%description
A very small ETL app

%prep
%setup

%build
#mvn -Dmaven.test.skip=true clean package
true

%install
mkdir -p $RPM_BUILD_ROOT/usr/share/ujetl/lib $RPM_BUILD_ROOT/etc/ujetl $RPM_BUILD_ROOT/usr/bin
# The versioned jar name is flattened to a fixed name for the run scripts.
cp target/CopyingApp-*-jar-with-dependencies.jar $RPM_BUILD_ROOT/usr/share/ujetl/lib/CopyingApp.jar
cp install_extra/run_copying_job $RPM_BUILD_ROOT/usr/bin
cp install_extra/copying_defaults_log4j.xml $RPM_BUILD_ROOT/etc/ujetl

%files
/usr/share/ujetl/lib/CopyingApp.jar
/usr/bin/run_copying_job
# Mark the logging configuration as a config file so that local edits are
# kept when the package is upgraded.
%config(noreplace) /etc/ujetl/copying_defaults_log4j.xml