Using Redis with Mutiny - Composing asynchronous actions
I got an interesting question from a user about Redis and Mutiny. While the problem was not specifically about Redis and could be applied to many other APIs, I found the context amusing.
Enrico, the user, wanted to do something like this:
1. get all keys from Redis
2. for each key -> retrieve the associated object
3. add this object to a JsonArray
4. produce the JsonArray with all the objects
Enrico is using the Mutiny variant of the Vert.x Redis Client.
This client offers a few methods to help us with our problem:
- the RedisClient.keys(pattern) method, which returns a Uni<JsonArray>. This array contains the list of keys matching the pattern passed to the keys method. To simplify this post, let’s use keys("*"), which returns all the keys.
- the RedisClient.hgetall(key) method, which returns a Uni<JsonObject>. This method retrieves the object associated with the passed key.
Both methods are asynchronous (they return a Uni), and we need to call the second one for each retrieved key. In other words, we need to iterate over the set of keys and, for each key, invoke an asynchronous action. Finally, we want to collect the results of these asynchronous actions into a JsonArray.
Let’s start at the beginning. We need the Redis client instance:
RedisClient redis = RedisClient.create(vertx, new JsonObject()
        .put("port", 6379)
        .put("host", "localhost"));
Note that in Quarkus, you should use the Redis extension directly, which exposes a similar API. Enrico wanted to use the Vert.x Redis Client directly.
Now that we have our client, let’s retrieve the list of keys:
Uni<JsonArray> keys = redis.keys("*");
That produces the JsonArray, but we want a stream of keys. Again, it’s an asynchronous method: the returned Uni emits the array when it’s available. Once the array is received (onItem), we can create a stream out of it:
Multi<String> keys = redis.keys("*")
        .onItem().transformToMulti(array -> Multi.createFrom().iterable(array))
        .onItem().castTo(String.class);
This snippet:
- retrieves the JsonArray containing the keys
- creates a Multi streaming these keys; it’s a Multi<Object>, because JsonArray implements Iterable<Object>
- casts the items from this Multi to String
At this point, we have a stream of (String) keys. So, we are done with step 1.
Now, step 2: for each key, we want to retrieve the associated object.
So let’s use the hgetall method:
Multi<JsonObject> objects = keys
        .onItem().transformToUniAndMerge(key -> redis.hgetall(key));
This snippet requires a bit of an explanation.
For each item of the stream keys, we call hgetall, which produces a Uni<JsonObject>. So, we want to transform our key into a Uni (transformToUni).
When you have a stream of items and need to invoke an asynchronous action for each item, you must choose how you will merge the results. Mutiny provides two strategies:
- merge - as soon as the item produced by a Uni is received, we send it downstream
- concatenate - we preserve the order of the input stream, so that the items are sent downstream in the same order
Let’s illustrate this. Imagine we have the keys 1, 2, and 3, and so the stream {1, 2, 3}. Also, let’s consider that in our Redis database, the key 1 is associated with A, 2 with B, and 3 with C.
If you use the merge strategy, the associated objects arrive in an undetermined order. We can end up with {A, C, B} or {B, A, C}. It depends on many factors, such as latency, scheduling, and load. However, it also means we can retrieve all the associated objects concurrently and produce the resulting stream without worrying about the order.
If you use the concatenate strategy, it preserves the order from the input stream. So, it will always produce {A, B, C}. While this may be desirable, it may reduce the ability to retrieve the objects concurrently, as Mutiny has to wait for the retrieval of all the previous objects. For example, if Mutiny receives C first, it needs to wait for A and B before sending C downstream.
In our context, let’s not preserve the order and use the merge strategy. So we use transformToUniAndMerge.
If you run the code multiple times, you might see order changes in the resulting array.
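To make the two orderings concrete, here is a minimal plain-JDK sketch. It is not Mutiny code and says nothing about how Mutiny schedules the calls: it assumes three lookups that are already in flight, modeled as CompletableFuture instances completed by hand in the order C, A, B. The class and method names (MergeVsConcatenate, merge, concatenate, run) are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-JDK illustration (not Mutiny): three lookups that complete out of order.
class MergeVsConcatenate {

    // Merge: emit each result as soon as its lookup completes.
    static List<String> merge(List<CompletableFuture<String>> lookups) {
        List<String> downstream = new ArrayList<>();
        for (CompletableFuture<String> lookup : lookups) {
            lookup.thenAccept(downstream::add);
        }
        return downstream;
    }

    // Concatenate: emit results in input order; a result waits for all earlier ones.
    static List<String> concatenate(List<CompletableFuture<String>> lookups) {
        List<String> downstream = new ArrayList<>();
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (CompletableFuture<String> lookup : lookups) {
            // Only look at this lookup once everything before it has been emitted.
            previous = previous.thenCompose(ignored -> lookup.thenAccept(downstream::add));
        }
        return downstream;
    }

    // Hypothetical scenario: the lookup for key 3 answers first, then key 1, then key 2.
    static List<String> run(boolean preserveOrder) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        List<CompletableFuture<String>> lookups = List.of(a, b, c);
        List<String> downstream = preserveOrder ? concatenate(lookups) : merge(lookups);
        c.complete("C");
        a.complete("A");
        b.complete("B");
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println("merge:       " + run(false)); // [C, A, B]
        System.out.println("concatenate: " + run(true));  // [A, B, C]
    }
}
```

With that completion order, merge emits C, A, B, while concatenate holds C back until A and B have been emitted.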
Ok, step 2 done. Let’s focus on the final steps: accumulate the objects into a JsonArray, and produce a Uni<JsonArray> containing all the objects. Mutiny provides methods to gather items from a stream into lists, maps, and sets, but there is no built-in JsonArray support. Fortunately, Mutiny offers a method that you can use to collect items in any structure:
Uni<JsonArray> result = objects
        .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));
collectItems().in allows accumulating the items in your own structure. It takes two parameters: a supplier of the structure, called only once, and a bi-consumer taking the structure and the item to add, called for each item.
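The contract described above can be sketched in plain Java. This is only an illustration of the "supplier called once, bi-consumer called per item" shape, not how Mutiny implements it: collectIn is a hypothetical synchronous helper, and it collects into an ArrayList instead of a JsonArray so that it runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Plain-JDK sketch of the supplier + accumulator contract (hypothetical helper, not Mutiny code).
class CollectInSketch {

    static <T, S> S collectIn(Iterable<T> items, Supplier<S> supplier, BiConsumer<S, T> accumulator) {
        S structure = supplier.get();             // called exactly once
        for (T item : items) {
            accumulator.accept(structure, item);  // called once per item
        }
        return structure;
    }

    public static void main(String[] args) {
        List<String> collected = collectIn(
                List.of("A", "B", "C"),
                () -> new ArrayList<String>(),
                (list, item) -> list.add(item));
        System.out.println(collected); // [A, B, C]
    }
}
```

The two lambdas have the same shape as the () -> new JsonArray() and (arr, obj) -> arr.add(obj) arguments passed to collectItems().in.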
Here we go, we have everything we need to answer Enrico’s question.
The all-in-one code is the following:
Uni<JsonArray> result =
        // Step 1 - retrieve the keys
        redis.keys("*")
                .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
                .onItem().castTo(String.class)
                // Step 2 - retrieve the associated object for each key
                .onItem().transformToUniAndMerge(key -> redis.hgetall(key))
                // Step 3 and 4 - accumulate the retrieved object in a JsonArray
                .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));
In this snippet, there are a few interesting patterns:
- When you have a collection and you want to iterate over it with Mutiny, transform it into a Multi.
- When you execute an asynchronous action for each item of a stream, think about merge vs. concatenate. Use the one that makes sense for you.
- To accumulate items into a structure, use collectItems; it offers many methods to produce your structure of choice.
If you want to see this code in action, check this gist. You can even run it directly with JBang:
jbang https://gist.github.com/cescoffier/e8c8a18897f9e5ca15f1378876a1bd93
You can replace merge with concatenate to see the difference.
Enjoy!