diff --git a/README.md b/README.md index b57c353..3245b14 100644 --- a/README.md +++ b/README.md @@ -1 +1,4 @@ -# ujetl +# ujetl (Micro Java ETL) +Originally written in the days of trying to do something, with no budget, I wrote this out of necessity. I subsequently got permission to open-source it so long as there were no references to the company in it. So this is the cleaned up version, with a few additional features added and the things that turned out to be pointless removed. + +It's probably the smallest functional ETL application with decent performance. Since I only use it on Postgres nowadays, it only officially supports Postgres at the moment. But in the near past it's worked pulling data from "several commercial databases" that don't like being named in benchmarks etc. and if you have the JDBC jars in your classpath then it should just work. diff --git a/src/main/java/com/rasilon/ujetl/Job.java b/src/main/java/com/rasilon/ujetl/Job.java index 77b3c2c..1cc0594 100644 --- a/src/main/java/com/rasilon/ujetl/Job.java +++ b/src/main/java/com/rasilon/ujetl/Job.java @@ -34,6 +34,7 @@ public class Job extends Thread { BlockingQueue> resultBuffer; AtomicBoolean producerLive; + AtomicBoolean threadsExit = new AtomicBoolean(false);; public Job(Connection sConn,Connection dConn,String name,String jobName,String key,String select,String insert,Integer nRowsToLog,Integer blockSize) { @@ -83,7 +84,13 @@ public class Job extends Thread { for(int i=1; i<=columnCount; i++) { row.add(src.getString(i)); } - q.put(row); + while(!q.offer(row,1000,java.util.concurrent.TimeUnit.MILLISECONDS)) { + if(threadsExit.get()) { + log.error("Producer thread asked to exit."); + return; + } + log.trace("Producer queue full."); + } rowNum++; if(rowNum % nRowsToLog == 0) { log.info(String.format("%s - Queued %s rows for %s so far",jobName,rowNum,name)); @@ -91,10 +98,9 @@ public class Job extends Thread { } producerLive.set(false); log.info(String.format("%s - Queued a total of %s rows for %s",jobName,rowNum,name)); - } catch(SQLException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } catch(InterruptedException e) { + } catch(Exception e) { + producerLive.set(false); // Signal we've exited. + threadsExit.set(true); // Signal we've exited. e.printStackTrace(); throw new RuntimeException(e); } @@ -125,7 +131,11 @@ public class Job extends Thread { log.info(String.format("%s - Inserted a total of %s of %s notified rows into %s",jobName,rowNum,rowsInserted,name)); return; } - if(row == null){ + if(threadsExit.get()) { + log.error("Consumer thread asked to exit."); + return; + } + if(row == null) { log.warn("Queue empty."); continue; } @@ -142,10 +152,8 @@ public class Job extends Thread { log.info(String.format("%s - Inserted %s of %s notified rows into %s so far",jobName,rowNum,rowsInserted,name)); } } - } catch(SQLException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } catch(InterruptedException e) { + } catch(Exception e) { + threadsExit.set(true); // Signal we've exited. e.printStackTrace(); throw new RuntimeException(e); } @@ -176,10 +184,19 @@ public class Job extends Thread { log.debug("About to execute select."); rs = selectStatement.executeQuery(); + Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + threadsExit.set(true); + log.error("Uncaught exception: " + ex); + System.exit(1); + } + }; Thread p = new Producer(rs,resultBuffer); + p.setUncaughtExceptionHandler(h); p.start(); Thread c = new Consumer(insertStatement,resultBuffer); + c.setUncaughtExceptionHandler(h); c.start(); p.join();