diff --git a/src/main/java/com/rasilon/ujetl/CopyingApp.java b/src/main/java/com/rasilon/ujetl/CopyingApp.java index aa6621b..6938888 100644 --- a/src/main/java/com/rasilon/ujetl/CopyingApp.java +++ b/src/main/java/com/rasilon/ujetl/CopyingApp.java @@ -108,6 +108,17 @@ public class CopyingApp { log.info(String.format("%s - Setting Row count interval to default of 100 rows.",jobName)); } + Integer pollTimeout = null; + try { + pollTimeout = new Integer(config.getString("nRowsToLog")); + log.info(String.format("%s - Setting Poll timeout to %s milliseconds", jobName, pollTimeout)); + } catch(Exception e) { + pollTimeout = new Integer(1000); // If we don't have a new setting, use the old default + log.info(String.format("%s - Setting poll timeout to default of 1 second.",jobName)); + } + + + long startTime = System.nanoTime(); log.info(String.format("%s - Jobs are:",jobName)); @@ -120,7 +131,7 @@ public class CopyingApp { String tabKey = config.getString("jobs.job("+i+").key"); String tabSelect = config.getString("jobs.job("+i+").select"); String tabInsert = config.getString("jobs.job("+i+").insert"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize); + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); j.start(); j.join(); @@ -130,7 +141,7 @@ public class CopyingApp { String tabKey = config.getString("jobs.job.key"); String tabSelect = config.getString("jobs.job.select"); String tabInsert = config.getString("jobs.job.insert"); - Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize); + Job j = new Job(sConn,dConn,tabName,jobName,tabKey,tabSelect,tabInsert,nRowsToLog,blockSize,pollTimeout); j.start(); j.join(); } else { diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index 679a121..a083f56 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -31,13 +31,14 @@ public class Job extends Thread { String insert; Integer nRowsToLog; Integer blockSize; + Integer pollTimeout; BlockingQueue> resultBuffer; AtomicBoolean producerLive; AtomicBoolean threadsExit = new AtomicBoolean(false);; - public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize) { + public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize,Integer pollTimeout) { this.sConn = sConn; this.dConn = dConn; this.name = name; @@ -47,6 +48,7 @@ public class Job extends Thread { this.insert = insert; this.nRowsToLog = nRowsToLog; this.blockSize = blockSize; + this.pollTimeout = pollTimeout; resultBuffer = new ArrayBlockingQueue>( 3 * blockSize); producerLive = new AtomicBoolean(true); @@ -124,7 +126,7 @@ public class Job extends Thread { long rowsInserted = 0; while(true) { - List row = q.poll(100,java.util.concurrent.TimeUnit.MILLISECONDS); + List row = q.poll(pollTimeout,java.util.concurrent.TimeUnit.MILLISECONDS); if(row == null && producerLive.get() == false) { row = q.poll(1,java.util.concurrent.TimeUnit.MILLISECONDS); }