package de.pzzz.vertx.worker; import java.util.stream.Stream; import io.vertx.core.Future; public abstract class QueuedWorker { private QueueProcessingStatus queue = new QueueProcessingStatus<>(); private int maxWorkers; protected QueuedWorker(int maxWorkers) { this.maxWorkers = maxWorkers; } public QueueProcessingStatus startProcessing() { getInputs().forEach(queue::enqueue); startInputProcessing(); return queue; } public QueueProcessingStatus continueProcessing() { queue.setCalculate(true); return queue; } public QueueProcessingStatus stopProcessing() { queue.setCalculate(false); return queue; } public QueueProcessingStatus processingStatus() { return queue; } public QueueProcessingStatus clearQueue() { queue.clear(); return queue; } protected abstract Stream getInputs(); protected abstract Future doJob(final T job); protected void beforeJob(final T job) {} protected void afterSuccessfulJob(final T job, final U response) {} protected void afterFailedJob(final T job, final Throwable error) {} private void startInputProcessing() { if (!queue.hasNext()) { return; } T job = queue.startProcessing(); executeJob(job); for (int i = 0; i < maxWorkers; i++) { startNextJob(); } } private void startNextJob() { if (!queue.isCalculate() || !queue.hasNext() || queue.getRunningCalculations() >= maxWorkers) { return; } executeJob(queue.processNext()); } private void executeJob(final T job) { beforeJob(job); doJob(job).onSuccess(response -> { queue.complete(); afterSuccessfulJob(job, response); startNextJob(); }).onFailure(error -> { queue.complete(); afterFailedJob(job, error); startNextJob(); }); } }