EntryProcessors and EntryBackupProcessors with Hazelcast

Posted on Sep 29, 2015

Hazelcast has the concept of EntryProcessors (like Oracle Coherence). EntryProcessors let you update cache entries without pulling the actual values over to the caller. You move the processing to where the value lives and update the cache there.
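As a rough illustration of that idea, here is a toy sketch in plain Java. It does not use Hazelcast: ToyRemoteCache, its transfer counter and the method names are made up for this example, and the "network" is only a counter. It compares the classic get/modify/put round trip with shipping a function to the data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy comparison of "pull, modify, push" against "send the function to the data".
// ToyRemoteCache is a hypothetical stand-in for a remote node, not the Hazelcast API.
class EntryProcessorIdea {

    // Classic approach: the value crosses the pretend network twice.
    static int pullModifyPush(ToyRemoteCache cache, long key) {
        String value = cache.get(key);          // transfer 1: node -> caller
        cache.put(key, value.toUpperCase());    // transfer 2: caller -> node
        return cache.valuesTransferred;
    }

    // Entry-processor approach: only the function travels, the value stays put.
    static int processInPlace(ToyRemoteCache cache, long key) {
        cache.executeOnKey(key, String::toUpperCase);
        return cache.valuesTransferred;
    }

    public static void main(String[] args) {
        ToyRemoteCache first = new ToyRemoteCache();
        first.seed(1L, "wim");
        ToyRemoteCache second = new ToyRemoteCache();
        second.seed(1L, "wim");

        System.out.println("pull/modify/push transfers: " + pullModifyPush(first, 1L));
        System.out.println("in-place transfers: " + processInPlace(second, 1L));
        System.out.println("values: " + first.peek(1L) + " / " + second.peek(1L));
    }
}

class ToyRemoteCache {
    private final Map<Long, String> store = new HashMap<>();
    int valuesTransferred = 0; // values that crossed the pretend network

    void seed(long key, String value) { store.put(key, value); }  // setup, not counted
    String peek(long key) { return store.get(key); }              // inspection, not counted

    String get(long key) { valuesTransferred++; return store.get(key); }
    void put(long key, String value) { valuesTransferred++; store.put(key, value); }

    // Runs the processor where the value lives; nothing is shipped back.
    void executeOnKey(long key, UnaryOperator<String> processor) {
        store.computeIfPresent(key, (k, v) -> processor.apply(v));
    }
}
```

Both caches end up with the same value, but only the first one had to move it around. In a real cluster the saving also depends on how large the values are compared to the processor itself.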

Furthermore, Hazelcast has the notion of an EntryBackupProcessor (which Coherence does not have). It determines what happens to the backup copies of an entry when an EntryProcessor runs.

To explain the usage of this, we will use a simple User class:

class User implements Serializable
{
    private long id;
    private String name;
    private DateTime lastLoginTime;

    // getters and setters omitted

}

We will set the 'lastLoginTime' on the user by means of an EntryProcessor.

Default behaviour - using AbstractEntryProcessor

For the first test, we will use the default behaviour given by AbstractEntryProcessor. This is the code for our EntryProcessor:

private static class UpdateLastLoginTimEntryProcessor extends AbstractEntryProcessor<Long, User> {

    private DateTime loginTime;

    public UpdateLastLoginTimEntryProcessor() {
    }

    public UpdateLastLoginTimEntryProcessor(DateTime loginTime) {
        this.loginTime = loginTime;
    }

    @Override
    public Object process(Map.Entry<Long, User> entry) {
        System.out.println("Processing entry: " + entry);
        User user = entry.getValue();
        user.setLastLoginTime(loginTime);
        entry.setValue(user); // write the modified value back so the change is stored in the map
        return null;
    }
}

The default behaviour is to apply the same processing to the backup entries as to the primary entries. To test this, we run this code:

public class HazelcastBackupEntryProcessorTest1 {

    public static void main(String[] args) throws InterruptedException {

        HazelcastInstance hazelcastInstance = setupHazelcast();
        ILock doItLock = hazelcastInstance.getLock("doItLock");
        doItLock.lock();

        IMap<Long, User> testMap;
        try {
            testMap = hazelcastInstance.getMap("testMap");

            if (!testMap.containsKey(1L)) {
                User user = new User();
                user.setId(1L);
                user.setName("Wim");
                testMap.put(user.getId(), user);

                DateTime loginTime = DateTime.now();
                System.out.println("Calling the entry processor with time: " + loginTime);

                testMap.executeOnEntries(new UpdateLastLoginTimEntryProcessor(loginTime));
            }
        } finally {
            doItLock.unlock();
        }

        printLastLoginTime(testMap);

        System.out.println("Sleeping 10 sec... manually crash entries node now..");
        Thread.sleep(10000);

        System.out.println("Done sleeping!");
        printLastLoginTime(testMap);
    }

    private static HazelcastInstance setupHazelcast() {
        Config config = new Config();
        // Wait during startup until two members have joined, so a backup replica exists
        config.setProperty("hazelcast.initial.min.cluster.size", "2");
        return Hazelcast.newHazelcastInstance(config);
    }

    private static void printLastLoginTime(IMap<Long, User> testMap) {
        User updatedUser = testMap.get(1L);
        System.out.println("last login time: " + updatedUser.getLastLoginTime());

        LocalMapStats stats = testMap.getLocalMapStats();
        System.out.println("hits: " + stats.getHits());
        System.out.println("entries: " + stats.getOwnedEntryCount());
        System.out.println("backup entries: " + stats.getBackupEntryCount());
    }

    private static class UpdateLastLoginTimEntryProcessor extends AbstractEntryProcessor<Long, User> {

        private DateTime loginTime;

        public UpdateLastLoginTimEntryProcessor() {
        }

        public UpdateLastLoginTimEntryProcessor(DateTime loginTime) {
            this.loginTime = loginTime;
        }

        @Override
        public Object process(Map.Entry<Long, User> entry) {
            System.out.println("Processing entry: " + entry);
            User user = entry.getValue();
            user.setLastLoginTime(loginTime);
            entry.setValue(user);
            return null;
        }
    }
}

If you run this code twice to simulate two nodes (I simply ran it twice from IntelliJ IDEA), you get the following output:

Node 1:

Calling the entry processor with time: 2015-09-28T11:08:32.499+02:00
Processing entry: 1=com.traficon.tmsng.server.User@27d067a4
last login time: 2015-09-28T11:08:32.499+02:00
hits: 0
entries: 0
backup entries: 1

Node 2:

Processing entry: 1=com.traficon.tmsng.server.User@77f309d3
last login time: 2015-09-28T11:08:32.499+02:00
hits: 4
entries: 1
backup entries: 0

So we see the entry processor is called twice, once on each node.

During the 10 second sleep, I stop the node that has the backup entries. When the sleep is done, this is printed on the other node:

last login time: 2015-09-28T11:08:32.499+02:00
hits: 5
entries: 1
backup entries: 0

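We see the surviving node still owns the entry and the last login time is intact. Because the processor also ran on the backup copy, the value should survive a crash of the owning node as well.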

Without an EntryBackupProcessor

Now, what would happen if we use this implementation for our entry processor:

private static class UpdateLastLoginTimEntryProcessor implements EntryProcessor<Long, User> {

    private DateTime loginTime;

    public UpdateLastLoginTimEntryProcessor() {

    }

    public UpdateLastLoginTimEntryProcessor(DateTime loginTime) {
        this.loginTime = loginTime;
    }

    @Override
    public Object process(Map.Entry<Long, User> entry) {
        System.out.println("Processing entry: " + entry);
        User user = entry.getValue();
        user.setLastLoginTime(loginTime);
        entry.setValue(user);
        return null;
    }

    @Override
    public EntryBackupProcessor<Long, User> getBackupProcessor() {
        return null;
    }
}

In this implementation, we return null for our EntryBackupProcessor. This in effect means that we will NOT be updating the backup entries!

Node 1:

Calling the entry processor with time: 2015-09-28T11:19:26.237+02:00
last login time: 2015-09-28T11:19:26.237+02:00
hits: 0
entries: 0
backup entries: 1

Node 2:

Processing entry: 1=com.traficon.tmsng.server.User@15101e96
last login time: 2015-09-28T11:19:26.237+02:00
hits: 4
entries: 1
backup entries: 0

So now, we only see "Processing entry" on the node where the actual value lives; nothing happens on the node with the backup entries. If we now crash Node 2, the node that owns the entry, and print our cached User object again on the remaining node, we see this:

last login time: null
hits: 1
entries: 1
backup entries: 0

The backup entry has been promoted to primary, but the last login time is lost since we did not run the entry processor on the backup entries.
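The difference between this run and the previous one can be reproduced with a toy model in plain Java. This is only a sketch under simplifying assumptions: two HashMaps stand in for the primary and the backup replica, the crash is simulated by clearing the primary, and none of it is Hazelcast code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model of one primary and one backup replica; not Hazelcast code.
class BackupFailoverSketch {

    // Applies the processor to the primary and, optionally, to the backup replica,
    // then simulates losing the primary and reading from the promoted backup.
    static String valueAfterFailover(boolean processBackup) {
        Map<Long, String> primary = new HashMap<>();
        Map<Long, String> backup = new HashMap<>();

        // The initial put is replicated, so both copies start out identical.
        primary.put(1L, "lastLogin=null");
        backup.put(1L, "lastLogin=null");

        UnaryOperator<String> processor = old -> "lastLogin=2015-09-28";

        // The entry processor always runs on the primary copy.
        primary.computeIfPresent(1L, (key, value) -> processor.apply(value));

        // Returning null from getBackupProcessor() corresponds to skipping this step.
        if (processBackup) {
            backup.computeIfPresent(1L, (key, value) -> processor.apply(value));
        }

        primary.clear();        // the owning node crashes
        return backup.get(1L);  // the backup is promoted and serves the read
    }

    public static void main(String[] args) {
        System.out.println("with backup processing:    " + valueAfterFailover(true));
        System.out.println("without backup processing: " + valueAfterFailover(false));
    }
}
```

Without the backup step the promoted copy still holds the old value, which matches the "last login time: null" line in the output above.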

Updating the backup without double processing

Suppose you have quite complex processing going on in your entry processor. If you want to be on the safe side, you need to run an EntryBackupProcessor. However, doing the processing twice is expensive in terms of CPU. Is there an alternative?

It turns out, you can use this construct:

private static class LostsOfProcessingEntryProcessor implements EntryProcessor<Long, User> {

    private transient User updatedUser;

    public LostsOfProcessingEntryProcessor() {
    }

    @Override
    public Object process(Map.Entry<Long, User> entry) {
        try {
            System.out.println("Processing entry: " + entry);
            User user = entry.getValue();

            Thread.sleep(2000); // Simulate processing

            //suppose you update something on the user object here

            //user.updateFoo( foo );

            user.setLastLoginTime(DateTime.now());
            updatedUser = user;

            System.out.println("updatedUser = " + updatedUser);
            entry.setValue(user);

            return null;

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public EntryBackupProcessor<Long, User> getBackupProcessor() {
        return new CopyValueToBackupEntryBackupProcessor(updatedUser);
    }

    public static class CopyValueToBackupEntryBackupProcessor implements EntryBackupProcessor<Long, User> {

        private User user;

        public CopyValueToBackupEntryBackupProcessor(User user) {
            this.user = user;
        }

        @Override
        public void processBackup(Map.Entry<Long, User> entry) {
            System.out.println("Updating user on backup entry: " + user);
            entry.setValue(user);
        }
    }
}

When testing this, we get the following output:

Node 1:

Processing entry: 1=com.traficon.tmsng.server.User@1994ad74
updatedUser = com.traficon.tmsng.server.User@1994ad74
last login time: 2015-09-29T08:28:40.756+02:00
hits: 4
entries: 1
backup entries: 0
Sleeping 10 sec... crash entries node now..

Node 2:

Calling the entry processor
Updating user on backup entry: com.traficon.tmsng.server.User@4caf4ac
last login time: 2015-09-29T08:28:40.756+02:00
hits: 0
entries: 0
backup entries: 1
Sleeping 10 sec... crash entries node now..

Done sleeping!
last login time: 2015-09-29T08:28:40.756+02:00
hits: 2
entries: 1
backup entries: 0

Notice how on Node 2 the backup entry becomes primary after the crash of Node 1 and how we did not have to do the expensive processing again in the EntryBackupProcessor.
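A note on why the transient field does not get in the way. My reading of the output above is that getBackupProcessor() is called on the node where process() ran, and that the returned backup processor is what travels to the backup node. The sketch below mimics this with plain Java serialization. ToyProcessor, ToyBackupProcessor and roundTrip are hypothetical names for this example, and I am assuming the objects are sent using standard Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldSketch {

    // Serializes and deserializes an object, as if it were sent to another node.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ToyProcessor processor = new ToyProcessor();
        processor.process("Wim"); // runs on the owning node

        // The transient result is dropped if the processor itself is sent on...
        System.out.println("processor copy: " + roundTrip(processor).updated);
        // ...but the backup processor carries the result as a regular field.
        System.out.println("backup copy:    " + roundTrip(processor.getBackupProcessor()).value);
    }
}

// Plays the role of the entry processor: keeps its result in a transient field.
class ToyProcessor implements Serializable {
    transient String updated;

    void process(String value) { updated = value + " (processed)"; }

    ToyBackupProcessor getBackupProcessor() { return new ToyBackupProcessor(updated); }
}

// Plays the role of the backup processor: the value is part of its serialized state.
class ToyBackupProcessor implements Serializable {
    final String value;

    ToyBackupProcessor(String value) { this.value = value; }
}
```

So the field only has to live long enough to be handed to the backup processor on the same node. This also explains why the User value itself must be serializable.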

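Note that this backup processor carries exactly one value, so the construct only makes sense when the entry processor handles a single entry, as in this test. The CopyValueToBackupEntryBackupProcessor is specific to this example, but it can easily be made generic so you can reuse it: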

public static class CopyValueToBackupEntryBackupProcessor<K, V> implements EntryBackupProcessor<K, V> {

    private V value;

    public CopyValueToBackupEntryBackupProcessor(V value) {
        this.value = value;
    }

    @Override
    public void processBackup(Map.Entry<K, V> entry) {
        entry.setValue(value);
    }
}
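To try the generic version without a cluster, here is a small sketch in plain Java. ToyBackupProcessor is a local stand-in interface with the same single method, declared only so the example compiles without the Hazelcast jar, and CopyValue is a shortened, hypothetical name for the class above.

```java
import java.util.AbstractMap;
import java.util.Map;

class GenericCopySketch {

    // Local stand-in with the same single method as the backup processor interface,
    // declared here only so the sketch compiles without the Hazelcast jar.
    interface ToyBackupProcessor<K, V> {
        void processBackup(Map.Entry<K, V> entry);
    }

    // Generic "copy the already computed value" backup processor.
    static class CopyValue<K, V> implements ToyBackupProcessor<K, V> {
        private final V value;

        CopyValue(V value) { this.value = value; }

        @Override
        public void processBackup(Map.Entry<K, V> entry) {
            entry.setValue(value); // no recomputation, just overwrite the backup copy
        }
    }

    public static void main(String[] args) {
        Map.Entry<Long, String> first = new AbstractMap.SimpleEntry<>(1L, "stale-1");
        Map.Entry<Long, String> second = new AbstractMap.SimpleEntry<>(2L, "stale-2");

        CopyValue<Long, String> copy = new CopyValue<>("fresh");
        copy.processBackup(first);
        copy.processBackup(second);

        // One instance writes the same value to every entry it is handed.
        System.out.println(first + " " + second);
    }
}
```

The second entry is there on purpose: a single instance writes the same value no matter which entry it receives, so I would only use it for processing one key at a time.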

Conclusion

I have shown you several ways to use an EntryBackupProcessor in Hazelcast. Which one is best for your application depends on your use case, as always. As a general rule of thumb, the default behaviour of AbstractEntryProcessor is best when the processing is cheap. If there is a lot of processing going on, it is worth looking into a CopyValueToBackupEntryBackupProcessor.
