Overview of implementing Distributed Locks

Locks are generally used to control access to shared resources by multiple threads at the same time. They protect data integrity and atomicity in concurrent applications: only one thread at a time can hold the lock on a shared resource, which is otherwise inaccessible. A lock in a distributed environment, however, is more than a mutex in a multi-threaded application. It is more complicated because the lock now has to be acquired across all the nodes, while any node in the cluster, or the network itself, can fail.

Here is the user story that we use to explain the scenarios in the rest of this article. The application takes data in the user's preferred format and converts it into a standardized format, like PDF, that can be uploaded to a government portal. Two micro-services of the application do this work: Transformer and Rules Engine. We have used Cassandra for persistence and Kafka as a message queue. Note that a user request, once accepted, returns immediately; once the PDF is generated, the user is notified asynchronously. This is achieved in the following sequence of steps:

  • User request is put into a message queue.
  • Once the Transformer service picks up user request, it transforms the user uploaded file into a format that Rules Engine can understand.
  • The data is then taken through the Rules Engine, which updates the data points.
  • Finally, the data is transformed into PDF and the user is notified.

First, let us understand why we need locks at all in a distributed environment. These are the use cases for which we have used distributed locks:

  1. Efficiency: This ensures that the same expensive computation does not happen multiple times. For example, suppose a user has uploaded a file for processing. If the system is under heavy load, or the file is very large, it might take a while to generate the PDF. If the user grows restless waiting to be notified, they may upload the file again, adding more load to the system unnecessarily. This can be avoided by taking a lock on the checksum of the file before processing it.
  2. Correctness: This avoids data corruption in the application. With locks, two concurrent processes cannot interfere with each other's changes to the underlying data. If two processes operate on the same data set at the same time without acquiring a lock, there is a high chance that the data gets corrupted. For example, say we have received the sales transactions and the line item data from the user. The tax amount at transaction level is calculated as the sum of the tax already levied at the transaction level and any additional taxes that exist at the line level. If rules are executed for the same transaction on two different nodes in parallel, there is a good chance that the tax amount gets incremented twice for the line items. This can be avoided by taking a lock at transaction level.
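The efficiency case depends on deriving the same lock key for identical uploads. A minimal sketch of that step, assuming SHA-256 as the checksum; the class name `ChecksumLockKey` and the `file-` key prefix are hypothetical and not taken from the application:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derives a lock key from the content of an uploaded file. */
final class ChecksumLockKey {
    private ChecksumLockKey() {}

    /** Returns the SHA-256 digest of the content as a lowercase hex string. */
    static String sha256Hex(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(content);
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    /** Identical uploads map to the same key, so a repeated upload finds the lock already taken. */
    static String lockKeyFor(byte[] fileContent) {
        return "file-" + sha256Hex(fileContent);
    }
}
```

The resulting string can be passed as the key to whichever lock implementation is in use; a second upload of the same bytes then contends for the same lock instead of starting a second computation.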

Please note that locks are often not seen as a good idea. Blocking operations increase contention for the underlying resources, thereby limiting the computational capacity of the system. Locking in a distributed environment is also much more difficult and dangerous, for the following reasons:

  • What happens to the lock when a node which has acquired it has crashed without releasing it?
  • How do we deal with cases of network partitioning?
  • Both questions bring the additional dimension of consensus into the picture. We will get to the idea of distributed consensus in a while.

For all the above reasons, we should avoid these locks if an alternative solution exists. Here are two possible approaches that can be used in the application:

  1. Optimistic locking: The resources are not actually locked in this case. Before committing the transaction, we check whether the resource has been updated by someone else. If the data is stale, the transaction is rolled back and an error is returned to the user. In contrast, pessimistic locking takes an exclusive lock so that no one else can modify the resource, for example select-for-update locks in databases or Java locks. Hibernate provides support for optimistic locking. You can read more about it here.
  2. Usage of partitions in Kafka: As mentioned earlier, we always keep the user requests in Kafka before processing them. It is done this way because availability is one of the core architectural principles of the application: we did not want the application to crash when the load increases many times over during peak usage. Kafka stores the messages published to a topic in multiple partitions internally, and it guarantees that messages from a given partition are always served to the consumer in the order in which they were published. Leveraging this, we published all requests that must not be processed in parallel (and would otherwise need locks) to the same partition. This can be done by specifying a partition key while publishing the message to Kafka; messages with the same key are published to the same partition. As messages are taken up sequentially from the partition, we no longer need locks.
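The version check behind optimistic locking can be sketched in memory. This is only an illustration of the idea, not Hibernate's implementation; the `OptimisticStore` and `Versioned` classes are hypothetical, and a `ConcurrentHashMap` stands in for the database:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** A value together with the version it was written at. */
final class Versioned {
    final long version;
    final String value;

    Versioned(long version, String value) {
        this.version = version;
        this.value = value;
    }
}

/** Hypothetical in-memory store that rejects updates based on a stale read. */
final class OptimisticStore {
    private final ConcurrentMap<String, Versioned> rows = new ConcurrentHashMap<>();

    void insert(String id, String value) {
        rows.put(id, new Versioned(1, value));
    }

    Versioned read(String id) {
        return rows.get(id);
    }

    /** Commits only if nobody has changed the row since the caller read it at readVersion. */
    boolean update(String id, long readVersion, String newValue) {
        Versioned current = rows.get(id);
        if (current == null || current.version != readVersion) {
            return false; // stale read: the caller should reload and retry, or report an error
        }
        // replace() is atomic: it succeeds only if the row is still the instance checked above.
        return rows.replace(id, current, new Versioned(readVersion + 1, newValue));
    }
}
```

Two writers that both read version 1 can both attempt an update, but only the first commit succeeds; the second sees a version mismatch and has to start over with fresh data.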

There might still be cases that do not fit the above scenarios, where we prefer to take a distributed lock. Distributed consensus comes into the picture as soon as we speak about distributed locks. Consensus can be defined as the process of getting all nodes in a cluster to agree on a specific value based on their votes. All nodes must agree upon the same value, and it must be a value that was submitted by at least one of the nodes. When a particular node is said to have acquired a distributed lock in a cluster, the rest of the nodes in the cluster have to agree that the lock has been taken by it. There are multiple consensus algorithms, such as Paxos, Raft, ZAB and Pacifica. For those interested, I have given some links explaining these algorithms towards the end of the blog. Here are the two most general ways of implementing consensus systems:

  1. Symmetric/leader-less: All servers participating in the consensus have equal roles, so a client can connect to any one of the servers. Example: Paxos
  2. Asymmetric/leader-based: At any given time, one of the servers participating in the consensus acts as leader, and the rest of the servers accept the leader's decisions. Clients can communicate only with the leader. Example: Raft, ZAB
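Whichever style is used, algorithms such as Paxos, Raft and ZAB decide by majority. A small sketch of the arithmetic, with a hypothetical `Quorum` helper that is not part of any of these systems:

```java
/** Hypothetical helper showing the majority arithmetic used by quorum-based consensus. */
final class Quorum {
    private Quorum() {}

    /** Smallest number of nodes that forms a majority of the cluster. */
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** A decision (for example, "node A holds the lock") stands once a majority has acknowledged it. */
    static boolean isGranted(int acknowledgements, int clusterSize) {
        return acknowledgements >= majority(clusterSize);
    }

    /** Number of nodes that can fail while a majority can still be formed. */
    static int tolerableFailures(int clusterSize) {
        return clusterSize - majority(clusterSize);
    }
}
```

Because any two majorities of the same cluster share at least one node, two different nodes cannot both collect a majority for the same lock at the same time. A cluster of 5 needs 3 acknowledgements and survives 2 failures; a cluster of 4 also needs 3 but survives only 1.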

For decades, consensus was synonymous with Paxos. Now there are several consensus algorithms, as discussed above, and Raft overcomes some of the drawbacks of traditional Paxos. Each of the algorithms mentioned above has different implementations. For example, Cassandra implemented Paxos for its lightweight transactions. Kafka internally uses Pacifica, whereas Zookeeper and Hazelcast use ZAB and Raft respectively. Here is the generic interface of the distributed lock in our application:

package common.concurrent.lock;
import java.util.concurrent.TimeUnit;
/**
 * Provides interface for the distributed lock implementations based on Zookeeper and Hazelcast.
 * @author pgullapalli
 */
public interface DistributedLock {
    /**
     * Acquires the lock. If the lock is not available, the current thread blocks until the lock has been acquired.
     * The distributed lock acquired by a thread has to be released by same thread only.
     **/
    void lock();
    /**
     * This is a non-blocking version of lock() method; it attempts to acquire the lock immediately, return true if locking succeeds.
     * The distributed lock acquired by a thread has to be released by same thread only.
     **/
    boolean tryLock();
    /**
     * Acquires the lock. Blocks until the lock is available or timeout is expired.
     * The distributed lock acquired by a thread has to be released by same thread only.
     **/
    boolean tryLock(long timeout, TimeUnit unit);
    /**
     * Checks if the current thread has already acquired the lock.
     * @return true if the lock is held by the current thread
     */
    boolean isLocked();
    /**
     * Releases the lock. This method has to be called by same thread as which has acquired the lock.
     */
    void release();
}
// Declared in its own file (DistributedLocker.java) in the same package.
public interface DistributedLocker {
    /**
     * This method only fetches the lock object but does not lock it. The lock has to be acquired and released
     * explicitly.
     * @param key Fetch the lock object based on the key provided.
     * @return Implementation of DistributedLock object
     */
    DistributedLock getLock(String key);
}
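A typical way to use such an interface is to wrap the critical section in try/finally so that the lock is released even when the work fails. The sketch below is self-contained: it repeats a trimmed copy of the interface above, and `InMemoryLocker` and `LockTemplate` are hypothetical single-JVM stand-ins introduced only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Trimmed copy of the interface above so that the sketch compiles on its own.
interface DistributedLock {
    boolean tryLock(long timeout, TimeUnit unit);
    boolean isLocked();
    void release();
}

/** Single-JVM stand-in (hypothetical) for a real distributed implementation. */
final class InMemoryLocker {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    DistributedLock getLock(String key) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        return new DistributedLock() {
            @Override
            public boolean tryLock(long timeout, TimeUnit unit) {
                try {
                    return lock.tryLock(timeout, unit);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            @Override
            public boolean isLocked() {
                return lock.isHeldByCurrentThread();
            }

            @Override
            public void release() {
                lock.unlock();
            }
        };
    }
}

final class LockTemplate {
    private LockTemplate() {}

    /** Runs the task under the lock and always releases it; returns null if the lock was not acquired in time. */
    static <T> T withLock(DistributedLock lock, long timeout, TimeUnit unit, Supplier<T> task) {
        if (!lock.tryLock(timeout, unit)) {
            return null;
        }
        try {
            return task.get();
        } finally {
            lock.release();
        }
    }
}
```

Using the timed `tryLock` rather than `lock()` keeps a caller from waiting forever when the current holder has crashed or is partitioned away.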

For our application, here are the options that we have explored for implementing distributed locks:

a) InterProcessSemaphoreMutex from Curator on Zookeeper: Curator, originally open-sourced by Netflix and now an Apache project, is a high-level API built on top of Zookeeper. It provides many recipes and handles the complexity of managing connections and retrying operations against the underlying ZooKeeper ensemble. InterProcessSemaphoreMutex, a recipe from the Curator framework, is a non-re-entrant mutex that works across JVMs and uses Zookeeper to hold the lock. All processes across JVMs that use the same lock path will achieve an inter-process critical section. Its sibling recipe, InterProcessMutex, is the re-entrant variant; Curator describes that mutex as "fair", meaning each user gets the mutex in the order requested (from Zookeeper's point of view).

package common.concurrent.lock.impl;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class ZKBasedDistributedLocker implements DistributedLocker {
    private final CuratorFramework curatorClient;
    private final String basePath;
    public ZKBasedDistributedLocker(){
        curatorClient = CuratorFrameworkFactory.newClient("localhost:2181",
                new ExponentialBackoffRetry(1000, 3));
        // The client must be started before any recipe can use it.
        curatorClient.start();
        basePath = "/config/sample-app/distributed-locks/";
    }
    @Override
    public DistributedLock getLock(String key) {
        String lock = new StringBuilder(basePath).append(key).toString();
        return new ZKLock(new InterProcessSemaphoreMutex(curatorClient, lock));
    }
    private class ZKLock implements DistributedLock {
        private final InterProcessLock lock;
        public ZKLock(InterProcessLock lock){
            this.lock = lock;
        }
        @Override
        public void lock() {
            try {
                lock.acquire();
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }
        @Override
        public boolean tryLock() {
            return tryLock(10, TimeUnit.MILLISECONDS);
        }
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return lock.acquire(timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }
        @Override
        public boolean isLocked() {
            return lock.isAcquiredInThisProcess();
        }
        @Override
        public void release() {
            try {
                lock.release();
            } catch (Exception e) {
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}

As Zookeeper is commonly used in many distributed systems, this option does not need any additional framework for locking. One observation, however, is that performance degraded as the number of locks increased. This is because all the locks are created as znodes internally. As the number of znodes grew, we even started facing problems while listing or deleting the locks folder in Zookeeper. So Zookeeper is a good fit for cases where only a small number of locks are taken. Also, as many services of an application might depend on Zookeeper, any problem with Zookeeper might impact them too. A few such use cases are microservices registering themselves for service discovery, and services using Kafka, which in turn depends on Zookeeper for leader election.
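To see why the znode count grows with lock activity, it helps to look at the basic ZooKeeper lock recipe: every contender creates a sequential node under the lock path, and the contender with the lowest sequence number holds the lock. The following is an in-memory imitation of that idea only; `SequentialNodeLock` is hypothetical, does not talk to ZooKeeper, and is not how Curator's recipes are implemented internally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory imitation of one lock path whose children are sequential nodes. */
final class SequentialNodeLock {
    private final AtomicLong nextSequence = new AtomicLong();
    // sequence number -> owner, kept sorted so that the first entry is the current holder
    private final ConcurrentSkipListMap<Long, String> children = new ConcurrentSkipListMap<>();

    /** Like creating a sequential child node: returns the sequence number assigned to the contender. */
    long enqueue(String owner) {
        long sequence = nextSequence.getAndIncrement();
        children.put(sequence, owner);
        return sequence;
    }

    /** The contender with the lowest sequence number holds the lock. */
    boolean holdsLock(long sequence) {
        Map.Entry<Long, String> first = children.firstEntry();
        return first != null && first.getKey() == sequence;
    }

    /** Like deleting the node (or its session expiring): the next contender in line becomes the holder. */
    void remove(long sequence) {
        children.remove(sequence);
    }

    /** One node exists per waiting or holding contender. */
    int nodeCount() {
        return children.size();
    }
}
```

Each distinct lock key needs its own path and each contender adds a node beneath it, which is consistent with the growth in znodes described above.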

b) Lightweight transactions from Cassandra: It is easy to achieve strong consistency in master-based distributed systems. However, it also means compromising on the availability of the system if the master is down. Cassandra is a master-less system and favors availability over consistency. It falls under the AP category of the CAP theorem and hence is highly available and eventually consistent by default. Eventually consistent implies that a read after a write may not yield the latest value written. We can achieve strong consistency in Cassandra by specifying the consistency level QUORUM for both the writes and the reads. Quorum means that an operation succeeds only after a majority of the replicas have acknowledged it. We can implement a lock in Cassandra as follows:

  1. Create the table: create table lock_requests(resource_id text, lock_status text, created_on timestamp, primary key(resource_id));
  2. A thread that tries to acquire a lock checks whether an entry with the specified key exists in the locks table: select * from lock_requests where resource_id = 'ABC';
  3. If the lock does not exist, the thread inserts an entry into the locks table, and we then say that the lock is acquired: insert into lock_requests(resource_id, lock_status, created_on) values('ABC', 'Locked', toTimestamp(now()));

Please note that there is always a possibility of a race condition among threads between steps 2 and 3 if the application performs them as separate steps. If the database itself checks for the existence of the row before inserting it, the race condition is avoided. This is what is referred to as linearizable consistency (i.e. serial isolation level in ACID terms), and it is exactly what a lightweight transaction provides. Here is how steps 2 and 3 above can be combined:

insert into lock_requests(resource_id,lock_status,created_on) values('ABC', 'Locked', toTimestamp(now())) if not exists;
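The difference between the two-step approach and the conditional insert can be illustrated with an in-memory analogy. This is not Cassandra code: the hypothetical `LockTable` uses a `ConcurrentHashMap` in place of the table, and `putIfAbsent` plays the role of "insert ... if not exists".

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory analogy of the lock_requests table. */
final class LockTable {
    private final ConcurrentMap<String, String> rows = new ConcurrentHashMap<>();

    /** Two separate steps, like the select followed by the insert: another thread can slip in between. */
    boolean acquireCheckThenInsert(String resourceId) {
        if (rows.containsKey(resourceId)) {
            return false;
        }
        rows.put(resourceId, "LOCKED");
        return true;
    }

    /** One atomic step, like the conditional insert: exactly one caller sees the insert applied. */
    boolean acquireAtomically(String resourceId) {
        return rows.putIfAbsent(resourceId, "LOCKED") == null;
    }

    void release(String resourceId) {
        rows.remove(resourceId);
    }
}
```

With `acquireCheckThenInsert`, two threads can both pass the existence check before either inserts, and both then believe they hold the lock. With `acquireAtomically`, the check and the insert cannot be separated.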

If the lock exists, the above write is not applied and hence the lock is not acquired. The next problem is what happens if the service that acquired the lock does not release it: the server might have crashed, or the code might have thrown an exception, and the lock would never get released. For such cases, we can define a time-to-live (TTL) for the row, so that the lock row automatically expires after the prescribed number of seconds. Here is how we can achieve it by defining a default TTL for every row of the table:

create table lock_requests(resource_id text,lock_status text, created_on timestamp, primary key(resource_id)) with gc_grace_seconds=86400 and default_time_to_live=600;

Now the lock will automatically expire in 10 minutes. The default can be overridden for an individual row by specifying a TTL (USING TTL) in the insert statement. A TTL is of little help if we do not have a rough estimate of how long the computation surrounded by the lock can take.
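The effect of a TTL on a lock row can be sketched in memory with an injectable clock. This is an analogy rather than Cassandra code; `ExpiringLockTable` and its parameters are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

/** Hypothetical in-memory lock table whose entries expire after a fixed time-to-live. */
final class ExpiringLockTable {
    private final ConcurrentMap<String, Long> expiryByResource = new ConcurrentHashMap<>();
    private final LongSupplier clockMillis;
    private final long ttlMillis;

    ExpiringLockTable(LongSupplier clockMillis, long ttlMillis) {
        this.clockMillis = clockMillis;
        this.ttlMillis = ttlMillis;
    }

    /** Acquires the lock if it is free or if the previous holder's entry has outlived its TTL. */
    boolean tryAcquire(String resourceId) {
        long now = clockMillis.getAsLong();
        boolean[] acquired = {false};
        // compute() runs atomically for the key, so the expiry check and the write cannot be interleaved.
        expiryByResource.compute(resourceId, (id, expiry) -> {
            if (expiry == null || expiry <= now) {
                acquired[0] = true;
                return now + ttlMillis;
            }
            return expiry;
        });
        return acquired[0];
    }

    void release(String resourceId) {
        expiryByResource.remove(resourceId);
    }
}
```

The sketch also makes the caveat visible: if the holder is still working when its entry expires, a second caller acquires the lock and both run at once, which is why the TTL has to exceed the longest expected computation.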

package common.concurrent.lock.impl;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import org.apache.commons.lang3.time.StopWatch;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
public class CassandraDistributedLocker implements DistributedLocker {
    private final CqlSession session;
    private final PreparedStatement selectStatement, insertStatement, deleteStatement;
    public CassandraDistributedLocker(){
        session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))
                .withKeyspace("sample").build();
        selectStatement = session.prepare(
                "select * from lock_requests where resource_id=?");
        insertStatement = session.prepare(
                "insert into lock_requests(resource_id,lock_status,created_on) values(?,?,?) if not exists");
        deleteStatement = session.prepare(
                "delete from lock_requests where resource_id=? if exists");
    }
    @Override
    public DistributedLock getLock(String key) {
        return new CassandraLock(key);
    }
    private class CassandraLock implements DistributedLock{
        private final String key;
        public CassandraLock(String key) {
            this.key = key;
        }
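        @Override
        public void lock() {
            // Keep retrying until the conditional insert is applied; a single attempt fails if the lock is held.
            while (!insertLock()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while acquiring lock", e);
                }
            }
        }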
        private boolean insertLock() {
            BoundStatement boundStatement = insertStatement.bind()
                    .setString(0, key)
                    .setString(1, "LOCKED")
                    .setInstant(2, Instant.now());
            ResultSet resultSet = session.execute(boundStatement);
            return resultSet.wasApplied();// this is equivalent to row.getBool("applied")
        }
        @Override
        public boolean tryLock() {
            return tryLock(10, TimeUnit.MILLISECONDS);
        }
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                boolean locked = false;
                StopWatch stopWatch = StopWatch.createStarted();
                // Measure elapsed time in the same unit as the timeout.
                while(stopWatch.getTime(unit) < timeout) {
                    if(insertLock()) {
                        locked = true;
                        break;
                    }
                }
                return locked;
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }
        @Override
        public boolean isLocked() {
            BoundStatement boundStatement = selectStatement.bind().setString(0, key);
            ResultSet resultSet = session.execute(boundStatement);
            Row row = resultSet.one();
            return row != null ? "LOCKED".equals(row.getString("lock_status")) : false;
        }
        @Override
        public void release() {
            try {
                BoundStatement boundStatement = deleteStatement.bind().setString(0, key);
                session.execute(boundStatement);
            } catch (Exception e){
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}

Cassandra internally uses a modified version of Paxos to implement lightweight transactions. It takes four round trips to achieve this linearizability. That sounds like a high cost, perhaps too high if you have the rare case of an application that requires every operation to be linearizable. But for most applications, only a very small minority of operations require linearizability, and this is a good tool to add to the strong/eventual consistency that Cassandra already provides. Refer to this link for more information.

Of course, this solution is viable only if the application is already using Cassandra for persistence. We have also seen LWTs timing out under heavy load, so it is better to use these locks with caution. One good thing about these locks is that there is no constraint that the lock has to be released by the one who acquired it. This comes in handy in scenarios where one microservice takes the lock initially and another service releases it asynchronously after the workflow completes.
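Since every acquisition attempt is itself an LWT, a tight retry loop adds to the load that causes the timeouts. One possible mitigation is to pause between attempts. The helper below is a hypothetical sketch, not part of the application's code; `attempt` stands for whatever performs the conditional insert.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper that retries an acquisition attempt at a fixed pace until a deadline. */
final class PacedRetry {
    private PacedRetry() {}

    /** Calls attempt until it returns true or the timeout elapses, sleeping between attempts. */
    static boolean tryUntil(BooleanSupplier attempt, long timeout, TimeUnit unit, long pauseMillis) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                // Sleep for the pause, capped at roughly the time remaining.
                Thread.sleep(Math.min(pauseMillis, TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The attempt is always made at least once, even with a zero timeout. A randomized or growing pause would spread contending clients out further; the fixed pause is used here only to keep the sketch short.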

c) Distributed locks with Hazelcast: Hazelcast IMDG provides distributed versions of fundamental Java collections and synchronizers. The beauty of the Hazelcast APIs is that they are simple to understand, as they implement the Java APIs themselves. For example, com.hazelcast.map.IMap extends java.util.Map (in the 3.x releases the interface lives in com.hazelcast.core, as in the listing below). So the learning curve is gentler here. The distributed map implementation has a method to lock a specific key. If the lock is not available, the current thread is blocked until the lock has been released. We can get a lock on a key even if it is not present in the map. If the key does not exist in the map, any thread apart from the lock owner gets blocked if it tries to put the locked key in the map.

package common.concurrent.lock.impl;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import java.util.concurrent.TimeUnit;
public class HzMapBasedDistributedLocker implements DistributedLocker {
    // Only the keys are used for locking; the value type is a placeholder.
    private final IMap<String, Object> txLockMap;
    public HzMapBasedDistributedLocker(){
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        txLockMap = hazelcastInstance.getMap("txLockMap");
    }
    @Override
    public DistributedLock getLock(String lockKey) {
        return new HzMapBasedLock(lockKey);
    }
    private class HzMapBasedLock implements DistributedLock{
        private final String key;
        public HzMapBasedLock(String key) {
            this.key = key;
        }
        @Override
        public void lock() {
            txLockMap.lock(key);
        }
        @Override
        public boolean tryLock() {
            return txLockMap.tryLock(key);
        }
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return txLockMap.tryLock(key, timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }
        @Override
        public boolean isLocked() {
            return txLockMap.isLocked(key);
        }
        @Override
        public void release() {
            try {
                txLockMap.unlock(key);
            } catch (Exception e){
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}

Please note that the Hazelcast IMDG implementation also falls under the AP category of the CAP theorem. However, strong consistency (even in failure and exceptional cases) is a fundamental requirement for any task that requires distributed coordination. Hence there are cases where the existing locks based on the map implementation will fail. To address these issues, Hazelcast later came up with the CP Subsystem, which has a new distributed lock implementation on top of Raft consensus. The CP Subsystem lives alongside the AP data structures of the Hazelcast IMDG cluster. It maintains linearizability in all cases, including client and server failures and network partitions, and prevents split-brain situations. In fact, Hazelcast claims to be the only solution that offers a linearizable distributed lock implementation.

package common.concurrent.lock.impl;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import java.util.concurrent.TimeUnit;
public class HzLockBasedDistributedLocker implements DistributedLocker {
    private HazelcastInstance hazelcastInstance;
    public HzLockBasedDistributedLocker(int cpMemberCount){
        Config config = new Config();
        // Use the requested CP member count; it must not be smaller than the group size.
        config.getCPSubsystemConfig().setCPMemberCount(cpMemberCount);
        config.getCPSubsystemConfig().setGroupSize(3);
        hazelcastInstance = Hazelcast.newHazelcastInstance(config);
    }
    @Override
    public DistributedLock getLock(String key) {
        return wrapHzLock(key);
    }
    private DistributedLock wrapHzLock(String key){
        return new HzLock(key);
    }
    private class HzLock implements DistributedLock {
        private final FencedLock lock;
        public HzLock(String key) {
            this.lock = hazelcastInstance.getCPSubsystem().getLock(key);
        }
        @Override
        public void lock() {
            lock.lock();
        }
        @Override
        public boolean tryLock() {
            return lock.tryLock();
        }
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return lock.tryLock(timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }
        @Override
        public boolean isLocked() {
            return lock.isLocked();
        }
        @Override
        public void release() {
            try {
                lock.unlock();
                //((DistributedObject) lock).destroy();
            } catch (Exception e){
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}

The above code looks clean and simple. The problem is that locks never expire on their own in Hazelcast unless they are explicitly destroyed. If locks are created often and not destroyed, we may end up with out-of-memory errors over a period of time. The following excerpt from the Hazelcast documentation clarifies this:

Locks are not automatically removed. If a lock is not used anymore, Hazelcast does not automatically perform garbage collection in the lock. This can lead to an OutOfMemoryError. If you create locks on the fly, make sure they are destroyed.

Although the fix looks trivial, i.e. uncommenting the destroy line in the above code, the problem is that a lock, once destroyed, cannot be recreated in the same CP group unless the group is restarted. So if we need to reuse locks after they have been released, we cannot destroy them. In such cases, it is better to use the map-based implementation. Depending on the specific use case, one can go with either of the implementations. Hazelcast may address the issue in the near future. Please refer to the ticket. You can also vote for the ticket to be prioritized if you are looking for the same 🙂

There are other frameworks, like Redis, that offer solutions for distributed locks, which I have not explained here. I have listed them in the resources section. Please go through them. One final point to keep in mind is that it is always better to use these locks with caution. If an alternative solution exists that does not require locks, it is better to go with that.

Additional Resources

  1. Implementing replicated logs with Paxos
  2. Raft: A consensus algorithm for replicated logs
  3. Zab vs Paxos
  4. Lightweight transactions in Cassandra 2.0
Published on Java Code Geeks with permission by Prasanth Gullapalli, partner at our JCG program. See the original article here: Overview of implementing Distributed Locks

Opinions expressed by Java Code Geeks contributors are their own.

Prasanth Gullapalli

Prasanth is passionate about technology and specializes in application development in distributed environments. He has always been fascinated by new technologies and emerging trends in software development.