Aggregating Results Across Sequential RxJS Requests
A question that comes up often in RxJS is how best to aggregate the results of a number of sequential requests. The out-of-the-box solutions feel more complicated than they should be for something that is a fairly basic requirement.
In this article we show some options for achieving this using standard RxJS. This explains our motivation for implementing a utility function (concatJoin) that makes the intention of the code more explicit by hiding the mechanics of maintaining the aggregate result. The function is available in the npm package rxjs-concat-join.
To illustrate the problem, let’s start with a simple sequence of 3 requests (a “request” being an observable that emits just 1 value and then completes, such as an http request). Here is a basic implementation:
return request1.pipe(
  mergeMap(result1 => request2(result1)),
  mergeMap(result2 => request3(result2)),
);
Note that each request is based on the results of the previous request, which is typically why we do them in sequence rather than in parallel. There are other reasons though why requests that could otherwise be in parallel might be done in sequence, such as reducing server load, progress reporting and ability for the user to cancel, or because of underlying dependencies (eg for related database updates).
Note also the use of mergeMap. One question that arises with this pattern is which form of xMap to use. This is discussed in detail in a later section, but the short answer is that the question is spurious: it makes no difference which is used. This is illustrated by the fact that we can use concat instead (if we aren’t interested in the interim values, and the requests don’t depend on each other’s results) without having to choose between xMap types:
concat(request1, request2, request3).pipe(last());
The last() at the end is required because concat emits every value emitted by the inner observables. In fact, this pattern without the last() is interesting if we do want to emit a value as each request completes, such as when tracking progress.
And if we only want to gather the results at the end, the concat pattern can be used with toArray instead of last():
concat(request1, request2, request3).pipe(toArray());
So far that’s all fairly straightforward. Now we come to the specific challenge: how would we change this so that any request in the sequence can use, as input, the result of any of the previous requests?
In essence this challenge boils down to aggregating the results as we go down the pipe, so that the results of all previous requests are available at any point, and the aggregate of all results is output from the sequence as a whole.
Using Standard RxJS
To utilise the interim results using out-of-the-box RxJS there are 3 basic patterns that can be used:
- Nested requests
- Local variables
- Aggregate per request
Nested Requests
Nesting the requests is undoubtedly the simplest approach. Results of previous requests are kept in scope by nesting subsequent requests. A typical pattern would be something like:
request1.pipe(
  mergeMap(result1 => request2(result1).pipe(
    mergeMap(result2 => request3(result1, result2).pipe(
      map(result3 => [result1, result2, result3]),
    )),
  )),
);
The reason this solution doesn’t feel quite right is that the nested structure does not reflect the linear nature of the requests. And wasn’t nesting something that Promises were brought in to get rid of?
Local Variables
Local variables can be used to save whatever interim values are desired. The only thing to be wary of is scoping: if it is possible to have multiple instances of the observable in play at the same time, then each instance needs its own set of local variables, which is solved using defer:
defer(() => {
  let result1, result2;
  return request1.pipe(
    tap(result => result1 = result),
    mergeMap(() => request2(result1)),
    tap(result => result2 = result),
    mergeMap(() => request3(result1, result2)),
    map(result3 => [result1, result2, result3]),
  );
});
Exactly the same can be achieved using the concat pattern:
defer(() => {
  let result1, result2, result3;
  return concat(
    request1.pipe(tap(result => result1 = result)),
    // the inner defers ensure each request is only constructed
    // once the results it depends on have been assigned
    defer(() => request2(result1)).pipe(tap(result => result2 = result)),
    defer(() => request3(result1, result2)).pipe(tap(result => result3 = result)),
    defer(() => of([result1, result2, result3])),
  ).pipe(last());
});
The problem here is the amount of boilerplate: it is significant enough that we generally avoid this approach. Another issue is the use of tap, which feels very non-functional.
Aggregate per Request
We can avoid tap by passing the aggregated set of results-so-far down the pipe, so that each request appends to the aggregate value and passes it on. However, this requires that the results-so-far are collected into a single compound value, e.g. an array. We can use destructuring to simplify access back to the individual values, but because we also need the whole results-so-far within each mergeMap (so that we can append to it), destructuring requires an extra assignment. The following example shows a couple of possible approaches to dereferencing:
request1.pipe(
  map(result1 => [result1]),
  mergeMap(results => {
    const [result1] = results;
    return request2(result1).pipe(
      map(result2 => [...results, result2]),
    );
  }),
  mergeMap(results => request3(results[0], results[1]).pipe(
    map(result3 => [...results, result3]),
  )),
);
This feels like a better code structure, but the dereferencing issues cause a lot more boilerplate.
Which Type of “Map”?
The patterns above all use mergeMap as a default, but what happens in places where we would naturally use another kind of map: switchMap, concatMap or exhaustMap? It takes a little bit of thinking about, so it is worth going through the reasoning process.
The different map functions only act differently when there are at least 2 events flowing through the stream at the same time. To process them in parallel we use mergeMap; the others all ensure that only one event is processed at a time: concatMap processes every event in turn, switchMap cancels the old in favour of the new, and exhaustMap throws away the new in favour of the old.
The critical thing to observe is that our code above creates a new observable every time it is invoked, and each such stream only ever has 1 event flowing through it. It therefore makes no difference which type of map we use: the mapping functions only affect events in the same stream, not calls to the same operator across multiple streams.
We can prove this reasoning with a marble test, where the same function is used to create a sequence of 2 requests using concatMap. If we invoke it twice (in this case with the 2nd invocation offset by 1 frame), we can see that the two streams complete immediately after each other: the concatMap is not holding off the second subscription to e2.
const e1 = cold('----a|');
const e2 = cold('----b|');
const expected = '--------bb|'; // 2 merged sequences, the 2nd offset by 1 frame
const obsFactory = () => e1.pipe(concatMap(() => e2));
expectObservable(merge(obsFactory(), obsFactory().pipe(delay(1)))).toBe(expected);
This can be somewhat counterintuitive, and for me it is another major reason for hiding the mechanics of the sequence of requests in a utility function: it avoids a spurious decision about which mapping function to use, and it avoids giving the misguided impression that a control mechanism is in place when in fact there isn’t one.
Of course we can still make an explicit decision about how to manage multiple invocations of the whole sequence. For example, the following ensures that if a new source value arrives, the in-progress sequence will be cancelled and a new one started:
source.pipe(
  switchMap(() => concat(request1, request2, request3).pipe(last())),
);
Simplification Using a Utility Function
As is probably clear by now, we personally find all of the above approaches somewhat unsatisfactory. The real code (i.e. issuing requests) is hidden amongst a lot of boilerplate, so the essential meaning of the code is lost. And we are forced to consider issues such as which type of map to use when in reality it makes no difference.
What we are fundamentally trying to do here is already covered by a combination of existing RxJS concepts:
- forkJoin - which emits an aggregate value based on a set of requests (but executed in parallel)
- concat - which issues a set of requests in sequence
- pipe - which provides the output of one function as input to the next
We can therefore think of the utility function as a concatJoin, which takes a list where each item is either an observable (like concat) or a factory function that receives the results of the previous requests (like pipe), and which outputs the aggregated results as an array (like forkJoin). With this function our earlier example would be expressed as:
concatJoin(
request1,
([result1]) => request2(result1),
([result1, result2]) => request3(result1, result2),
); // emits [result1, result2, result3]
Or if we weren’t interested in the interim results:
concatJoin(
request1,
request2,
request3
); // emits [result1, result2, result3]
We could also define an equivalent of the version of forkJoin that takes and outputs an object, so that we are not dependent on the position of requests in the sequence. However, in our case order is important, and relying on the order of an object’s properties would be fragile. So instead we pass a list of objects and merge the results:
concatJoin(
{result1: request1},
{result2: ({result1}) => request2(result1)},
{result3: ({result1, result2}) => request3(result1, result2)},
); // emits {result1: result1, result2: result2, result3: result3}
Conclusion
For simple scenarios with 2, or possibly 3, requests, we used to happily use the nested approach. Any more than 3, though, and we would start feeling very uncomfortable: it would come up in code reviews, and none of the devs were happy with the solution. This utility function is something I have always felt was needed, and we have now had the time to implement it properly, with full type inference, and to make it publicly available. We hope that some of you find it as useful as we do.