CQRS using Java and Axon - Command module

Fabio Hiroki - Mar 20 '20 - - Dev Community

Introduction

In this second part of this article series, we will implement the Command module, responsible for application state changes. Final code is in Github.

REST API Layer

We will effectively start coding our application from the external layer and keep going internally. The ProductController class will be responsible for exposing the endpoints to request state changes.

The only dependency of this class will be Axon's CommandGateway responsible for dispatching command objects. The initial structure will be:

@RestController
@RequestMapping("/products")
public class ProductController {

    @Autowired
    public ProductController(final CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    private CommandGateway commandGateway;

    @PostMapping
    public CompletableFuture<String> create(@RequestBody ProductDTO dto) {
        return null;
    }

    @PutMapping
    public CompletableFuture<String> update(@RequestBody ProductDTO dto) {
        return null;
    }
}
Enter fullscreen mode Exit fullscreen mode

Where ProductDTO class is just a POJO to map the json request.

public class ProductDTO {

    private Long id;

    private String name;

    private int quantity;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int quantity) {
        this.quantity = quantity;
    }
}
Enter fullscreen mode Exit fullscreen mode

Command model

Command instances on this application will represent intents of state change. The subset of application state will be represented by an Aggregate object. For example, the intent for adding a new product to cart is represented by AddProductCommand class:

public class AddProductCommand {

    public AddProductCommand(
            final Long id,
            final String name,
            final int quantity) {
        this.id = id;
        this.name = name;
        this.quantity = quantity;
    }

    @TargetAggregateIdentifier
    private final Long id;
    private final String name;
    private final int quantity;

    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public int getQuantity() {
        return quantity;
    }
}
Enter fullscreen mode Exit fullscreen mode

Where TargetAggregateIdentifier annotation is used to identify which instance of an Aggregate type should be handled by this command.

Now to dispatch this command from our RestController we just need to instantiate it and pass as argument through CommandGateway send method:

@RestController
@RequestMapping("/products")
public class ProductController {

    @Autowired
    public ProductController(final CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    private CommandGateway commandGateway;

    @PostMapping
    public CompletableFuture<String> create(@RequestBody ProductDTO dto) {
        AddProductCommand command = new AddProductCommand(
                dto.getId(),
                dto.getName(),
                dto.getQuantity());
        return commandGateway.send(command);
    }

    // ....
}
Enter fullscreen mode Exit fullscreen mode

Aggregate

As mentioned before, the Aggregate class will be responsible for representing part of the application state plus the Command handling for that Aggregate.

An example of an Aggregate could be:

{
  "id": 1,
  "name": "iPhone",
  "quantity": 2
}
Enter fullscreen mode Exit fullscreen mode

Translating into code, results in:

@Aggregate
public class ProductAggregate {

    @AggregateIdentifier
    private Long id;
    private String name;
    private int quantity;

    @CommandHandler
    public ProductAggregate(AddProductCommand cmd) {
        // Verifies state consistency and applies events
    }
}
Enter fullscreen mode Exit fullscreen mode

CommandHandler annotation on constructor means the AddProductCommand command is used to create a new Aggregate.

At this point, the state of your application hasn't changed yet. Command handling is the location to perform business logic (i.e.: check if quantity is a positive value) and possibly apply Events that will result in state change.

Event model

First we create the AddProductEvent class with attributes needed to trigger our desired state change. In this case specifically it will be very similar to its respective Command model.

Now we will change the ProductAggregate constructor to dispatch an AddProductEvent whenever a AddProductCommand is sent. This same class can also act as the event sourcing handler of AddProductEvent, performing the change of the application state.

import static org.axonframework.modelling.command.AggregateLifecycle.apply;


@Aggregate
public class ProductAggregate {

    @AggregateIdentifier
    private Long id;
    private String name;
    private int quantity;

    @CommandHandler
    public ProductAggregate(AddProductCommand cmd) {
        apply(new AddProductEvent(cmd.getId(), cmd.getName(), cmd.getQuantity()));
    }

    @EventSourcingHandler
    public void on(AddProductEvent event) {
        this.id = event.getId();
        this.name = event.getName();
        this.quantity = event.getQuantity();
    }
}
Enter fullscreen mode Exit fullscreen mode

So far we have established a checkpoint that allow us to test the application and observe what's happening. Follow the terminal commands to run the commandside application:

docker-compose up -d
./gradlew clean assemble
java -jar commandside/build/libs/commandside.jar
Enter fullscreen mode Exit fullscreen mode

Now we can test the endpoint by adding a new product:

curl -X POST http://localhost:8080/products -H 'Content-Type: application/json' -d '{"id": 1, "name": "iPhone", "quantity": 7}'
Enter fullscreen mode Exit fullscreen mode

We can verify in the mongo database, in domainevents collection that there's a new event stored there:

{
   "_id":"5e0a2924b813b63783e1e092",
   "aggregateIdentifier":"1",
   "type":"ProductAggregate",
   "sequenceNumber":"0",
   "serializedPayload":"<com.example.project.command.addproduct.AddProductEvent><id>1</id><name>iPhone</name><quantity>7</quantity></com.example.project.command.addproduct.AddProductEvent>",
   "timestamp":"2019-12-30T16:43:16.851862731Z",
   "payloadType":"com.example.project.command.addproduct.AddProductEvent",
   "payloadRevision":null,
   "serializedMetaData":"<meta-data><entry><string>traceId</string><string>e62c8e0d-7505-4e99-ab7e-84b4619ee159</string></entry><entry><string>correlationId</string><string>e62c8e0d-7505-4e99-ab7e-84b4619ee159</string></entry></meta-data>",
   "eventIdentifier":"6eef19d8-b22a-4be6-9fd9-7681a31580b8"
}
Enter fullscreen mode Exit fullscreen mode

For each new product added through POST request, we will have a new entry on domainevents from now on. That acts as a history of what happened on our application.

Aggregate persistence

Besides the individual event persistence, we also want to persist the aggregate on each change (State-Stored Aggregate). To achieve this we just need to add JPA annotations to turn our aggregate class into an Entity:

@Aggregate
@Entity // This class can now be mapped to a table 
public class ProductAggregate {

    @AggregateIdentifier
    @Id // Defines the primary key
    private Long id;

    @Column // Map to a column with same name
    private String name;

    @Column // Map to a column with same name
    private int quantity;

    @CommandHandler
    public ProductAggregate(AddProductCommand cmd) {
        apply(new AddProductEvent(cmd.getId(), cmd.getName(), cmd.getQuantity()));
    }

    @EventSourcingHandler
    public void on(AddProductEvent event) {
        this.id = event.getId();
        this.name = event.getName();
        this.quantity = event.getQuantity();
    }
}
Enter fullscreen mode Exit fullscreen mode

Now add a new product on your cart using the endpoint above, and check the product_table on your Postgres database to verify a new entry stored matching the desired aggregate. Your Mongo database should also contain the new event.

Conclusion

On Mongo database, we have now the history of all events which we can use to understand how the application reached its current state. On the other side, Postgres database has the data we can use to display to final users, on a checkout screen, for example.

We could go back all the way and implement the CQRS and event sourcing by ourselves but thankfully we can achieve the same result using a couple of annotations from Axon.

. . . . . . . . . . . . . . . . . . . . . . . . . . . .