From b950c9ebdb11a6da17c8e6d23c6a23a43f0a15b5 Mon Sep 17 00:00:00 2001 From: Peter Schuller Date: Fri, 17 Dec 2021 07:31:44 +0100 Subject: [PATCH] [wip] initial commit extracted useful snippets from other projects --- .classpath | 11 ++ .gitignore | 20 +++ .project | 18 ++ build.xml | 106 ++++++++++++ ivy.xml | 14 ++ .../pzzz/vertx/PersistentRestDataAccess.java | 109 +++++++++++++ src/de/pzzz/vertx/RestCommand.java | 5 + src/de/pzzz/vertx/RestDataAccess.java | 154 ++++++++++++++++++ src/de/pzzz/vertx/RestDataRequest.java | 46 ++++++ src/de/pzzz/vertx/SerializableWithId.java | 22 +++ src/de/pzzz/vertx/ServerVerticle.java | 85 ++++++++++ src/de/pzzz/vertx/Startup.java | 15 ++ .../pzzz/vertx/process/ExecutableProcess.java | 68 ++++++++ src/de/pzzz/vertx/process/ItemProcess.java | 44 +++++ .../pzzz/vertx/process/ProcessController.java | 149 +++++++++++++++++ .../process/ProcessExecutionController.java | 62 +++++++ src/de/pzzz/vertx/process/ProcessRequest.java | 31 ++++ src/de/pzzz/vertx/process/ProcessStatus.java | 5 + .../pzzz/vertx/worker/DeployableWorker.java | 9 + .../pzzz/vertx/worker/FileSaveController.java | 38 +++++ .../pzzz/vertx/worker/FileSaveVerticle.java | 45 +++++ .../vertx/worker/QueueProcessingStatus.java | 70 ++++++++ src/de/pzzz/vertx/worker/QueuedWorker.java | 80 +++++++++ src/de/pzzz/vertx/worker/SaveFile.java | 26 +++ .../pzzz/vertx/worker/WorkerController.java | 86 ++++++++++ src/de/pzzz/vertx/worker/WorkerVerticle.java | 44 +++++ 26 files changed, 1362 insertions(+) create mode 100644 .classpath create mode 100644 .gitignore create mode 100644 .project create mode 100755 build.xml create mode 100755 ivy.xml create mode 100755 src/de/pzzz/vertx/PersistentRestDataAccess.java create mode 100755 src/de/pzzz/vertx/RestCommand.java create mode 100755 src/de/pzzz/vertx/RestDataAccess.java create mode 100755 src/de/pzzz/vertx/RestDataRequest.java create mode 100755 src/de/pzzz/vertx/SerializableWithId.java create mode 100755 src/de/pzzz/vertx/ServerVerticle.java create mode 100755 src/de/pzzz/vertx/Startup.java create mode 100755 src/de/pzzz/vertx/process/ExecutableProcess.java create mode 100755 src/de/pzzz/vertx/process/ItemProcess.java create mode 100755 src/de/pzzz/vertx/process/ProcessController.java create mode 100755 src/de/pzzz/vertx/process/ProcessExecutionController.java create mode 100755 src/de/pzzz/vertx/process/ProcessRequest.java create mode 100755 src/de/pzzz/vertx/process/ProcessStatus.java create mode 100755 src/de/pzzz/vertx/worker/DeployableWorker.java create mode 100755 src/de/pzzz/vertx/worker/FileSaveController.java create mode 100755 src/de/pzzz/vertx/worker/FileSaveVerticle.java create mode 100755 src/de/pzzz/vertx/worker/QueueProcessingStatus.java create mode 100755 src/de/pzzz/vertx/worker/QueuedWorker.java create mode 100755 src/de/pzzz/vertx/worker/SaveFile.java create mode 100755 src/de/pzzz/vertx/worker/WorkerController.java create mode 100755 src/de/pzzz/vertx/worker/WorkerVerticle.java diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..04669dc --- /dev/null +++ b/.classpath @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a02aea5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# ---> Java +*.class +/bin/ +/lib/default/ + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +!fallback.jar +*.war +*.ear + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# compiled reports +reports/ +javadoc/ diff --git a/.project b/.project new file mode 100644 index 0000000..14f128e --- /dev/null +++ b/.project @@ -0,0 +1,18 @@ + + + opinionated-vertx + + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + org.apache.ivyde.eclipse.ivynature + + diff --git a/build.xml b/build.xml new file mode 100755 index 0000000..62fd19a --- /dev/null +++ b/build.xml @@ -0,0 +1,106 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/ivy.xml b/ivy.xml new file mode 100755 index 0000000..745fac2 --- /dev/null +++ b/ivy.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + diff --git a/src/de/pzzz/vertx/PersistentRestDataAccess.java b/src/de/pzzz/vertx/PersistentRestDataAccess.java new file mode 100755 index 0000000..02b8b11 --- /dev/null +++ b/src/de/pzzz/vertx/PersistentRestDataAccess.java @@ -0,0 +1,109 @@ +package de.pzzz.vertx; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.file.FileSystem; +import io.vertx.core.json.Json; + +public class PersistentRestDataAccess extends RestDataAccess { + private static final Logger LOG = Logger.getLogger(PersistentRestDataAccess.class.getName()); + + private final String baseDir; + private final Vertx vertx; + private boolean initialized = false; + + public PersistentRestDataAccess(final Class classReference, final Vertx vertx, final String baseDir) { + super(classReference); + this.vertx = vertx; + this.baseDir = baseDir; + } + + @Override + public void add(final T data) { + checkInitialization(); + super.add(data); + vertx.fileSystem().writeFile(filePath(data.getId()), Json.encodeToBuffer(data)) + .onFailure(error -> LOG.log(Level.SEVERE, "Data integrity issue! " + error.getMessage(), error)); + } + + @Override + public void delete(final String id) { + checkInitialization(); + super.delete(id); + vertx.fileSystem().delete(filePath(id)) + .onFailure(error -> LOG.log(Level.SEVERE, "Data integrity issue! " + error.getMessage(), error)); + } + + @Override + public boolean contains(String id) { + checkInitialization(); + return super.contains(id); + } + + @Override + public T get(String id) { + checkInitialization(); + return super.get(id); + } + + @Override + public Collection list() { + checkInitialization(); + return super.list(); + } + + @Override + public void update(final T newData) { + checkInitialization(); + super.update(newData); + vertx.fileSystem().writeFile(filePath(newData.getId()), Json.encodeToBuffer(newData)) + .onFailure(error -> LOG.log(Level.SEVERE, "Data integrity issue! " + error.getMessage(), error)); + } + + public Future> initialize() { + LOG.info("Initializing " + classReference.getName() + " data access from directory " + baseDir); + Promise> promise = Promise.promise(); + FileSystem fs = vertx.fileSystem(); + fs.readDir(baseDir) + .compose(this::readAllFiles) + .onSuccess(ar -> { + initialized = true; + promise.complete(this); + }).onFailure(promise::fail); + return promise.future(); + } + + private void checkInitialization() { + if (!initialized) { + throw new IllegalStateException("PersistentRestDataAccess needs to be initialized first!"); + } + } + + private CompositeFuture readAllFiles(final List filePaths) { + LOG.info("Found " + filePaths.size() + " files in directory " + baseDir); + return CompositeFuture.join(filePaths.stream().map(this::readFileContent).toList()); + } + + private Future readFileContent(final String filePath) { + Promise readFuture = Promise.promise(); + vertx.fileSystem().readFile(filePath) + .onComplete(ar -> { + super.add(Json.decodeValue(ar.result(), classReference)); + readFuture.complete(); + }) + .onFailure(readFuture::fail); + return readFuture.future(); + } + + private final String filePath(final String id) { + return Path.of(baseDir, id + ".json").toString(); + } +} diff --git a/src/de/pzzz/vertx/RestCommand.java b/src/de/pzzz/vertx/RestCommand.java new file mode 100755 index 0000000..5f4e5a9 --- /dev/null +++ b/src/de/pzzz/vertx/RestCommand.java @@ -0,0 +1,5 @@ +package de.pzzz.vertx; + +public enum RestCommand { + LIST, ADD, GET, UPDATE, DELETE; +} diff --git a/src/de/pzzz/vertx/RestDataAccess.java b/src/de/pzzz/vertx/RestDataAccess.java new file mode 100755 index 0000000..092a2b3 --- /dev/null +++ b/src/de/pzzz/vertx/RestDataAccess.java @@ -0,0 +1,154 @@ +package de.pzzz.vertx; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.json.Json; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; + +public class RestDataAccess { + private Map dataMap = new HashMap<>(); + protected final Class classReference; + + public RestDataAccess(final Class classReference) { + this.classReference = classReference; + } + + public void registerRoutes(final Router router, final String baseUrl) { + router.get(baseUrl).handler(this::list); + router.post(baseUrl).handler(this::add); + router.get(baseUrl + "/:id").handler(this::get); + router.put(baseUrl + "/:id").handler(this::update); + router.delete(baseUrl + "/:id").handler(this::delete); + } + + /** + * Returns bus address. + * + * @param vertx + * @return + */ + public String setupConsumer(final Vertx vertx) { + final String busAddress = this.getClass().getName() + "." + UUID.randomUUID(); + MessageConsumer> consumer = vertx.eventBus() + .consumer(busAddress); + consumer.handler(this::handleMessage); + return busAddress; + } + + public Collection list() { + return dataMap.values(); + } + + public T get(final String id) { + return dataMap.get(id); + } + + public boolean contains(final String id) { + return dataMap.containsKey(id); + } + + public void add(final T data) { + dataMap.put(data.getId(), data); + } + + public void update(final T newData) { + dataMap.put(newData.getId(), newData); + } + + public void delete(final String id) { + dataMap.remove(id); + } + + protected void update(final String id, final RoutingContext context) { + T newData = Json.decodeValue(context.getBody(), classReference); + if (!newData.getId().equals(id)) { + context.fail(400); + } + update(newData); + } + + protected T getDataFromRequest(final RoutingContext context) { + return Json.decodeValue(context.getBody(), classReference); + } + + private void handleMessage(final Message> request) { + switch (request.body().getCommand()) { + case LIST: + request.reply(new ArrayList<>(list())); + break; + case GET: + request.reply(get(request.body().getId())); + default: + request.fail(500, "Unsupported request type"); + break; + } + } + + private void list(final RoutingContext context) { + if (context.response().ended()) { + return; + } + context.response().end(Json.encodePrettily(list())); + } + + private void add(final RoutingContext context) { + if (context.response().ended()) { + return; + } + T data = getDataFromRequest(context); + if (context.response().ended()) { + return; + } + add(data); + context.response().setStatusCode(201).end(); + } + + private void get(final RoutingContext context) { + if (context.response().ended()) { + return; + } + final String id = context.pathParam("id"); + if (!dataMap.containsKey(id)) { + context.fail(404); + return; + } + context.response().end(Json.encodePrettily(get(id))); + } + + private void update(final RoutingContext context) { + if (context.response().ended()) { + return; + } + final String id = context.pathParam("id"); + if (!dataMap.containsKey(id)) { + context.fail(404); + return; + } + update(id, context); + if (context.response().ended()) { + return; + } + context.response().setStatusCode(204).end(); + } + + private void delete(final RoutingContext context) { + if (context.response().ended()) { + return; + } + final String id = context.pathParam("id"); + if (!dataMap.containsKey(id)) { + context.fail(404); + return; + } + delete(id); + context.response().setStatusCode(204).end(); + } +} diff --git a/src/de/pzzz/vertx/RestDataRequest.java b/src/de/pzzz/vertx/RestDataRequest.java new file mode 100755 index 0000000..22ae16f --- /dev/null +++ b/src/de/pzzz/vertx/RestDataRequest.java @@ -0,0 +1,46 @@ +package de.pzzz.vertx; + +import java.io.Serializable; + +public class RestDataRequest implements Serializable { + private static final long serialVersionUID = 1L; + + private String id; + private T data; + private RestCommand command; + + public RestDataRequest listRequest() { + this.command = RestCommand.LIST; + return this; + } + + public RestDataRequest getRequest(final String id) { + this.command = RestCommand.GET; + this.id = id; + return this; + } + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public T getData() { + return data; + } + + public void setData(final T data) { + this.data = data; + } + + public RestCommand getCommand() { + return command; + } + + public void setCommand(final RestCommand command) { + this.command = command; + } +} diff --git a/src/de/pzzz/vertx/SerializableWithId.java b/src/de/pzzz/vertx/SerializableWithId.java new file mode 100755 index 0000000..9f88691 --- /dev/null +++ b/src/de/pzzz/vertx/SerializableWithId.java @@ -0,0 +1,22 @@ +package de.pzzz.vertx; + +import java.io.Serializable; +import java.util.UUID; + +public abstract class SerializableWithId implements Serializable { + private static final long serialVersionUID = -7935376730476798064L; + + private String id; + + public SerializableWithId() { + this.id = UUID.randomUUID().toString(); + } + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } +} diff --git a/src/de/pzzz/vertx/ServerVerticle.java b/src/de/pzzz/vertx/ServerVerticle.java new file mode 100755 index 0000000..1045871 --- /dev/null +++ b/src/de/pzzz/vertx/ServerVerticle.java @@ -0,0 +1,85 @@ +package de.pzzz.vertx; + +import java.util.logging.Logger; + +import io.vertx.config.ConfigRetriever; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; +import io.vertx.ext.web.handler.CorsHandler; +import io.vertx.ext.web.handler.StaticHandler; + +public abstract class ServerVerticle extends AbstractVerticle { + private static final Logger LOG = Logger.getLogger(ServerVerticle.class.getName()); + + public static final String API_URL_BASE = "/api/v1"; + private Router router; + + @Override + public void start(Promise startPromise) throws Exception { + super.start(); + long startTime = System.currentTimeMillis(); + + initConfig().compose(this::setupRouter) + .compose(this::initServerLogic) + .compose(this::setupStaticRoutes) + .compose(this::startServer) + .onComplete(result -> LOG.info(() -> "Started in " + (System.currentTimeMillis() - startTime) + "ms")) + .onFailure(error -> { + error.printStackTrace(); + startPromise.fail(error.getMessage()); + }) + .onComplete(startPromise); + } + + protected abstract Future setupServerLogic(final Startup startup, final Router router); + + private Future initConfig() { + ConfigRetriever configRetriever = ConfigRetriever.create(vertx); + configRetriever.getConfig(); + return configRetriever.getConfig().map(Startup::new); + } + + private Future setupRouter(final Startup startup) { + router = Router.router(vertx); + router.route(API_URL_BASE + "/*") + .handler(CorsHandler.create() + .addOrigin("*") + .allowedMethod(HttpMethod.GET) + .allowedMethod(HttpMethod.POST) + .allowedMethod(HttpMethod.PUT) + .allowedMethod(HttpMethod.DELETE) + .allowedMethod(HttpMethod.OPTIONS) + .allowedHeader("Access-Control-Request-Method") + .allowedHeader("Access-Control-Allow-Credentials") + .allowedHeader("Access-Control-Allow-Origin") + .allowedHeader("Access-Control-Allow-Headers") + .allowedHeader("Content-Type") + .allowedHeader("Origin") + .allowedHeader("Accept") + .allowedHeader("Authorization")); + router.route(API_URL_BASE + "/*").handler(BodyHandler.create()); + return Future.succeededFuture(startup); + } + + private Future initServerLogic(final Startup startup) { + return setupServerLogic(startup, router); + } + + private Future setupStaticRoutes(final Startup startup) { + router.route("/*").handler(StaticHandler.create()); + router.get().handler(context -> context.response().sendFile("webroot/index.html")); + return Future.succeededFuture(startup); + } + + private Future startServer(final Startup startup) { + JsonObject httpConfig = startup.getConfig().getJsonObject("http", new JsonObject()); + int port = httpConfig.getInteger("port", 8080); + vertx.createHttpServer().requestHandler(router).listen(port); + return Future.succeededFuture(); + } +} diff --git a/src/de/pzzz/vertx/Startup.java b/src/de/pzzz/vertx/Startup.java new file mode 100755 index 0000000..0b5f329 --- /dev/null +++ b/src/de/pzzz/vertx/Startup.java @@ -0,0 +1,15 @@ +package de.pzzz.vertx; + +import io.vertx.core.json.JsonObject; + +public class Startup { + private final JsonObject config; + + public Startup(JsonObject config) { + this.config = config; + } + + public JsonObject getConfig() { + return config; + } +} diff --git a/src/de/pzzz/vertx/process/ExecutableProcess.java b/src/de/pzzz/vertx/process/ExecutableProcess.java new file mode 100755 index 0000000..286bf6c --- /dev/null +++ b/src/de/pzzz/vertx/process/ExecutableProcess.java @@ -0,0 +1,68 @@ +package de.pzzz.vertx.process; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import de.pzzz.vertx.SerializableWithId; + +public abstract class ExecutableProcess> extends SerializableWithId { + private static final long serialVersionUID = 4475632132455503715L; + + private String name; + private ProcessStatus status; + private int parallelRequests = 1; + private List processingItems; + + public ExecutableProcess() { + super(); + } + + public ExecutableProcess(final ProcessRequest request) { + super(); + this.name = request.getName(); + this.status = ProcessStatus.READY; + this.parallelRequests = request.getParallelRequests(); + this.processingItems = new ArrayList<>(); + Set itemRequests = new HashSet<>(request.getProcessingItems()); + for (T itemRequest: itemRequests) { + processingItems.add(getItemProcess(itemRequest)); + } + } + + protected abstract U getItemProcess(final T itemRequest); + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public ProcessStatus getStatus() { + return status; + } + + public void setStatus(final ProcessStatus status) { + this.status = status; + } + + public int getParallelRequests() { + return parallelRequests; + } + + public void setParallelRequests(final int parallelRequests) { + this.parallelRequests = parallelRequests; + } + + public List getProcessingItems() { + return processingItems; + } + + public void setProcessingItems(final List processingItems) { + this.processingItems = processingItems; + } +} diff --git a/src/de/pzzz/vertx/process/ItemProcess.java b/src/de/pzzz/vertx/process/ItemProcess.java new file mode 100755 index 0000000..0963307 --- /dev/null +++ b/src/de/pzzz/vertx/process/ItemProcess.java @@ -0,0 +1,44 @@ +package de.pzzz.vertx.process; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class ItemProcess implements Serializable { + private static final long serialVersionUID = -152613060384269555L; + + private T item; + private List messages = new ArrayList<>(); + + public ItemProcess() {} + + public ItemProcess(final T item) { + this(); + this.item = item; + } + + public void setErrorState() {} + + public void addMessage(final String message) { + if (null == messages) { + messages = new ArrayList<>(); + } + messages.add(message); + } + + public T getItem() { + return item; + } + + public void setItem(final T item) { + this.item = item; + } + + public List getMessages() { + return messages; + } + + public void setMessages(final List messages) { + this.messages = messages; + } +} diff --git a/src/de/pzzz/vertx/process/ProcessController.java b/src/de/pzzz/vertx/process/ProcessController.java new file mode 100755 index 0000000..658a20e --- /dev/null +++ b/src/de/pzzz/vertx/process/ProcessController.java @@ -0,0 +1,149 @@ +package de.pzzz.vertx.process; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import de.pzzz.vertx.worker.QueueProcessingStatus; +import de.pzzz.vertx.PersistentRestDataAccess; +import de.pzzz.vertx.Startup; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.json.Json; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; + +public abstract class ProcessController, U extends ItemProcess,V extends Serializable, W> extends PersistentRestDataAccess { + + private Map> executors = new HashMap<>(); + private final Vertx vertx; + private final Startup startup; + + public ProcessController(final Class classReference, final Vertx vertx, final Startup startup, final String baseDir) { + super(classReference, vertx, baseDir); + this.vertx = vertx; + this.startup = startup; + } + + @Override + public void delete(final String id) { + //TODO work with Future and improve error handling + if (executors.containsKey(id)) { + ProcessExecutionController executor = executors.get(id); + executor.stopProcessing(); + executor.close(null); + } + super.delete(id); + } + + @Override + protected void update(final String id, final RoutingContext context) { + if (!get(id).getStatus().equals(ProcessStatus.READY)) { + context.fail(405); + return; + } + //TODO work with Future and improve error handling + if (executors.containsKey(id)) { + ProcessExecutionController executor = executors.get(id); + executor.stopProcessing(); + executor.close(null); + } + T newData = getDataFromRequest(context); + newData.setId(id); + update(newData); + } + + @Override + public void registerRoutes(final Router router, final String baseUrl) { + super.registerRoutes(router, baseUrl); + router.post(baseUrl + "/:id/start").handler(this::startProcessing); + router.get(baseUrl + "/:id/status").handler(this::processStatus); + router.post(baseUrl + "/:id/stop").handler(this::stopProcessing); + router.post(baseUrl + "/:id/clear").handler(this::clearProcessingQueue); + } + + abstract protected ProcessExecutionController createNewProcessExecutionController(final T process, final Vertx vertx, final Startup startup, final String id); + + private void startProcessing(final RoutingContext context) { + if (context.response().ended()) { + return; + } + final String id = context.pathParam("id"); + if (!contains(id)) { + context.fail(404); + return; + } + startOrContinueProcessing(id) + .onSuccess(status -> context.response().end(Json.encodePrettily(status))) + .onFailure(error -> context.fail(500)); + } + + private Future> startOrContinueProcessing(final String id) { + Promise> promise = Promise.promise(); + if (!executors.containsKey(id)) { + T process = get(id); + process.setStatus(ProcessStatus.RUNNING); + ProcessExecutionController executor = createNewProcessExecutionController(process, vertx, startup, id); + executors.put(id, executor); + executor.deployWorkers() + .onSuccess(res -> promise.complete(executor.startProcessing())) + .onFailure(error -> promise.fail(error)); + } else { + ProcessExecutionController executor = executors.get(id); + promise.complete(executor.continueProcessing()); + } + return promise.future(); + } + + private void processStatus(final RoutingContext context) { + if (context.response().ended()) { + return; + } + final String id = context.pathParam("id"); + if (!contains(id)) { + context.fail(404); + return; + } + if (!executors.containsKey(id)) { + context.fail(400); + return; + } + ProcessExecutionController executor = executors.get(id); + context.response().end(Json.encodePrettily(executor.processingStatus())); + } + + private void stopProcessing(final RoutingContext context) { + if (context.response().ended()) { + return; + } + final String id = context.pathParam("id"); + if (!contains(id)) { + context.fail(404); + return; + } + if (!executors.containsKey(id)) { + context.fail(400); + return; + } + ProcessExecutionController executor = executors.get(id); + context.response().end(Json.encodePrettily(executor.stopProcessing())); + } + + private void clearProcessingQueue(final RoutingContext context) { + if (context.response().ended()) { + return; + } + final String id = context.pathParam("id"); + if (!contains(id)) { + context.fail(404); + return; + } + if (!executors.containsKey(id)) { + context.fail(400); + return; + } + ProcessExecutionController executor = executors.get(id); + context.response().end(Json.encodePrettily(executor.clearQueue())); + } +} diff --git a/src/de/pzzz/vertx/process/ProcessExecutionController.java b/src/de/pzzz/vertx/process/ProcessExecutionController.java new file mode 100755 index 0000000..d1cdfe1 --- /dev/null +++ b/src/de/pzzz/vertx/process/ProcessExecutionController.java @@ -0,0 +1,62 @@ +package de.pzzz.vertx.process; + +import java.io.Serializable; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import de.pzzz.vertx.Startup; +import de.pzzz.vertx.worker.DeployableWorker; +import de.pzzz.vertx.worker.QueuedWorker; +import io.vertx.core.Closeable; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; + +public abstract class ProcessExecutionController, U extends ItemProcess, V extends Serializable, W> + extends QueuedWorker implements Closeable { + private static final Logger LOG = Logger.getLogger(ProcessExecutionController.class.getName()); + + private final T process; + private final Startup startup; + + public ProcessExecutionController(final T process, final Startup startup) { + super(process.getParallelRequests()); + this.process = process; + this.startup = startup; + } + + public Future deployWorkers() { + Promise promise = Promise.promise(); + CompositeFuture.all(getWorker().stream().map(controller -> (Future) controller.deployWorkers(startup)).toList()) + .onSuccess(res -> promise.complete()) + .onFailure(error -> { + LOG.log(Level.SEVERE, error.getMessage(), error); + promise.fail(error); + }); + return promise.future(); + } + + @Override + public void close(final Promise completion) { + CompositeFuture.all(getWorker().stream().map(worker -> (Future) worker.undeployWorkers()).toList()) + .onSuccess(res -> completion.complete()) + .onFailure(error -> { + LOG.log(Level.SEVERE, error.getMessage(), error); + completion.fail(error); + }); + } + + protected abstract List getWorker(); + + protected T getProcess() { + return process; + } + + protected void handleJobFailure(final U job, final Throwable error, final Promise promise) { + LOG.log(Level.WARNING, "Failed job for " + job.getItem() + " with error " + error.getMessage(), error); + job.setErrorState(); + job.addMessage(error.getMessage()); + promise.fail(error); + } +} diff --git a/src/de/pzzz/vertx/process/ProcessRequest.java b/src/de/pzzz/vertx/process/ProcessRequest.java new file mode 100755 index 0000000..121ef2d --- /dev/null +++ b/src/de/pzzz/vertx/process/ProcessRequest.java @@ -0,0 +1,31 @@ +package de.pzzz.vertx.process; + +import java.io.Serializable; +import java.util.List; + +public class ProcessRequest implements Serializable { + private static final long serialVersionUID = 4946708367771253605L; + + private String name; + private List processingItems; + private int parallelRequests = 1; + + public String getName() { + return name; + } + public void setName(final String name) { + this.name = name; + } + public List getProcessingItems() { + return processingItems; + } + public void setProcessingItems(final List processingItems) { + this.processingItems = processingItems; + } + public int getParallelRequests() { + return parallelRequests; + } + public void setParallelRequests(final int parallelRequests) { + this.parallelRequests = parallelRequests; + } +} diff --git a/src/de/pzzz/vertx/process/ProcessStatus.java b/src/de/pzzz/vertx/process/ProcessStatus.java new file mode 100755 index 0000000..ed0a48f --- /dev/null +++ b/src/de/pzzz/vertx/process/ProcessStatus.java @@ -0,0 +1,5 @@ +package de.pzzz.vertx.process; + +public enum ProcessStatus { + READY, RUNNING, COMPLETED, ERRORS, FAILED; +} diff --git a/src/de/pzzz/vertx/worker/DeployableWorker.java b/src/de/pzzz/vertx/worker/DeployableWorker.java new file mode 100755 index 0000000..c734a1e --- /dev/null +++ b/src/de/pzzz/vertx/worker/DeployableWorker.java @@ -0,0 +1,9 @@ +package de.pzzz.vertx.worker; + +import de.pzzz.vertx.Startup; +import io.vertx.core.Future; + +public interface DeployableWorker { + Future deployWorkers(final Startup startup); + Future undeployWorkers(); +} diff --git a/src/de/pzzz/vertx/worker/FileSaveController.java b/src/de/pzzz/vertx/worker/FileSaveController.java new file mode 100755 index 0000000..4aa1d31 --- /dev/null +++ b/src/de/pzzz/vertx/worker/FileSaveController.java @@ -0,0 +1,38 @@ +package de.pzzz.vertx.worker; + +import java.nio.file.Paths; + +import io.vertx.core.Vertx; + +public abstract class FileSaveController extends WorkerController { + private final String parentJobId; + private final String fileExtension; + + public FileSaveController(final int maxWorkers, final Vertx vertx, final String parentJobId, final String fileExtension) { + super(maxWorkers, vertx, null); + this.parentJobId = parentJobId; + if (!fileExtension.startsWith(".")) { + this.fileExtension = "." + fileExtension; + } else { + this.fileExtension = fileExtension; + } + } + + protected abstract byte[] getContent(final T saveObject); + + protected abstract String getFilename(final T saveObject); + + @Override + protected Class> workerVerticleClass() { + return FileSaveVerticle.class; + } + + @Override + protected SaveFile getRequest(final T saveObject) { + String fileName = Paths.get(parentJobId, getFilename(saveObject) + fileExtension).toString(); + SaveFile saveTarget = new SaveFile(); + saveTarget.setName(fileName); + saveTarget.setContent(getContent(saveObject)); + return saveTarget; + } +} diff --git a/src/de/pzzz/vertx/worker/FileSaveVerticle.java b/src/de/pzzz/vertx/worker/FileSaveVerticle.java new file mode 100755 index 0000000..71da92e --- /dev/null +++ b/src/de/pzzz/vertx/worker/FileSaveVerticle.java @@ -0,0 +1,45 @@ +package de.pzzz.vertx.worker; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.logging.Level; +import java.util.logging.Logger; + +import de.pzzz.vertx.Startup; +import io.vertx.core.Future; +import io.vertx.core.eventbus.Message; + +public class FileSaveVerticle extends WorkerVerticle { + private static final Logger LOG = Logger.getLogger(FileSaveVerticle.class.getName()); + private static final String DEFAULT_OUTDIR = "out"; + + private Path baseDir; + + @Override + protected Future doSetup(final Startup startup) { + baseDir = Path.of(startup.getConfig().getString("file.basePath"), startup.getConfig().getString("file.out.dir", DEFAULT_OUTDIR)); + File baseDirFile = baseDir.toFile(); + if (!baseDirFile.isDirectory() || !baseDirFile.canWrite()) { + return Future.failedFuture("Failed to access " + baseDir); + } + return Future.succeededFuture(startup); + } + + @Override + protected void handleMessage(final Message message) { + Path targetFile = Path.of(baseDir.toString(), message.body().getName()); + LOG.info("Saving file " + targetFile); + try { + Files.createDirectories(targetFile.getParent()); + Files.write(targetFile, message.body().getContent()); + } catch (IOException e) { + LOG.log(Level.WARNING, e.getMessage(), e);; + message.fail(500, e.getMessage()); + return; + } + message.reply(targetFile.toString()); + } + +} diff --git a/src/de/pzzz/vertx/worker/QueueProcessingStatus.java b/src/de/pzzz/vertx/worker/QueueProcessingStatus.java new file mode 100755 index 0000000..dc433b3 --- /dev/null +++ b/src/de/pzzz/vertx/worker/QueueProcessingStatus.java @@ -0,0 +1,70 @@ +package de.pzzz.vertx.worker; + +import java.io.Serializable; +import java.util.LinkedList; +import java.util.Queue; + +public class QueueProcessingStatus implements Serializable { + private static final long serialVersionUID = 7560765077464782742L; + + private transient Queue requestsToProcess = new LinkedList<>(); + private boolean calculate = false; + private int runningCalculations = 0; + + public T startProcessing() { + if (requestsToProcess.isEmpty()) { + throw new IllegalStateException("No requests to process!"); + } + calculate = true; + runningCalculations = 1; + return requestsToProcess.poll(); + } + + public void stopProcessing() { + calculate = false; + } + + public boolean hasNext() { + return !requestsToProcess.isEmpty(); + } + + public T processNext() { + runningCalculations += 1; + return requestsToProcess.poll(); + } + + public void complete() { + runningCalculations -= 1; + if (runningCalculations == 0 && requestsToProcess.isEmpty()) { + calculate = false; + } + } + + public void enqueue(final T request) { + requestsToProcess.add(request); + } + + public void clear() { + requestsToProcess.clear(); + } + + public boolean isCalculate() { + return calculate; + } + + public void setCalculate(final boolean calculate) { + this.calculate = calculate; + } + + public int getRunningCalculations() { + return runningCalculations; + } + + public void setRunningCalculations(final int running) { + this.runningCalculations = running; + } + + public int getQueueLength() { + return requestsToProcess.size(); + } +} diff --git a/src/de/pzzz/vertx/worker/QueuedWorker.java b/src/de/pzzz/vertx/worker/QueuedWorker.java new file mode 100755 index 0000000..bcae17a --- /dev/null +++ b/src/de/pzzz/vertx/worker/QueuedWorker.java @@ -0,0 +1,80 @@ +package de.pzzz.vertx.worker; + +import java.util.stream.Stream; + +import io.vertx.core.Future; + +public abstract class QueuedWorker { + private QueueProcessingStatus queue = new QueueProcessingStatus<>(); + private int maxWorkers; + + protected QueuedWorker(int maxWorkers) { + this.maxWorkers = maxWorkers; + } + + public QueueProcessingStatus startProcessing() { + getInputs().forEach(queue::enqueue); + startInputProcessing(); + return queue; + } + + public QueueProcessingStatus continueProcessing() { + queue.setCalculate(true); + return queue; + } + + public QueueProcessingStatus stopProcessing() { + queue.setCalculate(false); + return queue; + } + + public QueueProcessingStatus processingStatus() { + return queue; + } + + public QueueProcessingStatus clearQueue() { + queue.clear(); + return queue; + } + + protected abstract Stream getInputs(); + + protected abstract Future doJob(final T job); + + protected void beforeJob(final T job) {} + + protected void afterSuccessfulJob(final T job, final U response) {} + + protected void afterFailedJob(final T job, final Throwable error) {} + + private void startInputProcessing() { + if (!queue.hasNext()) { + return; + } + T job = queue.startProcessing(); + executeJob(job); + for (int i = 0; i < maxWorkers; i++) { + startNextJob(); + } + } + + private void startNextJob() { + if (!queue.isCalculate() || !queue.hasNext() || queue.getRunningCalculations() >= maxWorkers) { + return; + } + executeJob(queue.processNext()); + } + + private void executeJob(final T job) { + beforeJob(job); + doJob(job).onSuccess(response -> { + queue.complete(); + afterSuccessfulJob(job, response); + startNextJob(); + }).onFailure(error -> { + queue.complete(); + afterFailedJob(job, error); + startNextJob(); + }); + } +} diff --git a/src/de/pzzz/vertx/worker/SaveFile.java b/src/de/pzzz/vertx/worker/SaveFile.java new file mode 100755 index 0000000..9aa002f --- /dev/null +++ b/src/de/pzzz/vertx/worker/SaveFile.java @@ -0,0 +1,26 @@ +package de.pzzz.vertx.worker; + +import java.io.Serializable; + +public class SaveFile implements Serializable { + private static final long serialVersionUID = 1189414819710044802L; + + private String name; + private byte[] content; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public byte[] getContent() { + return content; + } + + public void setContent(final byte[] content) { + this.content = content; + } +} diff --git a/src/de/pzzz/vertx/worker/WorkerController.java b/src/de/pzzz/vertx/worker/WorkerController.java new file mode 100755 index 0000000..b6492af --- /dev/null +++ b/src/de/pzzz/vertx/worker/WorkerController.java @@ -0,0 +1,86 @@ +package de.pzzz.vertx.worker; + +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +import de.pzzz.vertx.Startup; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.json.JsonObject; + +public abstract class WorkerController implements DeployableWorker { + private static final Logger LOG = Logger.getLogger(WorkerController.class.getName()); + public static final String WORKER_BUS_ADDRESS_CONFIG = "worker.bus.address"; + + private final String uuid = UUID.randomUUID().toString(); + protected final int maxWorkers; + private String deploymentId; + private final Vertx vertx; + private final DeliveryOptions deliveryOptions = new DeliveryOptions(); + + protected WorkerController(final int maxWorkers, final Vertx vertx, final Long timeout) { + this.maxWorkers = maxWorkers; + this.vertx = vertx; + if (null != timeout) { + deliveryOptions.setSendTimeout(timeout); + } + } + + public Future deployWorkers(final Startup startup) { + if (null != deploymentId) { + throw new IllegalStateException("Verticle already running!"); + } + Promise promise = Promise.promise(); + JsonObject config = startup.getConfig(); + config.put(WORKER_BUS_ADDRESS_CONFIG, busAddress()); + LOG.info("Deploy " + maxWorkers + " workers for bus " + busAddress()); + DeploymentOptions workerOpts = new DeploymentOptions() + .setWorker(true) + .setConfig(startup.getConfig()) + .setWorkerPoolName(busAddress()) + .setInstances(maxWorkers) + .setWorkerPoolSize(maxWorkers); + vertx.deployVerticle(workerVerticleClass(), workerOpts) + .onSuccess(res -> { + deploymentId = res; + promise.complete(startup); + }) + .onFailure(promise::fail); + return promise.future(); + } + + public Future undeployWorkers() { + if (null == deploymentId) { + throw new IllegalStateException("Verticle is not running!"); + } + Promise promise = Promise.promise(); + vertx.undeploy(deploymentId) + .onSuccess(promise::complete) + .onFailure(promise::fail); + return promise.future(); + } + + public Future doJob(final T job) { + Promise result = Promise.promise(); + vertx.eventBus() + .request(busAddress(), getRequest(job), deliveryOptions) + .onSuccess(res -> result.complete(res.body())) + .onFailure(error -> { + LOG.log(Level.WARNING, error.getMessage(), error); + result.fail(error); + }); + return result.future(); + } + + protected abstract Class> workerVerticleClass(); + + protected String busAddress() { + return workerVerticleClass().getName() + "." + uuid; + } + + protected abstract U getRequest(final T job); +} diff --git a/src/de/pzzz/vertx/worker/WorkerVerticle.java b/src/de/pzzz/vertx/worker/WorkerVerticle.java new file mode 100755 index 0000000..bf57ffc --- /dev/null +++ b/src/de/pzzz/vertx/worker/WorkerVerticle.java @@ -0,0 +1,44 @@ +package de.pzzz.vertx.worker; + +import java.util.logging.Logger; + +import de.pzzz.vertx.Startup; +import io.vertx.config.ConfigRetriever; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; + +public abstract class WorkerVerticle extends AbstractVerticle { + private static final Logger LOG = Logger.getLogger(WorkerVerticle.class.getName()); + + @Override + public void start(final Promise startPromise) throws Exception { + long startTime = System.currentTimeMillis(); + + initConfig().compose(this::doSetup) + .compose(this::setupConsumer) + .onComplete(result -> LOG.info(() -> "Started " + this.getClass().getName() + " in " + (System.currentTimeMillis() - startTime) + "ms")) + .onComplete(startPromise); + } + + protected abstract Future doSetup(final Startup startup); + + protected abstract void handleMessage(final Message message); + + private Future initConfig() { + ConfigRetriever configRetriever = ConfigRetriever.create(vertx); + configRetriever.getConfig(); + return configRetriever.getConfig().map(Startup::new); + } + + private Future setupConsumer(final Startup startup) { + LOG.info("Setup consumer for bus address " + startup.getConfig().getString(WorkerController.WORKER_BUS_ADDRESS_CONFIG)); + MessageConsumer consumer = vertx.eventBus() + .consumer(startup.getConfig().getString(WorkerController.WORKER_BUS_ADDRESS_CONFIG)); + consumer.handler(this::handleMessage); + return Future.succeededFuture(); + } + +}