Table of Contents#
- What is Bolts Task.whenAll()?
- RxJava Alternatives to Task.whenAll()
- Detailed Implementation Examples
- Handling Errors
- Advanced Scenarios
- Conclusion
- References
What is Bolts Task.whenAll()?#
Before diving into RxJava, let’s briefly recap how Task.whenAll() works in Bolts. The Bolts Framework simplifies async task management with the Task class, similar to Promise in JavaScript.
Key Behavior of Task.whenAll():#
- Success Case: Returns a new Task<Void> that completes successfully only if all input tasks complete successfully.
- Error Case: If any input task fails, the combined task also fails, but only after every input task has finished. A single failure surfaces as that task's exception; multiple failures are wrapped in an AggregateException.
- Execution: Tasks run in parallel (unless constrained by threading).
Example: Bolts Task.whenAll() in Action#
Suppose we have three async tasks: saving a user, updating a profile, and logging an event. We want to proceed only when all three finish.
// Define individual tasks
Task<Void> saveUserTask = saveUserToDatabase(user);
Task<Void> updateProfileTask = updateUserProfile(profile);
Task<Void> logEventTask = logAnalyticsEvent("user_updated");
// Combine tasks with whenAll(), which takes a Collection of tasks
Task.whenAll(Arrays.asList(saveUserTask, updateProfileTask, logEventTask))
    .continueWith(task -> {
        if (task.isFaulted()) {
            Log.e("App", "Task failed: " + task.getError().getMessage());
        } else {
            Log.d("App", "All tasks completed successfully!");
        }
        return null;
    });
This is straightforward, but RxJava offers more flexibility for complex async workflows. Let’s explore how to replicate (and enhance) this behavior with RxJava.
RxJava Alternatives to Task.whenAll()#
RxJava provides several operators to combine Observables and wait for their completion. The choice depends on whether you need to:
- Simply wait for all to complete (no results).
- Collect results from all Observables.
- Handle fixed vs. dynamic numbers of Observables.
Completable.merge(): The Direct Replacement#
Completable in RxJava represents an asynchronous operation that either completes successfully or fails with an error; it never emits items. Completable.merge() is the closest equivalent to Task.whenAll() because it:
- Takes an Iterable of Completables (Completable.mergeArray() is the varargs form).
- Completes only when all Completables complete.
- Errors immediately if any Completable errors.
Why It’s Ideal:#
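- Matches Task.whenAll()’s success behavior (wait for all). On failure it is stricter: it errors as soon as one source fails, whereas Task.whenAll() waits for the remaining tasks first. Completable.mergeDelayError() keeps the wait-for-all behavior.
- Lightweight (no need to handle emissions).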
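In RxJava 3 the overloads are easy to mix up: Completable.merge() accepts an Iterable (or a Publisher) of sources, while the varargs form is Completable.mergeArray(). The following is a minimal sketch of the Iterable form for a task list built at runtime, assuming RxJava 3 is on the classpath. The class name and the task() helper are hypothetical and exist only for illustration.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class MergeIterableSketch {

    // Hypothetical helper: a task that bumps a shared counter on an I/O thread.
    static Completable task(AtomicInteger counter) {
        return Completable.fromAction(counter::incrementAndGet)
                .subscribeOn(Schedulers.io());
    }

    // Builds `count` tasks at runtime, merges them, and blocks until all finish.
    static int runAll(int count) {
        AtomicInteger finished = new AtomicInteger();
        List<Completable> tasks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            tasks.add(task(finished));
        }
        // merge(Iterable) completes only after every task has completed.
        Completable.merge(tasks).blockingAwait();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished tasks: " + runAll(5)); // Finished tasks: 5
    }
}
```

blockingAwait() is used here only so the sketch can return a value; in application code you would normally subscribe instead of blocking.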
Observable.zip(): Combining Results#
If you need to collect results from all Observables (e.g., fetch data from multiple APIs and merge responses), Observable.zip() is the tool. It:
- Combines emissions from multiple Observables by index (emits a combined result when all sources emit an item at the same index).
- Waits for all Observables to emit at least one item.
- Errors immediately if any source errors.
Limitation:#
Requires each Observable to emit at least one item. If a source completes without emitting, zip will complete without emitting a result.
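A small sketch makes this limitation concrete, assuming RxJava 3 on the classpath. The class and method names are hypothetical. The helper zips two sources and collects whatever comes out, so an empty result list means zip completed without emitting.

```java
import io.reactivex.rxjava3.core.Observable;

import java.util.List;

class ZipEmptySourceSketch {

    // Zips two sources and collects every item the zipped stream emits.
    static List<Integer> zipAndCollect(Observable<Integer> left, Observable<Integer> right) {
        return Observable.zip(left, right, Integer::sum)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Both sources emit: one combined item.
        System.out.println(zipAndCollect(Observable.just(1), Observable.just(2))); // [3]
        // One source is empty: zip completes with nothing to deliver.
        System.out.println(zipAndCollect(Observable.just(1), Observable.empty())); // []
    }
}
```

If an empty source should count as a failure rather than a silent no-op, one option is to convert each source with firstOrError() and combine the resulting Singles.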
forkJoin-Style Joins with Single.zip(): Fixed-Size Parallel Execution#
RxJava 3 has no Observable.forkJoin() operator; the name comes from RxJS. The same behavior can be composed from lastOrError(), which turns each Observable into a Single holding its final item, and Single.zip(). Together they:
- Wait for all sources to complete.
- Emit a single combined result using the last emission of each source.
- Error if any source errors (or completes without emitting anything).
Use Case:#
When you have a known number of sources (e.g., 2–5) and need their final results.
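The "last emission of each source" join described above can be sketched with two operators that exist in RxJava 3: lastOrError() and Single.zip(). This is one possible composition under that assumption, not a built-in operator. The class and method names are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;

class LastValuesSketch {

    // Waits for both sources to complete, then combines their final items.
    static String combineLast(Observable<String> names, Observable<Integer> counts) {
        Single<String> lastName = names.lastOrError();     // fails if `names` is empty
        Single<Integer> lastCount = counts.lastOrError();  // fails if `counts` is empty
        return Single.zip(lastName, lastCount, (name, count) -> name + ":" + count)
                .blockingGet();
    }

    public static void main(String[] args) {
        String result = combineLast(
                Observable.just("draft", "final"),
                Observable.just(1, 2, 3));
        System.out.println(result); // final:3
    }
}
```

Note that lastOrError() signals NoSuchElementException for an empty source, so an empty source fails the join instead of silently producing no result.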
Observable.merge(): Waiting for Emissions + Completion#
Observable.merge() combines emissions from multiple Observables into a single stream, emitting items as they arrive. To wait for all to complete (ignoring emissions), pair it with ignoreElements() to convert to a Completable:
Observable.merge(observable1, observable2, observable3)
    .ignoreElements() // Convert to Completable (only care about completion)
    .subscribe(...);
Behavior:#
- Emits all items from sources (but we ignore them with ignoreElements()).
- Completes when all sources complete.
- Errors immediately if any source errors.
Detailed Implementation Examples#
Let’s walk through practical examples to replace Task.whenAll() with RxJava.
Example 1: Basic Completion with Completable.merge()#
Suppose we have three Completable operations (e.g., saving data, logging). We want to wait for all to finish.
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
// Define individual Completables; subscribeOn() moves each one to its own I/O thread
Completable saveUser = Completable.fromAction(() -> {
    // Simulate async save: e.g., database write
    Thread.sleep(1000);
    System.out.println("User saved");
}).subscribeOn(Schedulers.io());
Completable updateProfile = Completable.fromAction(() -> {
    Thread.sleep(1500);
    System.out.println("Profile updated");
}).subscribeOn(Schedulers.io());
Completable logEvent = Completable.fromAction(() -> {
    Thread.sleep(500);
    System.out.println("Event logged");
}).subscribeOn(Schedulers.io());
// Combine with Completable.mergeArray(), the varargs form of merge()
Completable combinedTask = Completable.mergeArray(
    saveUser,
    updateProfile,
    logEvent
);
// Subscribe
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
    combinedTask.subscribe(
        () -> System.out.println("All tasks completed!"), // On complete
        error -> System.err.println("Task failed: " + error.getMessage()) // On error
    )
);
// Typical output (shortest sleep first, because the tasks run in parallel):
// Event logged
// User saved
// Profile updated
// All tasks completed!
Key Takeaway: Completable.mergeArray() subscribes to all tasks at once and completes when all finish. The tasks run in parallel only because each one is moved to a background thread with subscribeOn(); without it, fromAction() would run them one after another on the subscribing thread.
Example 2: Converting Observables to Completables#
If your async operations are Observables (not Completables) but you only care about completion (not emissions), use ignoreElements() to convert them to Completables.
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
// Observables that emit items (we don't need the items)
Observable<String> fetchUser = Observable.just("User Data")
    .delay(1, TimeUnit.SECONDS);
Observable<Integer> fetchStats = Observable.just(42)
    .delay(2, TimeUnit.SECONDS);
// Convert to Completables (ignore emissions)
Completable fetchUserCompletable = fetchUser.ignoreElements();
Completable fetchStatsCompletable = fetchStats.ignoreElements();
// Merge Completables (mergeArray() is the varargs form of merge())
Completable combined = Completable.mergeArray(fetchUserCompletable, fetchStatsCompletable);
combined.subscribe(
    () -> System.out.println("All fetches completed!"),
    error -> System.err.println("Fetch failed: " + error)
);
Example 3: Collecting Results with Observable.zip()#
If you need results from all Observables (e.g., combine data from two APIs), use Observable.zip().
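import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
// User is assumed to be a simple class with a (String, int) constructor and a toString()
// Observables emitting results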
Observable<String> fetchUserName = Observable.just("Alice")
.delay(1, TimeUnit.SECONDS);
Observable<Integer> fetchUserAge = Observable.just(30)
.delay(1, TimeUnit.SECONDS);
// Zip results into a User object
Observable<User> combinedUser = Observable.zip(
fetchUserName,
fetchUserAge,
(name, age) -> new User(name, age) // Combiner function
);
combinedUser.subscribe(
user -> System.out.println("Combined User: " + user), // On next
error -> System.err.println("Error: " + error), // On error
() -> System.out.println("Fetch completed") // On complete
);
// Output:
// Combined User: User{name='Alice', age=30}
// Fetch completed
Note: zip waits for both sources to emit before combining. If fetchUserName takes 2s and fetchUserAge takes 1s, zip will emit after 2s.
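The timing claim can be checked without real waiting by driving the delays with RxJava's TestScheduler, which advances virtual time on demand. This is a minimal sketch assuming RxJava 3 on the classpath. The class and method names are hypothetical, and the combiner builds a plain String rather than a User so the example stays self-contained.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

class ZipTimingSketch {

    // Returns how many items zip had emitted after 1s and after 2s of virtual time.
    static int[] emissionsAtOneAndTwoSeconds() {
        TestScheduler scheduler = new TestScheduler();
        Observable<String> name =
                Observable.just("Alice").delay(2, TimeUnit.SECONDS, scheduler);
        Observable<Integer> age =
                Observable.just(30).delay(1, TimeUnit.SECONDS, scheduler);

        TestObserver<String> observer =
                Observable.zip(name, age, (n, a) -> n + " (" + a + ")").test();

        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        int afterOneSecond = observer.values().size();  // age has arrived, name has not

        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        int afterTwoSeconds = observer.values().size(); // both have arrived

        return new int[] {afterOneSecond, afterTwoSeconds};
    }

    public static void main(String[] args) {
        int[] counts = emissionsAtOneAndTwoSeconds();
        System.out.println(counts[0] + " item(s) after 1s, " + counts[1] + " after 2s");
    }
}
```

The combined item appears only once the slower source has emitted, so the zipped stream is as slow as its slowest source.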
Handling Errors#
Error handling in RxJava differs slightly from Bolts but is highly customizable.
Immediate Error Propagation#
Unlike Task.whenAll(), which waits for every task before reporting a failure, Completable.merge(), Observable.zip(), and Single.zip() error immediately if any source errors.
Example:
Completable task1 = Completable.complete();
Completable task2 = Completable.error(new RuntimeException("Oops!"));
Completable.mergeArray(task1, task2)
    .subscribe(
        () -> System.out.println("All done"), // Never called
        error -> System.err.println("Error: " + error.getMessage()) // Prints "Error: Oops!"
    );
Waiting for All (Even with Errors)#
To wait for all Observables to complete even if some error (e.g., log all failures), catch errors in individual sources using onErrorComplete() or onErrorResumeNext().
Completable task1 = Completable.error(new RuntimeException("Task 1 failed"))
.onErrorComplete(); // Convert error to successful completion
Completable task2 = Completable.complete();
Completable.mergeArray(task1, task2)
    .subscribe(
        () -> System.out.println("All tasks processed (even with errors)"), // Called
        error -> System.err.println("This won't run")
    );
Advanced Scenarios#
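Delaying Errors Until Every Task Finishes#
Swallowing errors with onErrorComplete() hides the failures from the subscriber. If you want every task to run and still see what went wrong, which is closer to how Bolts reports failures, Completable.mergeArrayDelayError() holds errors back until all sources have terminated. The sketch below assumes RxJava 3 on the classpath. The class name and the log list are hypothetical and only make the ordering visible.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.exceptions.CompositeException;

import java.util.ArrayList;
import java.util.List;

class DelayErrorSketch {

    // Runs three tasks, two of which fail, and records what happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Completable ok = Completable.fromAction(() -> log.add("ok ran"));
        Completable failA = Completable.error(new IllegalStateException("A failed"));
        Completable failB = Completable.error(new IllegalArgumentException("B failed"));

        try {
            // The failing tasks come first, yet `ok` is still subscribed and runs.
            Completable.mergeArrayDelayError(failA, failB, ok).blockingAwait();
        } catch (CompositeException e) {
            // Several failures arrive bundled in one CompositeException.
            for (Throwable cause : e.getExceptions()) {
                log.add(cause.getMessage());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [ok ran, A failed, B failed]
    }
}
```

When only one source fails, the delayed error is delivered as that exception itself rather than as a CompositeException, so production code should handle both shapes.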
Timeout for Combined Operations#
Add a timeout to the merged operation to fail fast if it takes too long:
Completable.mergeArray(task1, task2, task3)
    .timeout(5, TimeUnit.SECONDS) // Fail with a TimeoutException if not completed in 5s
    .subscribe(
        () -> System.out.println("Completed on time!"),
        error -> System.err.println("Timed out or failed: " + error)
    );
Sequential vs. Parallel Execution#
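- Parallel (default for merge and zip): All sources are subscribed immediately. They run concurrently as long as each source does its work asynchronously (for example via subscribeOn()).
- Sequential: Use Completable.concat() (or concatArray() for varargs) to run Completables one after another (slower but ordered).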
// Sequential execution (task2 starts after task1 completes)
Completable.concatArray(task1, task2, task3)
    .subscribe(() -> System.out.println("All done sequentially"));
Conclusion#
RxJava offers powerful operators to replace Bolts Task.whenAll(), with more flexibility for handling results, errors, and execution flow:
- Use Completable.merge() for simple completion (direct Task.whenAll() replacement).
- Use Observable.zip() to collect results from dynamic lists of Observables.
- Use Single.zip() with lastOrError() for fixed-size lists needing final results (RxJava has no forkJoin() operator).
- Handle errors with onErrorComplete() or onErrorResumeNext() to wait for all operations.
By leveraging RxJava’s reactive operators, you can build more robust and maintainable async workflows.