[wip] initial commit

extracted useful snippets from other projects
This commit is contained in:
Peter Eiser 2021-12-17 07:31:44 +01:00
commit b950c9ebdb
26 changed files with 1362 additions and 0 deletions

View file

@ -0,0 +1,109 @@
package de.pzzz.vertx;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.file.FileSystem;
import io.vertx.core.json.Json;
public class PersistentRestDataAccess<T extends SerializableWithId> extends RestDataAccess<T> {
private static final Logger LOG = Logger.getLogger(PersistentRestDataAccess.class.getName());
private final String baseDir;
private final Vertx vertx;
private boolean initialized = false;
public PersistentRestDataAccess(final Class<T> classReference, final Vertx vertx, final String baseDir) {
super(classReference);
this.vertx = vertx;
this.baseDir = baseDir;
}
@Override
public void add(final T data) {
checkInitialization();
super.add(data);
vertx.fileSystem().writeFile(filePath(data.getId()), Json.encodeToBuffer(data))
.onFailure(error -> LOG.log(Level.SEVERE, "Data integrity issue! " + error.getMessage(), error));
}
@Override
public void delete(final String id) {
checkInitialization();
super.delete(id);
vertx.fileSystem().delete(filePath(id))
.onFailure(error -> LOG.log(Level.SEVERE, "Data integrity issue! " + error.getMessage(), error));
}
@Override
public boolean contains(String id) {
checkInitialization();
return super.contains(id);
}
@Override
public T get(String id) {
checkInitialization();
return super.get(id);
}
@Override
public Collection<T> list() {
checkInitialization();
return super.list();
}
@Override
public void update(final T newData) {
checkInitialization();
super.update(newData);
vertx.fileSystem().writeFile(filePath(newData.getId()), Json.encodeToBuffer(newData))
.onFailure(error -> LOG.log(Level.SEVERE, "Data integrity issue! " + error.getMessage(), error));
}
public Future<PersistentRestDataAccess<T>> initialize() {
LOG.info("Initializing " + classReference.getName() + " data access from directory " + baseDir);
Promise<PersistentRestDataAccess<T>> promise = Promise.promise();
FileSystem fs = vertx.fileSystem();
fs.readDir(baseDir)
.compose(this::readAllFiles)
.onSuccess(ar -> {
initialized = true;
promise.complete(this);
}).onFailure(promise::fail);
return promise.future();
}
private void checkInitialization() {
if (!initialized) {
throw new IllegalStateException("PersistentRestDataAccess needs to be initialized first!");
}
}
private CompositeFuture readAllFiles(final List<String> filePaths) {
LOG.info("Found " + filePaths.size() + " files in directory " + baseDir);
return CompositeFuture.join(filePaths.stream().map(this::readFileContent).toList());
}
private Future readFileContent(final String filePath) {
Promise<Void> readFuture = Promise.promise();
vertx.fileSystem().readFile(filePath)
.onComplete(ar -> {
super.add(Json.decodeValue(ar.result(), classReference));
readFuture.complete();
})
.onFailure(readFuture::fail);
return readFuture.future();
}
private final String filePath(final String id) {
return Path.of(baseDir, id + ".json").toString();
}
}

View file

@ -0,0 +1,5 @@
package de.pzzz.vertx;
public enum RestCommand {
LIST, ADD, GET, UPDATE, DELETE;
}

View file

