smallrye - reactive messaging - handling backpressure

109 views
Skip to first unread message

Vincenzo D'Amore

unread,
Jul 18, 2022, 5:13:45 PM7/18/22
to SmallRye
Hi,

I've written a sample application with reactive messaging and mutiny.
Now I have few problems with the back-pressure, well, trying to understand how to handle it.

In my sample I have 3 classes: Producer -> Processor -> Consumer
Producer generate data very fast, let's say every 10ms and writes the data to an Emitter.

    private AtomicInteger counter = new AtomicInteger();

    @Inject @Channel("from-producer-to-processor")
    Emitter<ClassA> emitter;
    public void periodicallySendMessage() {
        Runnable runnable = () -> {
            ClassA message = new ClassA("Hello " + counter.getAndIncrement());
            while (true) {
                try {
                    if (emitter.hasRequests()) {
                        emitter.send(message);
                        break;
                    }
                } catch (Exception e) {
                    log.warnf("Handling... %s", e.getMessage());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }
        };

        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(runnable, 1, 10, TimeUnit.MILLISECONDS);
    }

From the other side the processor has a convert function 

Unchecked.function(i -> this.convert(i))

that often rises an exception. It has a really low success rate, only 1/7

    @Incoming("from-producer-to-processor")
    @Outgoing("from-processor-to-consumer")
    public Multi<List<ClassB>> consumeMulti2Multi(Multi<ClassA> stream) {
        return stream
                .group()
                .intoLists()
                .of(100)
                .onItem()
                .transform(Unchecked.function(i -> this.convert(i)))
                .onFailure()
                .retry()
                .withBackOff(this.initialBackOff).withJitter(.4)
                .atMost(this.maxRetry);
    }

When the exception is raised,  I don't see any other exception 
I assumed that a group is processed only and only if, a reasonable number of retries were done and within the limit configured with

                .withBackOff(this.initialBackOff).withJitter(.4)
                .atMost(this.maxRetry);

But instead, if there is an exception while a group is processed that group seems to be discarded and the transform(Unchecked.function(i -> this.convert(i)))
is never called again.
In other words, it seems that when there is an exception the convert function is not retried again.
Am I doing something wrong? Any help or suggestion appreciated.

Best,
Vincenzo

Vincenzo D'Amore

unread,
Jul 19, 2022, 2:48:59 AM7/19/22
to SmallRye
I think I found a solution to my problem. Using Uni.createFrom().deferred() will pospone the

    @Incoming("from-producer-to-processor")
    @Outgoing("from-processor-to-consumer")
    public Multi<List<ClassB>> consumeMulti2Multi(Multi<ClassA> stream) {
        return stream
                .group()
                .intoLists()
                .of(100)
                .flatMap(l -> Multi
                            .createFrom()
                            .deferred(() -> Multi.createFrom().item(this.convert(l)))
                            .onFailure()
                            .retry()
                            .withBackOff(this.initialBackOff)
                            .withJitter(.4)
                            .atMost(this.maxRetry)
                );
    }

Basically I realized that when using transform the result is somehow cached and it is not "retried" again, on the other hand deferred will postpone the creation of the Multi and this let the retry() do the job.
Am I right? And if yes, when the retry in this piece of code can affect the transform successfully?

                .transform(Unchecked.function(i -> this.convert(i)))
                .onFailure()
                .retry()
                .withBackOff(this.initialBackOff).withJitter(.4)
                .atMost(this.maxRetry);

clement escoffier

unread,
Jul 21, 2022, 10:17:10 AM7/21/22
to SmallRye, Julien Ponge
Hello,

(CC-ing Julien Ponge).

Yes, Uni and Multi created from known values are cached and never re-computed. Using `deferred` changes this behavior as the value is recomputed at subscription time. In your case, each retry re-subscribe which recomputes the value. 

Also, about the very fast producer. Reactive streams back pressure works when the pace of the producer can be controlled. It's, obviously, not always the case. For example, time cannot be slowed down.  In that case, you need to buffer or drop. This is done using the `.onOverflow()` group. Recently, Julien added two new operators that give you more control over the demand (https://smallrye.io/smallrye-mutiny/1.6.0/guides/controlling-demand/). It may not work in this case (as the producer is emitting), but if you want to implement a rate limiter, it's what you would use. 

Clement

--
You received this message because you are subscribed to the Google Groups "SmallRye" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To view this discussion on the web visit https://groups.google.com/d/msgid/smallrye/c15d6451-2c7e-451d-965a-26f60387bed1n%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages