Reactive Spring Web API with Couchbase


Reactive programming is the new hype, and Java itself has been on board since Java 9, which added the Flow API (java.util.concurrent.Flow). In this article, we are going to take a look at how we can write a reactive Spring web application with Couchbase. We will build a basic reactive web API with Spring WebFlux, using Mono and Flux, and for the database we will use the NoSQL database Couchbase, whose Java SDK provides a reactive API.

Dependencies

Let’s start by setting up everything that we need. We will deploy Couchbase with Docker.

docker run -d -p 8091:8091 -p 11210:11210 -p 11211:11211 -p 8093:8093 -p 8092:8092 --name cb-5.5.0 couchbase:enterprise-5.5.0

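We do not bother with mapping any volume since it is just a test database. Just create a basic bucket and a user with access to it. With the properties used later in this article, that means a bucket named test and a user that is also named test, with the password testtest. If you are not sure how to do this, I recommend checking out my previous article about it.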

Next, we use Spring Initializr to bootstrap a Spring Boot application with Spring Boot 2.0.4 and Java 10.

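We will also add the following dependencies. The User class further down additionally relies on Lombok, so include that as well if Spring Initializr did not already add it.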

<dependency>
  <groupId>com.couchbase.client</groupId>
  <artifactId>java-client</artifactId>
  <version>2.5.4</version>
</dependency>

<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava-reactive-streams</artifactId>
  <version>1.2.1</version>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

And we will add the following properties to our src/main/resources/application.properties file.

spring.couchbase.bootstrap-hosts=localhost
spring.couchbase.bucket.name=test
spring.couchbase.bucket.password=testtest

Reactive Spring Web API

Let’s start with our Reactive Spring Web API. We are going to create a very basic REST API for creating users, fetching one user by name, and fetching all users.


package org.thecuriousdev.reactivewebcouchbase;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.thecuriousdev.reactivewebcouchbase.domain.User;
import org.thecuriousdev.reactivewebcouchbase.repository.UserService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class UserController {

  private UserService userService;

  @Autowired
  public UserController(UserService userService) {
    this.userService = userService;
  }

  @GetMapping("/api/user/{name}")
  public Mono<User> findUserById(@PathVariable("name") String name) {
    return userService.findById(name);
  }

  @PostMapping("/api/user")
  public Mono<User> createUser(@RequestBody User user) {
    return userService.create(user);
  }

  @GetMapping("/api/user")
  public Flux<User> findAllUsers() {
    return userService.findAll();
  }

}
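To try the three endpoints, a client only needs plain HTTP. The sketch below builds matching requests with the JDK's java.net.http API, which requires Java 11 or newer. The class name UserApiRequests and the base URL are assumptions made for illustration (8080 is Spring Boot's default port), and the JSON body is assembled by hand only to keep the example free of extra dependencies.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds requests for the three user endpoints. */
final class UserApiRequests {

  // Assumption: the application runs locally on Spring Boot's default port.
  static final String BASE_URL = "http://localhost:8080";

  /** POST /api/user with a JSON body containing name and age. */
  static HttpRequest createUser(String name, int age) {
    // Naive JSON assembly: names containing quotes would need escaping.
    String json = String.format("{\"name\":\"%s\",\"age\":%d}", name, age);
    return HttpRequest.newBuilder(URI.create(BASE_URL + "/api/user"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json))
        .build();
  }

  /** GET /api/user/{name}; the name is assumed to be URL-safe. */
  static HttpRequest findUser(String name) {
    return HttpRequest.newBuilder(URI.create(BASE_URL + "/api/user/" + name))
        .GET()
        .build();
  }

  /** GET /api/user, which returns all users. */
  static HttpRequest findAllUsers() {
    return HttpRequest.newBuilder(URI.create(BASE_URL + "/api/user"))
        .GET()
        .build();
  }

  public static void main(String[] args) {
    // Print method and URI of each request without sending anything.
    for (HttpRequest request : new HttpRequest[] {
        createUser("alice", 30), findUser("alice"), findAllUsers()}) {
      System.out.println(request.method() + " " + request.uri());
    }
  }
}
```

Once the application is running, each request can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).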

As for our user object, when I am working with Couchbase I like to create a CouchbaseDocument class that all objects that are going to be saved into the database extend. The reason for this is pretty simple: all documents have an ID, an expiration, and a CAS (Check And Set) value. I also like to add a type field to each document so that it will be easier to index the documents in the future, sort of like a schema name.


package org.thecuriousdev.reactivewebcouchbase.domain;

import com.fasterxml.jackson.annotation.JsonIgnore;

public abstract class CouchbaseDocument {

  @JsonIgnore private String id;
  @JsonIgnore private long cas;
  @JsonIgnore private int expiry;
  private String type;

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

  public long getCas() {
    return cas;
  }

  public void setCas(long cas) {
    this.cas = cas;
  }

  public int getExpiry() {
    return expiry;
  }

  public void setExpiry(int expiry) {
    this.expiry = expiry;
  }

  public void setType(String type) {
    this.type = type;
  }

  public abstract String getType();

}

package org.thecuriousdev.reactivewebcouchbase.domain;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString
@EqualsAndHashCode
public class User extends CouchbaseDocument {

  private String name;
  private int age;

  @Override
  public String getType() {
    return "user";
  }

}
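The CAS value carried by CouchbaseDocument exists to support optimistic locking: a write is only accepted if the token the writer read earlier is still the current one. The following is a minimal in-memory sketch of that idea, not how Couchbase implements it. The class CasStore, its methods, and the convention that a CAS of 0 means "no check" are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory store illustrating CAS-based optimistic locking. */
final class CasStore {

  /** A stored value plus the CAS token assigned by the write that produced it. */
  static final class Entry {
    final String value;
    final long cas;

    Entry(String value, long cas) {
      this.value = value;
      this.cas = cas;
    }
  }

  private final Map<String, Entry> entries = new ConcurrentHashMap<>();
  private final AtomicLong casCounter = new AtomicLong();

  Entry get(String id) {
    return entries.get(id);
  }

  /**
   * Writes the value if expectedCas is 0 (in this sketch: "no check") or equals
   * the CAS of the stored entry. Returns false when the token is stale.
   */
  boolean write(String id, String value, long expectedCas) {
    boolean[] written = {false};
    // compute() runs atomically per key, so check and write cannot interleave.
    entries.compute(id, (key, current) -> {
      boolean matches = expectedCas == 0
          || (current != null && current.cas == expectedCas);
      if (!matches) {
        return current; // keep the stored entry untouched
      }
      written[0] = true;
      return new Entry(value, casCounter.incrementAndGet());
    });
    return written[0];
  }

  public static void main(String[] args) {
    CasStore store = new CasStore();
    store.write("alice", "age=30", 0);
    long cas = store.get("alice").cas;
    System.out.println(store.write("alice", "age=31", cas)); // true: token is current
    System.out.println(store.write("alice", "age=32", cas)); // false: token is stale
  }
}
```

The second write with the same token fails because the first one already replaced the entry and assigned it a new CAS, which is exactly the situation a concurrent modification would produce.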

Our UserService is going to need a bucket to fetch documents from and store documents in. What is really nice about Spring Boot is that it will auto-configure a Cluster for us if we specify the bootstrap-hosts property and an SDK is found on the classpath, so we only need to make sure to open the bucket or buckets that we are going to work with.


package org.thecuriousdev.reactivewebcouchbase;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CouchbaseConfig {

  @Autowired
  private Cluster cluster;

  @Value("${spring.couchbase.bucket.name}")
  private String name;

  @Value("${spring.couchbase.bucket.password}")
  private String password;

  @Bean
  public Bucket bucket() {
    return cluster.openBucket(name, password);
  }
}

Now our UserService can use that bucket when performing operations against Couchbase.


package org.thecuriousdev.reactivewebcouchbase.repository;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.query.N1qlQuery;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thecuriousdev.reactivewebcouchbase.domain.User;
import org.thecuriousdev.reactivewebcouchbase.util.CouchbaseConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;

@Service
public class UserService {

  private final Bucket bucket;
  private final CouchbaseConverter<User> couchbaseConverter;

  @Autowired
  public UserService(Bucket bucket) {
    this.bucket = bucket;
    this.couchbaseConverter = new CouchbaseConverter<>(bucket.name(), User.class);
  }

  public Mono<User> findById(String id) {
    return couchbaseConverter.toMono(bucket.async().get(id));
  }

  public Flux<User> findAll() {
    return couchbaseConverter.n1qlQueryResultToFlux(bucket.async()
        .query(N1qlQuery.simple("SELECT * FROM `" + bucket.name() + "` WHERE type = \"user\"")));
  }

  public Mono<User> create(User user) {
    var observable = bucket.async().upsert(
        couchbaseConverter.toCouchbaseDocument(user.getName(), user));
    return couchbaseConverter.toMono(observable);
  }

}

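The Couchbase bucket exposes an asynchronous (reactive) API when you call async() on it. That API returns RxJava Observables, so I have created a CouchbaseConverter class which converts an Observable to a Mono or a Flux of the class that we are interested in. Note that the N1QL query in findAll needs an index on the bucket, for example a primary index, before it will return any results.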


package org.thecuriousdev.reactivewebcouchbase.util;

import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.query.AsyncN1qlQueryResult;
import com.couchbase.client.java.query.AsyncN1qlQueryRow;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.reactivestreams.Publisher;
import org.thecuriousdev.reactivewebcouchbase.domain.CouchbaseDocument;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;

public class CouchbaseConverter<T extends CouchbaseDocument> {

  private static final ObjectMapper MAPPER = new ObjectMapper();
  private final String bucketName;
  private final Class<T> clazz;

  public CouchbaseConverter(String bucketName, Class<T> clazz) {
    this.bucketName = bucketName;
    this.clazz = clazz;
  }

  public Mono<T> toMono(Observable<JsonDocument> observable) {
    var publisher = RxReactiveStreams.toPublisher(observable);
    return Mono.from(publisher)
        .map(document -> {
          JsonObject jsonObj = document.content();
          T obj = parseObject(jsonObj);
          obj.setId(document.id());
          obj.setCas(document.cas());
          obj.setExpiry(document.expiry());
          return obj;
        });
  }

  public JsonDocument toCouchbaseDocument(String id, T obj) {
    try {
      return JsonDocument.create(id, obj.getExpiry(),
          JsonObject.fromJson(MAPPER.writeValueAsString(obj)), obj.getCas());
    } catch (JsonProcessingException e) {
      throw new RuntimeException(e); // keep the original exception as the cause
    }
  }

  public Flux<T> n1qlQueryResultToFlux(Observable<AsyncN1qlQueryResult> query) {
    var publisher = RxReactiveStreams
        .toPublisher(query.flatMap(result -> result.rows()));

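    // SELECT * wraps every row in an object keyed by the bucket name,
    // so unwrap that before mapping the JSON onto the target class.
    return Flux.from(publisher)
        .map(row -> row.value().get(bucketName))
        .map(this::parseObject);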
  }

  private T parseObject(Object obj) {
    try {
      return MAPPER.readValue(obj.toString(), clazz);
    } catch (IOException e) {
      throw new RuntimeException(e); // keep the original exception as the cause
    }
  }
}

In there, RxReactiveStreams.toPublisher(Observable) converts the RxJava Observable into a Reactive Streams Publisher, from which we can then create a Mono or a Flux.
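To make it a little clearer what creating a Mono from a publisher involves, here is a rough JDK-only sketch of the idea: subscribe, request a single element, and cancel once it arrives. It uses java.util.concurrent.Flow and CompletableFuture as stand-ins for the Reactive Streams Publisher and Reactor's Mono, so it is an analogy rather than what Reactor does internally, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: reduce a publisher to at most one value. */
final class FirstElement {

  /**
   * Completes with the first item the publisher emits and then cancels the
   * subscription; completes with null if the publisher finishes empty.
   */
  static <T> CompletableFuture<T> first(Flow.Publisher<T> publisher) {
    CompletableFuture<T> future = new CompletableFuture<>();
    publisher.subscribe(new Flow.Subscriber<T>() {
      private Flow.Subscription subscription;

      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // we only ever want one element
      }

      @Override
      public void onNext(T item) {
        subscription.cancel(); // stop the source after the first item
        future.complete(item);
      }

      @Override
      public void onError(Throwable error) {
        future.completeExceptionally(error);
      }

      @Override
      public void onComplete() {
        future.complete(null); // no effect if an item already arrived
      }
    });
    return future;
  }

  /** Publishes two names and returns the one that first() picks up. */
  static String demo() {
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      CompletableFuture<String> result = first(publisher);
      publisher.submit("alice");
      publisher.submit("bob");
      return result.orTimeout(5, TimeUnit.SECONDS).join();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "alice"
  }
}
```

A Flux, by contrast, would keep requesting elements until the publisher completes, which is why the N1QL rows are converted to a Flux and single-document operations to a Mono.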

Summary

And that’s pretty much it. We have now created a very basic reactive REST API with Spring Boot and Couchbase. As usual, I have uploaded the code to GitHub so that you can browse it more easily.