@ -0,0 +1,154 @@
package de.pzzz.vertx;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.Json;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public class RestDataAccess<T extends SerializableWithId> {
private Map<String, T> dataMap = new HashMap<>();
protected final Class<T> classReference;
public RestDataAccess(final Class<T> classReference) {
this.classReference = classReference;
}
public void registerRoutes(final Router router, final String baseUrl) {
router.get(baseUrl).handler(this::list);
router.post(baseUrl).handler(this::add);
router.get(baseUrl + "/:id").handler(this::get);
router.put(baseUrl + "/:id").handler(this::update);
router.delete(baseUrl + "/:id").handler(this::delete);
}
/**
* Returns bus address.
*
* @param vertx
* @return
*/
public String setupConsumer(final Vertx vertx) {
final String busAddress = this.getClass().getName() + "." + UUID.randomUUID();
MessageConsumer<RestDataRequest<T>> consumer = vertx.eventBus()
.consumer(busAddress);
consumer.handler(this::handleMessage);
return busAddress;
}
public Collection<T> list() {
return dataMap.values();
}
public T get(final String id) {
return dataMap.get(id);
}
public boolean contains(final String id) {
return dataMap.containsKey(id);
}
public void add(final T data) {
dataMap.put(data.getId(), data);
}
public void update(final T newData) {
dataMap.put(newData.getId(), newData);
}
public void delete(final String id) {
dataMap.remove(id);
}
protected void update(final String id, final RoutingContext context) {
T newData = Json.decodeValue(context.getBody(), classReference);
if (!newData.getId().equals(id)) {
context.fail(400);
}
update(newData);
}
protected T getDataFromRequest(final RoutingContext context) {
return Json.decodeValue(context.getBody(), classReference);
}
private void handleMessage(final Message<RestDataRequest<T>> request) {
switch (request.body().getCommand()) {
case LIST:
request.reply(new ArrayList<>(list()));
break;
case GET:
request.reply(get(request.body().getId()));
default:
request.fail(500, "Unsupported request type");
break;
}
}
private void list(final RoutingContext context) {
if (context.response().ended()) {
return;
}
context.response().end(Json.encodePrettily(list()));
}
private void add(final RoutingContext context) {
if (context.response().ended()) {
return;
}
T data = getDataFromRequest(context);
if (context.response().ended()) {
return;
}
add(data);
context.response().setStatusCode(201).end();
}
private void get(final RoutingContext context) {
if (context.response().ended()) {
return;
}
final String id = context.pathParam("id");
if (!dataMap.containsKey(id)) {
context.fail(404);
return;
}
context.response().end(Json.encodePrettily(get(id)));
}
private void update(final RoutingContext context) {
if (context.response().ended()) {
return;
}
final String id = context.pathParam("id");
if (!dataMap.containsKey(id)) {
context.fail(404);
return;
}
update(id, context);
if (context.response().ended()) {
return;
}
context.response().setStatusCode(204).end();
}
private void delete(final RoutingContext context) {
if (context.response().ended()) {
return;
}
final String id = context.pathParam("id");
if (!dataMap.containsKey(id)) {
context.fail(404);
return;
}
delete(id);
context.response().setStatusCode(204).end();
}
}

View file

