In my previous blog post Transactional Outbox pattern with Spring Boot, I compared Spring Integration with Spring Modulith to implement the microservices outbox pattern. I mentioned that a drawback of Spring Integration is the fact that Java serialization is used so the data in the database is not readable with standard database tooling.
In this blog post, I will show to configure Spring Integration to use Jackson instead so that the data in the database is readable JSON. This can provide convenient for debugging and troubleshooting purposes. The Spring Integration documentation has a small section on Custom Message Insertion that explains the basic idea to support JSON, but I have found it to have some challenges to get it fully working.
For context, this is the setup of Spring Integration using Java serialization:
@Configuration
public class SpringIntegrationConfiguration {
private static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";
@Bean
JdbcChannelMessageStore jdbcChannelMessageStore(
DataSource dataSource) {
JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
return jdbcChannelMessageStore;
}
}
To support serialization to and deserialization from JSON, we have to create custom implementations of two classes: ChannelMessageStorePreparedStatementSetter and MessageRowMapper
This is the code for the ChannelMessageStorePreparedStatementSetter
private static class JacksonMessageStorePreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter { (1)
private final ObjectMapper objectMapper;
public JacksonMessageStorePreparedStatementSetter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage, Object groupId, String region,
boolean priorityEnabled) throws SQLException {
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
try {
String json = objectMapper.writeValueAsString(requestMessage); (2)
preparedStatement.setObject(6, json, java.sql.Types.OTHER); (3)
} catch (JsonProcessingException e) {
throw new RuntimeException("Unable to store message", e);
}
}
}
| 1 | Extend from the default ChannelMessageStorePreparedStatementSetter. |
| 2 | Use the injected ObjectMapper to get the JSON. |
| 3 | Write the JSON string to the database. The java.sql.Types.OTHER is there to ensure we are using the JSON database type of PostgreSQL. |
To support saving JSON instead of bytes to the database, we have to update the Flyway script that generates the Spring Integration tables:
CREATE TABLE _spring_integration_CHANNEL_MESSAGE
(
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXTVAL('_spring_integration_MESSAGE_SEQ'),
MESSAGE_BYTES JSON, (1)
REGION VARCHAR(100) NOT NULL,
CONSTRAINT _spring_integration_CHANNEL_MESSAGE_PK PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
| 1 | Using JSON datatype for the MESSAGE_BYTES column. |
To read the JSON, we create a subclass of MessageRowMapper:
private static class JacksonMessageRowMapper extends MessageRowMapper {
private final ObjectMapper objectMapper;
public JacksonMessageRowMapper(ObjectMapper objectMapper) {
super(null, null);
this.objectMapper = objectMapper;
}
@Override
public Message<?> mapRow(ResultSet rs, int rowNum) throws SQLException {
try {
String s = rs.getString(rs.findColumn("MESSAGE_BYTES")); (1)
return objectMapper.readValue(s, new TypeReference<>() {}); (2)
} catch (JsonProcessingException e) {
throw new RuntimeException("Unable to read message", e);
}
}
}
| 1 | Get the JSON string from the MESSAGE_BYTES column in the database. |
| 2 | Convert the string back to a Message<?> object via the ObjectMapper. |
After creating both classes, we need to expose them as a Spring Bean and use them into our JdbcChannelMessageStore configuration:
@Configuration
public class SpringIntegrationConfiguration {
private static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";
private final ObjectMapper springIntegrationObjectMapper;
public SpringIntegrationConfiguration() {
springIntegrationObjectMapper = JacksonJsonUtils.messagingAwareMapper(
"com.wimdeblauwe.examples.transactional_outbox_spring_integration_json"); (1)
}
@Bean
JdbcChannelMessageStore jdbcChannelMessageStore(
DataSource dataSource,
ChannelMessageStorePreparedStatementSetter preparedStatementSetter,
MessageRowMapper messageRowMapper) {
JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
jdbcChannelMessageStore.setPreparedStatementSetter(preparedStatementSetter);
jdbcChannelMessageStore.setMessageRowMapper(messageRowMapper);
return jdbcChannelMessageStore;
}
@Bean
ChannelMessageStorePreparedStatementSetter channelMessageStorePreparedStatementSetter() {
return new JacksonMessageStorePreparedStatementSetter(springIntegrationObjectMapper);
}
@Bean
MessageRowMapper messageRowMapper() {
return new JacksonMessageRowMapper(springIntegrationObjectMapper);
}
// inner classes JacksonMessageStorePreparedStatementSetter and JacksonMessageRowMapper omitted
}
| 1 | Spring Integration has a JacksonJsonUtils class that can give a Jackson ObjectMapper that knows how to properly serialize Message objects to JSON. We use the factory method messagingAwareMapper() to create a new instance passing in our root package so classes of that package (or sub-packages) can be deserialized. |
Note that I am not exposing the ObjectMapper as a Spring bean as that would override the default ObjectMapper in the Spring Boot application. For that reason, I just create it in the constructor and inject it manually in our two beans.
With this configuration in place, we can again test using the test endpoint. The database has now a JSON version of our message (formatted for clarity):
{
"@class": "org.springframework.messaging.support.GenericMessage",
"payload": {
"@class": "com.wimdeblauwe.examples.transactional_outbox_spring_integration_json.infrastructure.mail.MailMessage",
"subject": "Order 8 completed",
"body": "Your order is registered in our system and will be processed.",
"to": "test@example.com"
},
"headers": {
"@class": "java.util.HashMap",
"replyChannel": "nullChannel",
"errorChannel": "",
"id": [
"java.util.UUID",
"a831fd04-72e7-9c4e-f49c-f2ab3f785928"
],
"timestamp": [
"java.lang.Long",
1720604864891
]
}
}
Conclusion
It is possible to use Jackson for serialization of the Spring Integration messages in case you like to have a more readable format in your database.
See transactional-outbox-spring-integration-json on GitHub for the full sources of this example.
If you have any questions or remarks, feel free to post a comment at GitHub discussions.