Add configurable poll timeout for warning on empty queue

This commit is contained in:
Derry Hamilton 2019-06-12 12:20:04 +01:00
parent 07e97398af
commit 7d755df600
2 changed files with 17 additions and 4 deletions

View file

@ -108,6 +108,17 @@ public class CopyingApp {
log.info(String.format("%s - Setting Row count interval to default of 100 rows.",jobName)); log.info(String.format("%s - Setting Row count interval to default of 100 rows.",jobName));
} }
Integer pollTimeout = null;
try {
pollTimeout = new Integer(config.getString("nRowsToLog"));
log.info(String.format("%s - Setting Poll timeout to %s milliseconds", jobName, pollTimeout));
} catch(Exception e) {
pollTimeout = new Integer(1000); // If we don't have a new setting, use the old default
log.info(String.format("%s - Setting poll timeout to default of 1 second.",jobName));
}
long startTime = System.nanoTime(); long startTime = System.nanoTime();
log.info(String.format("%s - Jobs are:",jobName)); log.info(String.format("%s - Jobs are:",jobName));
@ -120,7 +131,7 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job("+i+").key"); String tabKey = config.getString("jobs.job("+i+").key");
String tabSelect = config.getString("jobs.job("+i+").select"); String tabSelect = config.getString("jobs.job("+i+").select");
String tabInsert = config.getString("jobs.job("+i+").insert"); String tabInsert = config.getString("jobs.job("+i+").insert");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize); Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
j.start(); j.start();
j.join(); j.join();
@ -130,7 +141,7 @@ public class CopyingApp {
String tabKey = config.getString("jobs.job.key"); String tabKey = config.getString("jobs.job.key");
String tabSelect = config.getString("jobs.job.select"); String tabSelect = config.getString("jobs.job.select");
String tabInsert = config.getString("jobs.job.insert"); String tabInsert = config.getString("jobs.job.insert");
Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize); Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout);
j.start(); j.start();
j.join(); j.join();
} else { } else {

View file

@ -31,13 +31,14 @@ public class Job extends Thread {
String insert; String insert;
Integer nRowsToLog; Integer nRowsToLog;
Integer blockSize; Integer blockSize;
Integer pollTimeout;
BlockingQueue<List<String>> resultBuffer; BlockingQueue<List<String>> resultBuffer;
AtomicBoolean producerLive; AtomicBoolean producerLive;
AtomicBoolean threadsExit = new AtomicBoolean(false);; AtomicBoolean threadsExit = new AtomicBoolean(false);;
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize) { public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) {
this.sConn = sConn; this.sConn = sConn;
this.dConn = dConn; this.dConn = dConn;
this.name = name; this.name = name;
@ -47,6 +48,7 @@ public class Job extends Thread {
this.insert = insert; this.insert = insert;
this.nRowsToLog = nRowsToLog; this.nRowsToLog = nRowsToLog;
this.blockSize = blockSize; this.blockSize = blockSize;
this.pollTimeout = pollTimeout;
resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize); resultBuffer = new ArrayBlockingQueue<List<String>>( 3 * blockSize);
producerLive = new AtomicBoolean(true); producerLive = new AtomicBoolean(true);
@ -124,7 +126,7 @@ public class Job extends Thread {
long rowsInserted = 0; long rowsInserted = 0;
while(true) { while(true) {
List<String> row = q.poll(100,java.util.concurrent.TimeUnit.MILLISECONDS); List<String> row = q.poll(pollTimeout,java.util.concurrent.TimeUnit.MILLISECONDS);
if(row == null && producerLive.get() == false) { if(row == null && producerLive.get() == false) {
row = q.poll(1,java.util.concurrent.TimeUnit.MILLISECONDS); row = q.poll(1,java.util.concurrent.TimeUnit.MILLISECONDS);
} }