Merge pull request #1 from rasilon/pre_setup_post

Add pre and post target SQL
This commit is contained in:
Derry Hamilton 2019-08-05 11:36:14 +01:00 committed by GitHub
commit f521f25a0e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 151 additions and 5 deletions

View file

@ -98,5 +98,59 @@
OR denormalised_personalia.lname is distinct from EXCLUDED.lname OR denormalised_personalia.lname is distinct from EXCLUDED.lname
</insert> </insert>
</job> </job>
<job>
<name>test pre post</name>
<key>select -1 as key</key>
<select>
select
id,
test_int,
test_text,
test_ts
from
public.source where id > ?::bigint
</select>
<preTarget>
drop table if exists tmp_dest;
create temp table tmp_dest(
id bigint,
test_int integer,
test_text text,
test_ts timestamp with time zone
);
</preTarget>
<insert>
insert into tmp_dest(
id,
test_int,
test_text,
test_ts
)values(
?::bigint,
?::integer,
?::text,
?::timestamp with time zone
)
</insert>
<insert>
insert into public.dest(
id,
test_int,
test_text,
test_ts
)
select id,test_int,test_text,test_ts
from tmp_dest
ON CONFLICT(id) DO UPDATE
set
test_int = EXCLUDED.test_int,
test_text = EXCLUDED.test_text,
test_ts = EXCLUDED.test_ts
WHERE
dest.test_int IS DISTINCT FROM EXCLUDED.test_int
OR dest.test_text IS DISTINCT FROM EXCLUDED.test_text
OR dest.test_ts IS DISTINCT FROM EXCLUDED.test_ts
</insert>
</job>
</jobs> </jobs>
</CopyingApp> </CopyingApp>

View file

@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<groupId>com.rasilon.ujetl</groupId> <groupId>com.rasilon.ujetl</groupId>
<artifactId>CopyingApp</artifactId> <artifactId>CopyingApp</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<version>2.0.5</version> <version>2.1.5</version>
<name>uJETL</name> <name>uJETL</name>
<url>https://github.com/rasilon/ujetl</url> <url>https://github.com/rasilon/ujetl</url>
<properties> <properties>

View file

