Declarative style in reactive programming

My team has adopted reactive programming in our new project. We are using Spring WebFlux to serve data over REST call. The whole team is new on this programming style. The most essential concept I find myself struggling with is the declarative style of reactive programming

The following made-up example in Kotlin below show typical pattern we see in reactive programming. The activateMember method will chain together method call and reactor operator (map, flatMap, onErrorReturn) and finally return a Mono or Flux of something.

fun main(args: Array<String>) {
    activateMember("fake-id-11").subscribe(::println)
}
 
fun activateMember(userId: String): Mono<OperationResult> {
    return getUserInfoFromServer(userId)
        .flatMap { user -> saveUserStatus(user, UserStatus.ENABLED) }
        .map { isSuccess -> if (isSuccess) SUCCESS else ERROR }
        .onErrorReturn(ERROR)
}
 
fun saveUserStatus(user: User, status: UserStatus): Mono<Boolean> {
    //Fake implementation
    return Mono.fromCallable {
        Thread.sleep(5000)
        println("user status is saved")
    }.thenReturn(true)
}

This style is not too unfamiliar for me since it is quite the same with Java stream. That chain of execution is just declaring what will be done but don’t actually execute anything yet. Noting happen until somebody subscribe it (the Mono or Flux returned from this method).

That looks like easy enough to remember. However, composing execution in reactive programming could be a lot more tricky. Sometimes I found myself got confused and took sometime to figure out why a simple method call didn’t work as I expected.

Let’s say we decide to change the implementation of saveUserStatus() to not return any data. We could express this in reactive way by returning Mono<Void>. This method could either return Mono.empty() to say things work fine and the saving is success or return Mono with error signal.

fun saveUserStatus(user: User, status: UserStatus): Mono<Void> {
    return Mono.fromRunnable {
        Thread.sleep(5000)
        println("user status is saved")
    }
}

Let’s add another new requirement that activateMember() also need to do some choosing based on user status. The activateMember() now doesn’t need to use flatMap()since we don’t expect to get anything from calling saveUserStatus(). We can just use doOnNext() to say we want to trigger some action but don’t want to get any result from that action (one great use of doOnNext() is logging purpose).

fun activateMember(userId: String): Mono<OperationResult> {
    return getUserInfoFromServer(userId)
        .doOnNext { user ->
            when (user.status) {
                UserStatus.ENABLED   -> Mono.empty()
                UserStatus.DISABLED  -> saveUserStatus(user, UserStatus.ENABLED)
                UserStatus.BLACKLIST -> throw IllegalArgumentException("User $user is blacklisted")
            }
        }
        .thenReturn(OperationResult.SUCCESS)
        .onErrorReturn(OperationResult.ERROR)
}

The problem is that when we run the program with user with UserStatus.DISABLED status, we don’t see “user status is saved”.

The “when” expression in Kotlin is evaluated and return a value (in this case it return Mono<Void> from saveUserStatus()). The when expression is also the last expression of this whole bracket block so it will also be the return value of this lambda. Every seems like OK but somehow the code in side saveUserStatus() is not executed.

We forgot that doOnNext() is used to just trigger something but don’t make use of anything returned from the action. We will see it clearer in the method signature that it accept a Consumer.

/**
 * Add behavior triggered when the {@link Mono} emits a data successfully.
public final Mono<T> doOnNext(Consumer<? super T> onNext) {}

This means that the Mono returned from saveUserStatus() doesn’t get attached to the main execution chain of the activateMember() method. If it is not get attached to the chain then it is not get subscribed when the chain of activateMember() is subscribed. So the logic in saving user status is not executed.

Now is the point you can see the different in reactive programming style. The saveUserStatus method is called. But is call is not for actual execution. This call is for wiring chain of execution which will be executed sometime in the future when somebody subscribe it. All these method and operator we call is just to declare a chain of execution but not actually execute the action.

Let’s try to fix it by replacing doOnNext by flatMap. Now we are sure that the thing returned from lambda block will get attached to the main execution chain. Can you guess whether the code below will solve our problem?

fun activateMember(userId: String): Mono<OperationResult> {
    return getUserInfoFromServer(userId)
        .flatMap { user ->
            when (user.status) {
                UserStatus.ENABLED   -> Mono.empty()
                UserStatus.DISABLED  -> saveUserStatus(user, UserStatus.ENABLED)
                UserStatus.BLACKLIST -> throw IllegalArgumentException("User $user is blacklisted")
            }
            //If the above 'when' get error that it will not reach here
            Mono.just(SUCCESS)
        }
        .onErrorReturn(ERROR)
}

The above code still not make it work. There are two thing in the problematic lambda block. One is the Mono<Void> returned from saveUserStatus() another is Mono.just(SUCCESS). The bracket block return Mono.just(SUCCESS) to flatMap to join the main execution chain. But the Mono.just(SUCCESS) doesn’t connect to Mono<Void> from saveUserStatus() so our target code still not run

We can fix it by making sure that all the executions we want are connected together as we expected when the chain is subscribed.

fun activateMember(userId: String): Mono<OperationResult> {
    return getUserInfoFromServer(userId)
        .flatMap { user ->
            when (user.status) {
                UserStatus.ENABLED   -> Mono.empty()
                UserStatus.DISABLED  -> saveUserStatus(user, UserStatus.ENABLED)
                UserStatus.BLACKLIST -> throw IllegalArgumentException("User $user is blacklisted")
 
            }.thenReturn(SUCCESS)
        }
        .onErrorReturn(ERROR)
}

The declarative style of reactive programming looks quite easy enough on pater. But it need some getting used-to apply it correctly