Hi,
I've written a sample application with reactive messaging and mutiny.
Now I have few problems with the back-pressure, well, trying to understand how to handle it.
In my sample I have 3 classes: Producer -> Processor -> Consumer
Producer generate data very fast, let's say every 10ms and writes the data to an Emitter.
private AtomicInteger counter = new AtomicInteger();
@Inject @Channel("from-producer-to-processor")
Emitter<ClassA> emitter;
public void periodicallySendMessage() {
Runnable runnable = () -> {
ClassA message = new ClassA("Hello " + counter.getAndIncrement());
while (true) {
try {
if (emitter.hasRequests()) {
emitter.send(message);
break;
}
} catch (Exception e) {
log.warnf("Handling... %s", e.getMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
};
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(runnable, 1, 10, TimeUnit.MILLISECONDS);
}
From the other side the processor has a convert function
Unchecked.function(i -> this.convert(i))
that often rises an exception. It has a really low success rate, only 1/7
@Incoming("from-producer-to-processor")
@Outgoing("from-processor-to-consumer")
public Multi<List<ClassB>> consumeMulti2Multi(Multi<ClassA> stream) {
return stream
.group()
.intoLists()
.of(100)
.onItem()
.transform(Unchecked.function(i -> this.convert(i)))
.onFailure()
.retry()
.withBackOff(this.initialBackOff).withJitter(.4)
.atMost(this.maxRetry);
}
When the exception is raised, I don't see any other exception
I assumed that a group is processed only and only if, a reasonable number of retries were done and within the limit configured with
.withBackOff(this.initialBackOff).withJitter(.4)
.atMost(this.maxRetry);
But instead, if there is an exception while a group is processed that group seems to be discarded and the transform(Unchecked.function(i -> this.convert(i)))
is never called again.
In other words, it seems that when there is an exception the convert function is not retried again.
Am I doing something wrong? Any help or suggestion appreciated.
Best,
Vincenzo