@ -131,7 +131,10 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job("+i+").key"); String tabKey = config.getString("jobs.job("+i+").key");
String tabSelect = config.getString("jobs.job("+i+").select"); String tabSelect = config.getString("jobs.job("+i+").select");
String tabInsert = config.getString("jobs.job("+i+").insert"); String tabInsert = config.getString("jobs.job("+i+").insert");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); String preTarget = config.getString("jobs.job("+i+").preTarget");
String postTarget = config.getString("jobs.job("+i+").postTarget");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout);
j.start(); j.start();
j.join(); j.join();
@ -141,7 +144,9 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job.key"); String tabKey = config.getString("jobs.job.key");
String tabSelect = config.getString("jobs.job.select"); String tabSelect = config.getString("jobs.job.select");
String tabInsert = config.getString("jobs.job.insert"); String tabInsert = config.getString("jobs.job.insert");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); String preTarget = config.getString("jobs.job.preTarget");
String postTarget = config.getString("jobs.job.postTarget");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout);
j.start(); j.start();
j.join(); j.join();
} else { } else {

View file

@ -29,6 +29,8 @@ public class Job extends Thread {
String key; String key;
String select; String select;
String insert; String insert;
String preTarget;
String postTarget;
Integer nRowsToLog; Integer nRowsToLog;
Integer blockSize; Integer blockSize;
Integer pollTimeout; Integer pollTimeout;
@ -38,7 +40,7 @@ public class Job extends Thread {
AtomicBoolean threadsExit = new AtomicBoolean(false);; AtomicBoolean threadsExit = new AtomicBoolean(false);;
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
this.sConn = sConn; this.sConn = sConn;
this.dConn = dConn; this.dConn = dConn;
this.name = name; this.name = name;
@ -46,6 +48,8 @@ public class Job extends Thread {
this.key = key; this.key = key;
this.select = select; this.select = select;
this.insert = insert; this.insert = insert;
this.preTarget = preTarget;
this.postTarget = postTarget;
this.nRowsToLog = nRowsToLog; this.nRowsToLog = nRowsToLog;
this.blockSize = blockSize; this.blockSize = blockSize;
this.pollTimeout = pollTimeout; this.pollTimeout = pollTimeout;
@ -169,11 +173,20 @@ public class Job extends Thread {
} }
} }
// Outer run
public void run() { public void run() {
try { try {
ResultSet rs; ResultSet rs;
log.info(String.format("%s - Processing table: %s",jobName,name)); log.info(String.format("%s - Processing table: %s",jobName,name));
if(preTarget != null){
log.info("Trying to execute preTarget SQL");
PreparedStatement s = dConn.prepareStatement(preTarget);
s.executeUpdate();
s.close();
}else{
log.info("No preTarget; skipping.");
}
log.debug("Trying to execute: "+key); log.debug("Trying to execute: "+key);
PreparedStatement keyStatement = dConn.prepareStatement(key); PreparedStatement keyStatement = dConn.prepareStatement(key);
@ -211,6 +224,16 @@ public class Job extends Thread {
p.join(); p.join();
c.join(); c.join();
if(postTarget != null){
log.info("Trying to execute postTarget SQL");
PreparedStatement s = dConn.prepareStatement(postTarget);
s.executeUpdate();
s.close();
}else{
log.info("No postTarget; skipping.");
}
} catch(InterruptedException e) { } catch(InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch(SQLException e) { } catch(SQLException e) {

View file

@ -45,6 +45,8 @@ public class TestJob {
"SELECT -1 AS key", "SELECT -1 AS key",
"SELECT id,dat FROM src WHERE id > ?", "SELECT id,dat FROM src WHERE id > ?",
"INSERT INTO dest VALUES(?,?)", "INSERT INTO dest VALUES(?,?)",
null,
null,
100, 100,
100, 100,
100 100

View file

@ -0,0 +1,62 @@
package com.rasilon.ujetl;
import java.sql.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class TestPrePost {
private static String jdbcURL = "jdbc:h2:mem:dbtest";
@Test
public void test002verifyH2Works() {
try {
Connection conn = DriverManager.getConnection(jdbcURL, "sa", "");
conn.close();
} catch(Exception e) {
fail(e.toString());
}
}
@Test
public void testPrePost() {
try (
Connection src = DriverManager.getConnection(jdbcURL, "sa", "");
Connection dest = DriverManager.getConnection(jdbcURL, "sa", "");
) {
src.createStatement().executeUpdate("CREATE TABLE src(id bigint not null primary key, dat varchar);");
dest.createStatement().executeUpdate("CREATE TABLE dest(id bigint not null primary key, dat varchar);");
PreparedStatement inserter = src.prepareStatement("INSERT INTO src(id,dat) VALUES(?,'banana')");
for(int i=0; i<10000; i++) {
inserter.setInt(1,i);
inserter.executeUpdate();
}
Job j = new Job(
src,
dest,
"jUnit Test Config",
"jUnit Test Job",
"SELECT -1 AS key",
"SELECT id,dat FROM src WHERE id > ?",
"INSERT INTO tmp_dest VALUES(?,?)",
"CREATE TEMP TABLE tmp_dest(id bigint not null primary key, dat varchar);",
"INSERT INTO dest SELECT * from tmp_dest;",
100,
100,
100
);
j.start();
j.join();
// do stuff
} catch(Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
}

View file

@ -1,6 +1,6 @@
Summary: Java app to facilitate moving data between databases. Summary: Java app to facilitate moving data between databases.
Name: uJETL Name: uJETL
Version: 2.0.5 Version: 2.1.5
Release: 1 Release: 1
Group: Applications/Database Group: Applications/Database
License: All rights reserved. License: All rights reserved.