@ -0,0 +1,46 @@
package de.pzzz.vertx;
import java.io.Serializable;
public class RestDataRequest<T> implements Serializable {
private static final long serialVersionUID = 1L;
private String id;
private T data;
private RestCommand command;
public RestDataRequest<T> listRequest() {
this.command = RestCommand.LIST;
return this;
}
public RestDataRequest<T> getRequest(final String id) {
this.command = RestCommand.GET;
this.id = id;
return this;
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public T getData() {
return data;
}
public void setData(final T data) {
this.data = data;
}
public RestCommand getCommand() {
return command;
}
public void setCommand(final RestCommand command) {
this.command = command;
}
}

View file

@ -0,0 +1,22 @@
package de.pzzz.vertx;
import java.io.Serializable;
import java.util.UUID;
public abstract class SerializableWithId implements Serializable {
private static final long serialVersionUID = -7935376730476798064L;
private String id;
public SerializableWithId() {
this.id = UUID.randomUUID().toString();
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
}

View file

@ -0,0 +1,85 @@
package de.pzzz.vertx;
import java.util.logging.Logger;
import io.vertx.config.ConfigRetriever;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.StaticHandler;
public abstract class ServerVerticle extends AbstractVerticle {
private static final Logger LOG = Logger.getLogger(ServerVerticle.class.getName());
public static final String API_URL_BASE = "/api/v1";
private Router router;
@Override
public void start(Promise<Void> startPromise) throws Exception {
super.start();
long startTime = System.currentTimeMillis();
initConfig().compose(this::setupRouter)
.compose(this::initServerLogic)
.compose(this::setupStaticRoutes)
.compose(this::startServer)
.onComplete(result -> LOG.info(() -> "Started in " + (System.currentTimeMillis() - startTime) + "ms"))
.onFailure(error -> {
error.printStackTrace();
startPromise.fail(error.getMessage());
})
.onComplete(startPromise);
}
protected abstract Future<Startup> setupServerLogic(final Startup startup, final Router router);
private Future<Startup> initConfig() {
ConfigRetriever configRetriever = ConfigRetriever.create(vertx);
configRetriever.getConfig();
return configRetriever.getConfig().map(Startup::new);
}
private Future<Startup> setupRouter(final Startup startup) {
router = Router.router(vertx);
router.route(API_URL_BASE + "/*")
.handler(CorsHandler.create()
.addOrigin("*")
.allowedMethod(HttpMethod.GET)
.allowedMethod(HttpMethod.POST)
.allowedMethod(HttpMethod.PUT)
.allowedMethod(HttpMethod.DELETE)
.allowedMethod(HttpMethod.OPTIONS)
.allowedHeader("Access-Control-Request-Method")
.allowedHeader("Access-Control-Allow-Credentials")
.allowedHeader("Access-Control-Allow-Origin")
.allowedHeader("Access-Control-Allow-Headers")
.allowedHeader("Content-Type")
.allowedHeader("Origin")
.allowedHeader("Accept")
.allowedHeader("Authorization"));
router.route(API_URL_BASE + "/*").handler(BodyHandler.create());
return Future.succeededFuture(startup);
}
private Future<Startup> initServerLogic(final Startup startup) {
return setupServerLogic(startup, router);
}
private Future<Startup> setupStaticRoutes(final Startup startup) {
router.route("/*").handler(StaticHandler.create());
router.get().handler(context -> context.response().sendFile("webroot/index.html"));
return Future.succeededFuture(startup);
}
private Future<Void> startServer(final Startup startup) {
JsonObject httpConfig = startup.getConfig().getJsonObject("http", new JsonObject());
int port = httpConfig.getInteger("port", 8080);
vertx.createHttpServer().requestHandler(router).listen(port);
return Future.succeededFuture();
}
}

15
src/de/pzzz/vertx/Startup.java Executable file
View file

@ -0,0 +1,15 @@
package de.pzzz.vertx;
import io.vertx.core.json.JsonObject;
public class Startup {
private final JsonObject config;
public Startup(JsonObject config) {
this.config = config;
}
public JsonObject getConfig() {
return config;
}
}

View file

@ -0,0 +1,68 @@
package de.pzzz.vertx.process;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import de.pzzz.vertx.SerializableWithId;
public abstract class ExecutableProcess<T extends Serializable, U extends ItemProcess<T>> extends SerializableWithId {
private static final long serialVersionUID = 4475632132455503715L;
private String name;
private ProcessStatus status;
private int parallelRequests = 1;
private List<U> processingItems;
public ExecutableProcess() {
super();
}
public ExecutableProcess(final ProcessRequest<T> request) {
super();
this.name = request.getName();
this.status = ProcessStatus.READY;
this.parallelRequests = request.getParallelRequests();
this.processingItems = new ArrayList<>();
Set<T> itemRequests = new HashSet<>(request.getProcessingItems());
for (T itemRequest: itemRequests) {
processingItems.add(getItemProcess(itemRequest));
}
}
protected abstract U getItemProcess(final T itemRequest);
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public ProcessStatus getStatus() {
return status;
}
public void setStatus(final ProcessStatus status) {
this.status = status;
}
public int getParallelRequests() {
return parallelRequests;
}
public void setParallelRequests(final int parallelRequests) {
this.parallelRequests = parallelRequests;
}
public List<U> getProcessingItems() {
return processingItems;
}
public void setProcessingItems(final List<U> processingItems) {
this.processingItems = processingItems;
}
}

View file

@ -0,0 +1,44 @@
package de.pzzz.vertx.process;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ItemProcess<T extends Serializable> implements Serializable {
private static final long serialVersionUID = -152613060384269555L;
private T item;
private List<String> messages = new ArrayList<>();
public ItemProcess() {}
public ItemProcess(final T item) {
this();
this.item = item;
}
public void setErrorState() {}
public void addMessage(final String message) {
if (null == messages) {
messages = new ArrayList<>();
}
messages.add(message);
}
public T getItem() {
return item;
}
public void setItem(final T item) {
this.item = item;
}
public List<String> getMessages() {
return messages;
}
public void setMessages(final List<String> messages) {
this.messages = messages;
}
}

View file

@ -0,0 +1,149 @@
package de.pzzz.vertx.process;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import de.pzzz.vertx.worker.QueueProcessingStatus;
import de.pzzz.vertx.PersistentRestDataAccess;
import de.pzzz.vertx.Startup;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public abstract class ProcessController<T extends ExecutableProcess<V,U>, U extends ItemProcess<V>,V extends Serializable, W> extends PersistentRestDataAccess<T> {
private Map<String, ProcessExecutionController<T,U,V,W>> executors = new HashMap<>();
private final Vertx vertx;
private final Startup startup;
public ProcessController(final Class<T> classReference, final Vertx vertx, final Startup startup, final String baseDir) {
super(classReference, vertx, baseDir);
this.vertx = vertx;
this.startup = startup;
}
@Override
public void delete(final String id) {
//TODO work with Future and improve error handling
if (executors.containsKey(id)) {
ProcessExecutionController<T,U,V,W> executor = executors.get(id);
executor.stopProcessing();
executor.close(null);
}
super.delete(id);
}
@Override
protected void update(final String id, final RoutingContext context) {
if (!get(id).getStatus().equals(ProcessStatus.READY)) {
context.fail(405);
return;
}
//TODO work with Future and improve error handling
if (executors.containsKey(id)) {
ProcessExecutionController<T,U,V,W> executor = executors.get(id);
executor.stopProcessing();
executor.close(null);
}
T newData = getDataFromRequest(context);
newData.setId(id);
update(newData);
}
@Override
public void registerRoutes(final Router router, final String baseUrl) {
super.registerRoutes(router, baseUrl);
router.post(baseUrl + "/:id/start").handler(this::startProcessing);
router.get(baseUrl + "/:id/status").handler(this::processStatus);
router.post(baseUrl + "/:id/stop").handler(this::stopProcessing);
router.post(baseUrl + "/:id/clear").handler(this::clearProcessingQueue);
}
abstract protected ProcessExecutionController<T,U,V,W> createNewProcessExecutionController(final T process, final Vertx vertx, final Startup startup, final String id);
private void startProcessing(final RoutingContext context) {
if (context.response().ended()) {
return;
}
final String id = context.pathParam("id");
if (!contains(id)) {
context.fail(404);
return;
}
startOrContinueProcessing(id)
.onSuccess(status -> context.response().end(Json.encodePrettily(status)))
.onFailure(error -> context.fail(500));
}
private Future<QueueProcessingStatus<U>> startOrContinueProcessing(final String id) {
Promise<QueueProcessingStatus<U>> promise = Promise.promise();
if (!executors.containsKey(id)) {
T process = get(id);
process.setStatus(ProcessStatus.RUNNING);
ProcessExecutionController<T,U,V,W> executor = createNewProcessExecutionController(process, vertx, startup, id);
executors.put(id, executor);
executor.deployWorkers()
.onSuccess(res -> promise.complete(executor.startProcessing()))
.onFailure(error -> promise.fail(error));
} else {
ProcessExecutionController<T,U,V,W> executor = executors.get(id);
promise.complete(executor.continueProcessing());
}
return promise.future();
}
private void processStatus(final RoutingContext context) {
if (context.response().ended()) {
return;
}
final String id = context.pathParam("id");
if (!contains(id)) {
context.fail(404);
return;
}
if (!executors.containsKey(id)) {
context.fail(400);
return;
}
ProcessExecutionController<T,U,V,W> executor = executors.get(id);
context.response().end(Json.encodePrettily(executor.processingStatus()));
}
private void stopProcessing(final RoutingContext context) {
if (context.response().ended()) {
return;
}
final String id = context.pathParam("id");
if (!contains(id)) {
context.fail(404);
return;
}
if (!executors.containsKey(id)) {
context.fail(400);
return;
}
ProcessExecutionController<T,U,V,W> executor = executors.get(id);
context.response().end(Json.encodePrettily(executor.stopProcessing()));
}
private void clearProcessingQueue(final RoutingContext context) {
if (context.response().ended()) {
return;
}
final String id = context.pathParam("id");
if (!contains(id)) {
context.fail(404);
return;
}
if (!executors.containsKey(id)) {
context.fail(400);
return;
}
ProcessExecutionController<T,U,V,W> executor = executors.get(id);
context.response().end(Json.encodePrettily(executor.clearQueue()));
}
}

View file

@ -0,0 +1,62 @@
package de.pzzz.vertx.process;
import java.io.Serializable;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import de.pzzz.vertx.Startup;
import de.pzzz.vertx.worker.DeployableWorker;
import de.pzzz.vertx.worker.QueuedWorker;
import io.vertx.core.Closeable;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
public abstract class ProcessExecutionController<T extends ExecutableProcess<V,U>, U extends ItemProcess<V>, V extends Serializable, W>
extends QueuedWorker<U,W> implements Closeable {
private static final Logger LOG = Logger.getLogger(ProcessExecutionController.class.getName());
private final T process;
private final Startup startup;
public ProcessExecutionController(final T process, final Startup startup) {
super(process.getParallelRequests());
this.process = process;
this.startup = startup;
}
public Future<Void> deployWorkers() {
Promise<Void> promise = Promise.promise();
CompositeFuture.all(getWorker().stream().map(controller -> (Future) controller.deployWorkers(startup)).toList())
.onSuccess(res -> promise.complete())
.onFailure(error -> {
LOG.log(Level.SEVERE, error.getMessage(), error);
promise.fail(error);
});
return promise.future();
}
@Override
public void close(final Promise<Void> completion) {
CompositeFuture.all(getWorker().stream().map(worker -> (Future) worker.undeployWorkers()).toList())
.onSuccess(res -> completion.complete())
.onFailure(error -> {
LOG.log(Level.SEVERE, error.getMessage(), error);
completion.fail(error);
});
}
protected abstract List<DeployableWorker> getWorker();
protected T getProcess() {
return process;
}
protected void handleJobFailure(final U job, final Throwable error, final Promise<?> promise) {
LOG.log(Level.WARNING, "Failed job for " + job.getItem() + " with error " + error.getMessage(), error);
job.setErrorState();
job.addMessage(error.getMessage());
promise.fail(error);
}
}

View file

@ -0,0 +1,31 @@
package de.pzzz.vertx.process;
import java.io.Serializable;
import java.util.List;
public class ProcessRequest<T> implements Serializable {
private static final long serialVersionUID = 4946708367771253605L;
private String name;
private List<T> processingItems;
private int parallelRequests = 1;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public List<T> getProcessingItems() {
return processingItems;
}
public void setProcessingItems(final List<T> processingItems) {
this.processingItems = processingItems;
}
public int getParallelRequests() {
return parallelRequests;
}
public void setParallelRequests(final int parallelRequests) {
this.parallelRequests = parallelRequests;
}
}

View file

@ -0,0 +1,5 @@
package de.pzzz.vertx.process;
public enum ProcessStatus {
READY, RUNNING, COMPLETED, ERRORS, FAILED;
}

View file

@ -0,0 +1,9 @@
package de.pzzz.vertx.worker;
import de.pzzz.vertx.Startup;
import io.vertx.core.Future;
public interface DeployableWorker {
Future<Startup> deployWorkers(final Startup startup);
Future<Void> undeployWorkers();
}

View file

@ -0,0 +1,38 @@
package de.pzzz.vertx.worker;
import java.nio.file.Paths;
import io.vertx.core.Vertx;
public abstract class FileSaveController<T> extends WorkerController<T,SaveFile> {
private final String parentJobId;
private final String fileExtension;
public FileSaveController(final int maxWorkers, final Vertx vertx, final String parentJobId, final String fileExtension) {
super(maxWorkers, vertx, null);
this.parentJobId = parentJobId;
if (!fileExtension.startsWith(".")) {
this.fileExtension = "." + fileExtension;
} else {
this.fileExtension = fileExtension;
}
}
protected abstract byte[] getContent(final T saveObject);
protected abstract String getFilename(final T saveObject);
@Override
protected Class<? extends WorkerVerticle<SaveFile>> workerVerticleClass() {
return FileSaveVerticle.class;
}
@Override
protected SaveFile getRequest(final T saveObject) {
String fileName = Paths.get(parentJobId, getFilename(saveObject) + fileExtension).toString();
SaveFile saveTarget = new SaveFile();
saveTarget.setName(fileName);
saveTarget.setContent(getContent(saveObject));
return saveTarget;
}
}

View file

@ -0,0 +1,45 @@
package de.pzzz.vertx.worker;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;
import de.pzzz.vertx.Startup;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
public class FileSaveVerticle extends WorkerVerticle<SaveFile> {
private static final Logger LOG = Logger.getLogger(FileSaveVerticle.class.getName());
private static final String DEFAULT_OUTDIR = "out";
private Path baseDir;
@Override
protected Future<Startup> doSetup(final Startup startup) {
baseDir = Path.of(startup.getConfig().getString("file.basePath"), startup.getConfig().getString("file.out.dir", DEFAULT_OUTDIR));
File baseDirFile = baseDir.toFile();
if (!baseDirFile.isDirectory() || !baseDirFile.canWrite()) {
return Future.failedFuture("Failed to access " + baseDir);
}
return Future.succeededFuture(startup);
}
@Override
protected void handleMessage(final Message<SaveFile> message) {
Path targetFile = Path.of(baseDir.toString(), message.body().getName());
LOG.info("Saving file " + targetFile);
try {
Files.createDirectories(targetFile.getParent());
Files.write(targetFile, message.body().getContent());
} catch (IOException e) {
LOG.log(Level.WARNING, e.getMessage(), e);;
message.fail(500, e.getMessage());
return;
}
message.reply(targetFile.toString());
}
}

View file

@ -0,0 +1,70 @@
package de.pzzz.vertx.worker;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.Queue;
public class QueueProcessingStatus<T> implements Serializable {
private static final long serialVersionUID = 7560765077464782742L;
private transient Queue<T> requestsToProcess = new LinkedList<>();
private boolean calculate = false;
private int runningCalculations = 0;
public T startProcessing() {
if (requestsToProcess.isEmpty()) {
throw new IllegalStateException("No requests to process!");
}
calculate = true;
runningCalculations = 1;
return requestsToProcess.poll();
}
public void stopProcessing() {
calculate = false;
}
public boolean hasNext() {
return !requestsToProcess.isEmpty();
}
public T processNext() {
runningCalculations += 1;
return requestsToProcess.poll();
}
public void complete() {
runningCalculations -= 1;
if (runningCalculations == 0 && requestsToProcess.isEmpty()) {
calculate = false;
}
}
public void enqueue(final T request) {
requestsToProcess.add(request);
}
public void clear() {
requestsToProcess.clear();
}
public boolean isCalculate() {
return calculate;
}
public void setCalculate(final boolean calculate) {
this.calculate = calculate;
}
public int getRunningCalculations() {
return runningCalculations;
}
public void setRunningCalculations(final int running) {
this.runningCalculations = running;
}
public int getQueueLength() {
return requestsToProcess.size();
}
}

View file

@ -0,0 +1,80 @@
package de.pzzz.vertx.worker;
import java.util.stream.Stream;
import io.vertx.core.Future;
public abstract class QueuedWorker<T,U> {
private QueueProcessingStatus<T> queue = new QueueProcessingStatus<>();
private int maxWorkers;
protected QueuedWorker(int maxWorkers) {
this.maxWorkers = maxWorkers;
}
public QueueProcessingStatus<T> startProcessing() {
getInputs().forEach(queue::enqueue);
startInputProcessing();
return queue;
}
public QueueProcessingStatus<T> continueProcessing() {
queue.setCalculate(true);
return queue;
}
public QueueProcessingStatus<T> stopProcessing() {
queue.setCalculate(false);
return queue;
}
public QueueProcessingStatus<T> processingStatus() {
return queue;
}
public QueueProcessingStatus<T> clearQueue() {
queue.clear();
return queue;
}
protected abstract Stream<T> getInputs();
protected abstract Future<U> doJob(final T job);
protected void beforeJob(final T job) {}
protected void afterSuccessfulJob(final T job, final U response) {}
protected void afterFailedJob(final T job, final Throwable error) {}
private void startInputProcessing() {
if (!queue.hasNext()) {
return;
}
T job = queue.startProcessing();
executeJob(job);
for (int i = 0; i < maxWorkers; i++) {
startNextJob();
}
}
private void startNextJob() {
if (!queue.isCalculate() || !queue.hasNext() || queue.getRunningCalculations() >= maxWorkers) {
return;
}
executeJob(queue.processNext());
}
private void executeJob(final T job) {
beforeJob(job);
doJob(job).onSuccess(response -> {
queue.complete();
afterSuccessfulJob(job, response);
startNextJob();
}).onFailure(error -> {
queue.complete();
afterFailedJob(job, error);
startNextJob();
});
}
}

View file

@ -0,0 +1,26 @@
package de.pzzz.vertx.worker;
import java.io.Serializable;
public class SaveFile implements Serializable {
private static final long serialVersionUID = 1189414819710044802L;
private String name;
private byte[] content;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public byte[] getContent() {
return content;
}
public void setContent(final byte[] content) {
this.content = content;
}
}

View file

@ -0,0 +1,86 @@
package de.pzzz.vertx.worker;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import de.pzzz.vertx.Startup;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
public abstract class WorkerController<T, U> implements DeployableWorker {
private static final Logger LOG = Logger.getLogger(WorkerController.class.getName());
public static final String WORKER_BUS_ADDRESS_CONFIG = "worker.bus.address";
private final String uuid = UUID.randomUUID().toString();
protected final int maxWorkers;
private String deploymentId;
private final Vertx vertx;
private final DeliveryOptions deliveryOptions = new DeliveryOptions();
protected WorkerController(final int maxWorkers, final Vertx vertx, final Long timeout) {
this.maxWorkers = maxWorkers;
this.vertx = vertx;
if (null != timeout) {
deliveryOptions.setSendTimeout(timeout);
}
}
public Future<Startup> deployWorkers(final Startup startup) {
if (null != deploymentId) {
throw new IllegalStateException("Verticle already running!");
}
Promise<Startup> promise = Promise.promise();
JsonObject config = startup.getConfig();
config.put(WORKER_BUS_ADDRESS_CONFIG, busAddress());
LOG.info("Deploy " + maxWorkers + " workers for bus " + busAddress());
DeploymentOptions workerOpts = new DeploymentOptions()
.setWorker(true)
.setConfig(startup.getConfig())
.setWorkerPoolName(busAddress())
.setInstances(maxWorkers)
.setWorkerPoolSize(maxWorkers);
vertx.deployVerticle(workerVerticleClass(), workerOpts)
.onSuccess(res -> {
deploymentId = res;
promise.complete(startup);
})
.onFailure(promise::fail);
return promise.future();
}
public Future<Void> undeployWorkers() {
if (null == deploymentId) {
throw new IllegalStateException("Verticle is not running!");
}
Promise<Void> promise = Promise.promise();
vertx.undeploy(deploymentId)
.onSuccess(promise::complete)
.onFailure(promise::fail);
return promise.future();
}
public Future<Object> doJob(final T job) {
Promise<Object> result = Promise.promise();
vertx.eventBus()
.request(busAddress(), getRequest(job), deliveryOptions)
.onSuccess(res -> result.complete(res.body()))
.onFailure(error -> {
LOG.log(Level.WARNING, error.getMessage(), error);
result.fail(error);
});
return result.future();
}
protected abstract Class<? extends WorkerVerticle<U>> workerVerticleClass();
protected String busAddress() {
return workerVerticleClass().getName() + "." + uuid;
}
protected abstract U getRequest(final T job);
}

View file

@ -0,0 +1,44 @@
package de.pzzz.vertx.worker;
import java.util.logging.Logger;
import de.pzzz.vertx.Startup;
import io.vertx.config.ConfigRetriever;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
public abstract class WorkerVerticle<T> extends AbstractVerticle {
private static final Logger LOG = Logger.getLogger(WorkerVerticle.class.getName());
@Override
public void start(final Promise<Void> startPromise) throws Exception {
long startTime = System.currentTimeMillis();
initConfig().compose(this::doSetup)
.compose(this::setupConsumer)
.onComplete(result -> LOG.info(() -> "Started " + this.getClass().getName() + " in " + (System.currentTimeMillis() - startTime) + "ms"))
.onComplete(startPromise);
}
protected abstract Future<Startup> doSetup(final Startup startup);
protected abstract void handleMessage(final Message<T> message);
private Future<Startup> initConfig() {
ConfigRetriever configRetriever = ConfigRetriever.create(vertx);
configRetriever.getConfig();
return configRetriever.getConfig().map(Startup::new);
}
private Future<Void> setupConsumer(final Startup startup) {
LOG.info("Setup consumer for bus address " + startup.getConfig().getString(WorkerController.WORKER_BUS_ADDRESS_CONFIG));
MessageConsumer<T> consumer = vertx.eventBus()
.consumer(startup.getConfig().getString(WorkerController.WORKER_BUS_ADDRESS_CONFIG));
consumer.handler(this::handleMessage);
return Future.succeededFuture();
}
}