1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.zookeeper.lock;
21
22 import java.io.IOException;
23 import java.util.Comparator;
24 import java.util.List;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.InterProcessLock;
33 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34 import org.apache.hadoop.hbase.zookeeper.DeletionListener;
35 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
36 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
37 import org.apache.zookeeper.CreateMode;
38 import org.apache.zookeeper.KeeperException;
39 import org.apache.zookeeper.KeeperException.BadVersionException;
40 import org.apache.zookeeper.data.Stat;
41
42 import com.google.common.base.Preconditions;
43
44
45
46
47
48
49
50
51 @InterfaceAudience.Private
52 public abstract class ZKInterProcessLockBase implements InterProcessLock {
53
54 private static final Log LOG = LogFactory.getLog(ZKInterProcessLockBase.class);
55
56
57 protected static final String READ_LOCK_CHILD_NODE_PREFIX = "read-";
58
59
60 protected static final String WRITE_LOCK_CHILD_NODE_PREFIX = "write-";
61
62 protected final ZooKeeperWatcher zkWatcher;
63 protected final String parentLockNode;
64 protected final String fullyQualifiedZNode;
65 protected final String childZNode;
66 protected final byte[] metadata;
67 protected final MetadataHandler handler;
68
69
70 protected final AtomicReference<AcquiredLock> acquiredLock =
71 new AtomicReference<AcquiredLock>(null);
72
73
74
75
76 protected static class AcquiredLock {
77 private final String path;
78 private final int version;
79
80
81
82
83
84
85 public AcquiredLock(String path, int version) {
86 this.path = path;
87 this.version = version;
88 }
89
90 public String getPath() {
91 return path;
92 }
93
94 public int getVersion() {
95 return version;
96 }
97
98 @Override
99 public String toString() {
100 return "AcquiredLockInfo{" +
101 "path='" + path + '\'' +
102 ", version=" + version +
103 '}';
104 }
105 }
106
107 protected static class ZNodeComparator implements Comparator<String> {
108
109 public static final ZNodeComparator COMPARATOR = new ZNodeComparator();
110
111 private ZNodeComparator() {
112 }
113
114
115
116
117 public static long getChildSequenceId(String childZNode) {
118 Preconditions.checkNotNull(childZNode);
119 assert childZNode.length() >= 10;
120 String sequenceIdStr = childZNode.substring(childZNode.length() - 10);
121 return Long.parseLong(sequenceIdStr);
122 }
123
124 @Override
125 public int compare(String zNode1, String zNode2) {
126 long seq1 = getChildSequenceId(zNode1);
127 long seq2 = getChildSequenceId(zNode2);
128 if (seq1 == seq2) {
129 return 0;
130 } else {
131 return seq1 < seq2 ? -1 : 1;
132 }
133 }
134 }
135
136
137
138
139
140
141
142
143
/**
 * Called by subclasses to bind this lock to its location in ZooKeeper.
 *
 * @param zkWatcher watcher used for all ZooKeeper operations
 * @param parentLockNode znode under which lock znodes are created
 * @param metadata data to store in the lock znode when it is created
 * @param handler default handler for lock metadata; may be null
 * @param childNode child node name, joined with {@code parentLockNode} to form
 *        the path used when creating the lock znode
 */
protected ZKInterProcessLockBase(ZooKeeperWatcher zkWatcher,
    String parentLockNode, byte[] metadata, MetadataHandler handler, String childNode) {
  this.zkWatcher = zkWatcher;
  this.handler = handler;
  this.metadata = metadata;
  this.childZNode = childNode;
  this.parentLockNode = parentLockNode;
  this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode);
}
153
154
155
156
157 @Override
158 public void acquire() throws IOException, InterruptedException {
159 tryAcquire(-1);
160 }
161
162
163
164
165 @Override
166 public boolean tryAcquire(long timeoutMs)
167 throws IOException, InterruptedException {
168 boolean hasTimeout = timeoutMs != -1;
169 long waitUntilMs =
170 hasTimeout ?EnvironmentEdgeManager.currentTime() + timeoutMs : -1;
171 String createdZNode;
172 try {
173 createdZNode = createLockZNode();
174 } catch (KeeperException ex) {
175 throw new IOException("Failed to create znode: " + fullyQualifiedZNode, ex);
176 }
177 while (true) {
178 List<String> children;
179 try {
180 children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
181 } catch (KeeperException e) {
182 LOG.error("Unexpected ZooKeeper error when listing children", e);
183 throw new IOException("Unexpected ZooKeeper exception", e);
184 }
185 String pathToWatch;
186 if ((pathToWatch = getLockPath(createdZNode, children)) == null) {
187 break;
188 }
189 CountDownLatch deletedLatch = new CountDownLatch(1);
190 String zkPathToWatch =
191 ZKUtil.joinZNode(parentLockNode, pathToWatch);
192 DeletionListener deletionListener =
193 new DeletionListener(zkWatcher, zkPathToWatch, deletedLatch);
194 zkWatcher.registerListener(deletionListener);
195 try {
196 if (ZKUtil.setWatchIfNodeExists(zkWatcher, zkPathToWatch)) {
197
198 if (hasTimeout) {
199 long remainingMs = waitUntilMs - EnvironmentEdgeManager.currentTime();
200 if (remainingMs < 0 ||
201 !deletedLatch.await(remainingMs, TimeUnit.MILLISECONDS)) {
202 LOG.warn("Unable to acquire the lock in " + timeoutMs +
203 " milliseconds.");
204 try {
205 ZKUtil.deleteNode(zkWatcher, createdZNode);
206 } catch (KeeperException e) {
207 LOG.warn("Unable to remove ZNode " + createdZNode);
208 }
209 return false;
210 }
211 } else {
212 deletedLatch.await();
213 }
214 if (deletionListener.hasException()) {
215 Throwable t = deletionListener.getException();
216 throw new IOException("Exception in the watcher", t);
217 }
218 }
219 } catch (KeeperException e) {
220 throw new IOException("Unexpected ZooKeeper exception", e);
221 } finally {
222 zkWatcher.unregisterListener(deletionListener);
223 }
224 }
225 updateAcquiredLock(createdZNode);
226 LOG.debug("Acquired a lock for " + createdZNode);
227 return true;
228 }
229
230 private String createLockZNode() throws KeeperException {
231 try {
232 return ZKUtil.createNodeIfNotExistsNoWatch(zkWatcher, fullyQualifiedZNode,
233 metadata, CreateMode.EPHEMERAL_SEQUENTIAL);
234 } catch (KeeperException.NoNodeException nne) {
235
236 ZKUtil.createWithParents(zkWatcher, parentLockNode);
237 return createLockZNode();
238 }
239 }
240
241
242
243
244
245
246 protected static boolean isChildReadLock(String child) {
247 int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
248 String suffix = child.substring(idx + 1);
249 return suffix.startsWith(READ_LOCK_CHILD_NODE_PREFIX);
250 }
251
252
253
254
255
256
257 protected static boolean isChildWriteLock(String child) {
258 int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
259 String suffix = child.substring(idx + 1);
260 return suffix.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX);
261 }
262
263
264
265
266
267
268 protected boolean isChildOfSameType(String child) {
269 int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
270 String suffix = child.substring(idx + 1);
271 return suffix.startsWith(this.childZNode);
272 }
273
274
275
276
277
278
279 protected void updateAcquiredLock(String createdZNode) throws IOException {
280 Stat stat = new Stat();
281 byte[] data = null;
282 Exception ex = null;
283 try {
284 data = ZKUtil.getDataNoWatch(zkWatcher, createdZNode, stat);
285 } catch (KeeperException e) {
286 LOG.warn("Cannot getData for znode:" + createdZNode, e);
287 ex = e;
288 }
289 if (data == null) {
290 LOG.error("Can't acquire a lock on a non-existent node " + createdZNode);
291 throw new IllegalStateException("ZNode " + createdZNode +
292 "no longer exists!", ex);
293 }
294 AcquiredLock newLock = new AcquiredLock(createdZNode, stat.getVersion());
295 if (!acquiredLock.compareAndSet(null, newLock)) {
296 LOG.error("The lock " + fullyQualifiedZNode +
297 " has already been acquired by another process!");
298 throw new IllegalStateException(fullyQualifiedZNode +
299 " is held by another process");
300 }
301 }
302
303
304
305
306 @Override
307 public void release() throws IOException, InterruptedException {
308 AcquiredLock lock = acquiredLock.get();
309 if (lock == null) {
310 LOG.error("Cannot release lock" +
311 ", process does not have a lock for " + fullyQualifiedZNode);
312 throw new IllegalStateException("No lock held for " + fullyQualifiedZNode);
313 }
314 try {
315 if (ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
316 boolean ret = ZKUtil.deleteNode(zkWatcher, lock.getPath(), lock.getVersion());
317 if (!ret && ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
318 throw new IllegalStateException("Couldn't delete " + lock.getPath());
319 }
320 if (!acquiredLock.compareAndSet(lock, null)) {
321 LOG.debug("Current process no longer holds " + lock + " for " +
322 fullyQualifiedZNode);
323 throw new IllegalStateException("Not holding a lock for " +
324 fullyQualifiedZNode +"!");
325 }
326 }
327 if (LOG.isDebugEnabled()) {
328 LOG.debug("Released " + lock.getPath());
329 }
330 } catch (BadVersionException e) {
331 throw new IllegalStateException(e);
332 } catch (KeeperException e) {
333 throw new IOException(e);
334 }
335 }
336
337
338
339
340
341
342
343 protected boolean handleLockMetadata(String lockZNode) {
344 return handleLockMetadata(lockZNode, handler);
345 }
346
347
348
349
350
351
352
353
354
355 protected boolean handleLockMetadata(String lockZNode, MetadataHandler handler) {
356 if (handler == null) {
357 return false;
358 }
359 try {
360 byte[] metadata = ZKUtil.getData(zkWatcher, lockZNode);
361 handler.handleMetadata(metadata);
362 } catch (KeeperException ex) {
363 LOG.warn("Error processing lock metadata in " + lockZNode);
364 return false;
365 } catch (InterruptedException e) {
366 LOG.warn("InterruptedException processing lock metadata in " + lockZNode);
367 Thread.currentThread().interrupt();
368 return false;
369 }
370 return true;
371 }
372
373 @Override
374 public void reapAllLocks() throws IOException {
375 reapExpiredLocks(0);
376 }
377
378
379
380
381
382
383
384
385 public void reapExpiredLocks(long timeout) throws IOException {
386 List<String> children;
387 try {
388 children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
389 } catch (KeeperException e) {
390 LOG.error("Unexpected ZooKeeper error when listing children", e);
391 throw new IOException("Unexpected ZooKeeper exception", e);
392 }
393 if (children == null) return;
394
395 KeeperException deferred = null;
396 Stat stat = new Stat();
397 long expireDate = System.currentTimeMillis() - timeout;
398 for (String child : children) {
399 if (isChildOfSameType(child)) {
400 String znode = ZKUtil.joinZNode(parentLockNode, child);
401 try {
402 ZKUtil.getDataNoWatch(zkWatcher, znode, stat);
403 if (stat.getCtime() < expireDate) {
404 LOG.info("Reaping lock for znode:" + znode);
405 ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
406 }
407 } catch (KeeperException ex) {
408 LOG.warn("Error reaping the znode for write lock :" + znode);
409 deferred = ex;
410 }
411 }
412 }
413 if (deferred != null) {
414 throw new IOException("ZK exception while reaping locks:", deferred);
415 }
416 }
417
418
419
420
421
422 public void visitLocks(MetadataHandler handler) throws IOException {
423 List<String> children;
424 try {
425 children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
426 } catch (KeeperException e) {
427 LOG.error("Unexpected ZooKeeper error when listing children", e);
428 throw new IOException("Unexpected ZooKeeper exception", e);
429 }
430 if (children != null && children.size() > 0) {
431 for (String child : children) {
432 if (isChildOfSameType(child)) {
433 String znode = ZKUtil.joinZNode(parentLockNode, child);
434 String childWatchesZNode = getLockPath(child, children);
435 if (childWatchesZNode == null) {
436 LOG.info("Lock is held by: " + child);
437 }
438 handleLockMetadata(znode, handler);
439 }
440 }
441 }
442 }
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457 protected abstract String getLockPath(String myZNode, List<String> children)
458 throws IOException;
459 }