Add session identification

This commit is contained in:
Derry Hamilton 2019-08-27 12:53:37 +01:00
parent 01310bfca4
commit 734dc8608f
6 changed files with 100 additions and 21 deletions

View file

@ -18,6 +18,8 @@
<jobs>
<job>
<name>test</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select coalesce(-1,max(id),-1) as key from dest</key>
<select>
select
@ -51,6 +53,8 @@
</job>
<job>
<name>test upsert</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select -1 as key</key>
<select>
select
@ -84,6 +88,8 @@
</job>
<job>
<name>denormalise</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select -1 as key</key>
<select>select person_id,fname,lname from normalised_personalia p join normalised_first_names f using(fid) join normalised_last_names l using(lid) where ?::integer is not null;</select>
<insert>

View file

@ -133,8 +133,25 @@ public class CopyingApp {
String tabInsert = config.getString("jobs.job("+i+").insert");
String preTarget = config.getString("jobs.job("+i+").preTarget");
String postTarget = config.getString("jobs.job("+i+").postTarget");
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout);
Job j = new Job(
sConn,
dConn,
tabName,
jobName,
tabKey,
tabSelect,
tabInsert,
preTarget,
postTarget,
nRowsToLog,
blockSize,
pollTimeout,
identifySourceSQL,
identifyDestinationSQL
);
j.start();
j.join();
@ -146,7 +163,26 @@ public class CopyingApp {
String tabInsert = config.getString("jobs.job.insert");
String preTarget = config.getString("jobs.job.preTarget");
String postTarget = config.getString("jobs.job.postTarget");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,preTarget,postTarget,nRowsToLog,blockSize,pollTimeout);
String identifySourceSQL = config.getString("jobs.job.identifySourceSQL");
String identifyDestinationSQL = config.getString("jobs.job.identifyDestinationSQL");
Job j = new Job(
sConn,
dConn,
tabName,
jobName,
tabKey,
tabSelect,
tabInsert,
preTarget,
postTarget,
nRowsToLog,
blockSize,
pollTimeout,
identifySourceSQL,
identifyDestinationSQL
);
j.start();
j.join();
} else {

View file

@ -22,25 +22,29 @@ import org.apache.logging.log4j.Logger;
public class Job extends Thread {
static Logger log = org.apache.logging.log4j.LogManager.getLogger(Job.class);
Connection sConn;
Connection dConn;
String name;
String jobName;
String key;
String select;
String insert;
String preTarget;
String postTarget;
Integer nRowsToLog;
Integer blockSize;
Integer pollTimeout;
private Connection sConn;
private Connection dConn;
private String name;
private String jobName;
private String key;
private String select;
private String insert;
private String preTarget;
private String postTarget;
private Integer nRowsToLog;
private Integer blockSize;
private Integer pollTimeout;
private String identifySourceSQL;
private String identifyDestinationSQL;
BlockingQueue<List<String>> resultBuffer;
AtomicBoolean producerLive;
AtomicBoolean threadsExit = new AtomicBoolean(false);;
private BlockingQueue<List<String>> resultBuffer;
private AtomicBoolean producerLive;
private AtomicBoolean threadsExit = new AtomicBoolean(false);;
private String sourceID;
private String destID;
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,String preTarget,String postTarget,Integer nRowsToLog,Integer blockSize,Integer pollTimeout,String identifySourceSQL, String identifyDestinationSQL) {
this.sConn = sConn;
this.dConn = dConn;
this.name = name;
@ -53,6 +57,8 @@ public class Job extends Thread {
this.nRowsToLog = nRowsToLog;
this.blockSize = blockSize;
this.pollTimeout = pollTimeout;
this.identifySourceSQL = identifySourceSQL;
this.identifyDestinationSQL = identifyDestinationSQL;
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
producerLive = new AtomicBoolean(true);
@ -178,7 +184,20 @@ public class Job extends Thread {
try {
ResultSet rs;
if(identifySourceSQL != null) sourceID = getSingleString(identifySourceSQL,sConn);
if(identifyDestinationSQL != null) destID = getSingleString(identifyDestinationSQL,dConn);
if(sourceID != null || destID != null){
log.info(String.format(
"%s - Processing table: %s with source: %s, dest: %s",
jobName,
name,
sourceID==null?"":sourceID,
destID==null?"":destID
));
}else{
log.info(String.format("%s - Processing table: %s",jobName,name));
}
if(preTarget != null){
log.info("Trying to execute preTarget SQL");
PreparedStatement s = dConn.prepareStatement(preTarget);
@ -242,4 +261,16 @@ public class Job extends Thread {
throw new RuntimeException(e);
}
}
private String getSingleString(String sql, Connection conn){
try{
PreparedStatement s = conn.prepareStatement(sql);
ResultSet r = s.executeQuery();
r.next();
return r.getString(1);
} catch(SQLException e) {
throw new RuntimeException(e);
}
}
}

View file

@ -49,7 +49,9 @@ public class TestJob {
null,
100,
100,
100
100,
"select 'PID:'||session_id()",
"select 'PID:'||session_id()"
);
j.start();
j.join();

View file

@ -49,7 +49,9 @@ public class TestPrePost {
"INSERT INTO dest SELECT * from tmp_dest;",
100,
100,
100
100,
"select 'PID:'||session_id()",
"select 'PID:'||session_id()"
);
j.start();
j.join();

View file

@ -18,6 +18,8 @@
<jobs>
<job>
<name>test</name>
<identifySourceSQL>select 'PID:'||pg_backend_pid()</identifySourceSQL>
<identifyDestinationSQL>select 'PID:'||pg_backend_pid()</identifyDestinationSQL>
<key>select coalesce(-1,max(id),-1) as key from dest</key>
<select>
select