Improve thread cleanup code

This commit is contained in:
Derry Hamilton 2019-05-13 09:53:25 +01:00
parent f21698b449
commit 7863e78838
2 changed files with 31 additions and 11 deletions

View file

@ -34,6 +34,7 @@ public class Job extends Thread {
BlockingQueue<List<String>> resultBuffer;
AtomicBoolean producerLive;
AtomicBoolean threadsExit = new AtomicBoolean(false);;
public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize) {
@ -83,7 +84,13 @@ public class Job extends Thread {
for(int i=1; i<=columnCount; i++) {
row.add(src.getString(i));
}
q.put(row);
while(!q.offer(row,1000,java.util.concurrent.TimeUnit.MILLISECONDS)) {
if(threadsExit.get()) {
log.error("Producer thread asked to exit.");
return;
}
log.trace("Producer queue full.");
}
rowNum++;
if(rowNum % nRowsToLog == 0) {
log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name));
@ -91,10 +98,9 @@ public class Job extends Thread {
}
producerLive.set(false);
log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name));
} catch(SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch(InterruptedException e) {
} catch(Exception e) {
producerLive.set(false); // Signal we've exited.
threadsExit.set(true); // Signal we've exited.
e.printStackTrace();
throw new RuntimeException(e);
}
@ -125,7 +131,11 @@ public class Job extends Thread {
log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name));
return;
}
if(row == null){
if(threadsExit.get()) {
log.error("Consumer thread asked to exit.");
return;
}
if(row == null) {
log.warn("Queue empty.");
continue;
}
@ -142,10 +152,8 @@ public class Job extends Thread {
log.info(String.format("%s - Inserted %s of %s notified rows into %s so far",jobName,rowNum,rowsInserted,name));
}
}
} catch(SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch(InterruptedException e) {
} catch(Exception e) {
threadsExit.set(true); // Signal we've exited.
e.printStackTrace();
throw new RuntimeException(e);
}
@ -176,10 +184,19 @@ public class Job extends Thread {
log.debug("About to execute select.");
rs = selectStatement.executeQuery();
Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
threadsExit.set(true);
log.error("Uncaught exception: " + ex);
System.exit(1);
}
};
Thread p = new Producer(rs,resultBuffer);
p.setUncaughtExceptionHandler(h);
p.start();
Thread c = new Consumer(insertStatement,resultBuffer);
c.setUncaughtExceptionHandler(h);
c.start();
p.join();