View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.zookeeper.lock;
21  
22  import java.io.IOException;
23  import java.util.Comparator;
24  import java.util.List;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.InterProcessLock;
33  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34  import org.apache.hadoop.hbase.zookeeper.DeletionListener;
35  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
36  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
37  import org.apache.zookeeper.CreateMode;
38  import org.apache.zookeeper.KeeperException;
39  import org.apache.zookeeper.KeeperException.BadVersionException;
40  import org.apache.zookeeper.data.Stat;
41  
42  import com.google.common.base.Preconditions;
43  
44  /**
45   * ZooKeeper based HLock implementation. Based on the Shared Locks recipe.
46   * (see:
47   * <a href="http://zookeeper.apache.org/doc/trunk/recipes.html">
48   * ZooKeeper Recipes and Solutions
49   * </a>)
50   */
51  @InterfaceAudience.Private
52  public abstract class ZKInterProcessLockBase implements InterProcessLock {
53  
54    private static final Log LOG = LogFactory.getLog(ZKInterProcessLockBase.class);
55  
56    /** ZNode prefix used by processes acquiring reader locks */
57    protected static final String READ_LOCK_CHILD_NODE_PREFIX = "read-";
58  
59    /** ZNode prefix used by processes acquiring writer locks */
60    protected static final String WRITE_LOCK_CHILD_NODE_PREFIX = "write-";
61  
62    protected final ZooKeeperWatcher zkWatcher;
63    protected final String parentLockNode;
64    protected final String fullyQualifiedZNode;
65    protected final String childZNode;
66    protected final byte[] metadata;
67    protected final MetadataHandler handler;
68  
69    // If we acquire a lock, update this field
70    protected final AtomicReference<AcquiredLock> acquiredLock =
71        new AtomicReference<AcquiredLock>(null);
72  
73    /**
74     * Represents information about a lock held by this thread.
75     */
76    protected static class AcquiredLock {
77      private final String path;
78      private final int version;
79  
80      /**
81       * Store information about a lock.
82       * @param path The path to a lock's ZNode
83       * @param version The current version of the lock's ZNode
84       */
85      public AcquiredLock(String path, int version) {
86        this.path = path;
87        this.version = version;
88      }
89  
90      public String getPath() {
91        return path;
92      }
93  
94      public int getVersion() {
95        return version;
96      }
97  
98      @Override
99      public String toString() {
100       return "AcquiredLockInfo{" +
101           "path='" + path + '\'' +
102           ", version=" + version +
103           '}';
104     }
105   }
106 
107   protected static class ZNodeComparator implements Comparator<String> {
108 
109     public static final ZNodeComparator COMPARATOR = new ZNodeComparator();
110 
111     private ZNodeComparator() {
112     }
113 
114     /** Parses sequenceId from the znode name. Zookeeper documentation
115      * states: The sequence number is always fixed length of 10 digits, 0 padded
116      */
117     public static long getChildSequenceId(String childZNode) {
118       Preconditions.checkNotNull(childZNode);
119       assert childZNode.length() >= 10;
120       String sequenceIdStr = childZNode.substring(childZNode.length() - 10);
121       return Long.parseLong(sequenceIdStr);
122     }
123 
124     @Override
125     public int compare(String zNode1, String zNode2) {
126       long seq1 = getChildSequenceId(zNode1);
127       long seq2 = getChildSequenceId(zNode2);
128       if (seq1 == seq2) {
129         return 0;
130       } else {
131         return seq1 < seq2 ? -1 : 1;
132       }
133     }
134   }
135 
136   /**
137    * Called by implementing classes.
138    * @param zkWatcher
139    * @param parentLockNode The lock ZNode path
140    * @param metadata
141    * @param handler
142    * @param childNode The prefix for child nodes created under the parent
143    */
144   protected ZKInterProcessLockBase(ZooKeeperWatcher zkWatcher,
145       String parentLockNode, byte[] metadata, MetadataHandler handler, String childNode) {
146     this.zkWatcher = zkWatcher;
147     this.parentLockNode = parentLockNode;
148     this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode);
149     this.metadata = metadata;
150     this.handler = handler;
151     this.childZNode = childNode;
152   }
153 
154   /**
155    * {@inheritDoc}
156    */
157   @Override
158   public void acquire() throws IOException, InterruptedException {
159     tryAcquire(-1);
160   }
161 
162   /**
163    * {@inheritDoc}
164    */
165   @Override
166   public boolean tryAcquire(long timeoutMs)
167   throws IOException, InterruptedException {
168     boolean hasTimeout = timeoutMs != -1;
169     long waitUntilMs =
170         hasTimeout ?EnvironmentEdgeManager.currentTime() + timeoutMs : -1;
171     String createdZNode;
172     try {
173       createdZNode = createLockZNode();
174     } catch (KeeperException ex) {
175       throw new IOException("Failed to create znode: " + fullyQualifiedZNode, ex);
176     }
177     while (true) {
178       List<String> children;
179       try {
180         children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
181       } catch (KeeperException e) {
182         LOG.error("Unexpected ZooKeeper error when listing children", e);
183         throw new IOException("Unexpected ZooKeeper exception", e);
184       }
185       String pathToWatch;
186       if ((pathToWatch = getLockPath(createdZNode, children)) == null) {
187         break;
188       }
189       CountDownLatch deletedLatch = new CountDownLatch(1);
190       String zkPathToWatch =
191           ZKUtil.joinZNode(parentLockNode, pathToWatch);
192       DeletionListener deletionListener =
193           new DeletionListener(zkWatcher, zkPathToWatch, deletedLatch);
194       zkWatcher.registerListener(deletionListener);
195       try {
196         if (ZKUtil.setWatchIfNodeExists(zkWatcher, zkPathToWatch)) {
197           // Wait for the watcher to fire
198           if (hasTimeout) {
199             long remainingMs = waitUntilMs - EnvironmentEdgeManager.currentTime();
200             if (remainingMs < 0 ||
201                 !deletedLatch.await(remainingMs, TimeUnit.MILLISECONDS)) {
202               LOG.warn("Unable to acquire the lock in " + timeoutMs +
203                   " milliseconds.");
204               try {
205                 ZKUtil.deleteNode(zkWatcher, createdZNode);
206               } catch (KeeperException e) {
207                 LOG.warn("Unable to remove ZNode " + createdZNode);
208               }
209               return false;
210             }
211           } else {
212             deletedLatch.await();
213           }
214           if (deletionListener.hasException()) {
215             Throwable t = deletionListener.getException();
216             throw new IOException("Exception in the watcher", t);
217           }
218         }
219       } catch (KeeperException e) {
220         throw new IOException("Unexpected ZooKeeper exception", e);
221       } finally {
222         zkWatcher.unregisterListener(deletionListener);
223       }
224     }
225     updateAcquiredLock(createdZNode);
226     LOG.debug("Acquired a lock for " + createdZNode);
227     return true;
228   }
229 
230   private String createLockZNode() throws KeeperException {
231     try {
232       return ZKUtil.createNodeIfNotExistsNoWatch(zkWatcher, fullyQualifiedZNode,
233           metadata, CreateMode.EPHEMERAL_SEQUENTIAL);
234     } catch (KeeperException.NoNodeException nne) {
235       //create parents, retry
236       ZKUtil.createWithParents(zkWatcher, parentLockNode);
237       return createLockZNode();
238     }
239   }
240 
241   /**
242    * Check if a child znode represents a read lock.
243    * @param child The child znode we want to check.
244    * @return whether the child znode represents a read lock
245    */
246   protected static boolean isChildReadLock(String child) {
247     int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
248     String suffix = child.substring(idx + 1);
249     return suffix.startsWith(READ_LOCK_CHILD_NODE_PREFIX);
250   }
251 
252   /**
253    * Check if a child znode represents a write lock.
254    * @param child The child znode we want to check.
255    * @return whether the child znode represents a write lock
256    */
257   protected static boolean isChildWriteLock(String child) {
258     int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
259     String suffix = child.substring(idx + 1);
260     return suffix.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX);
261   }
262 
263   /**
264    * Check if a child znode represents the same type(read or write) of lock
265    * @param child The child znode we want to check.
266    * @return whether the child znode represents the same type(read or write) of lock
267    */
268   protected boolean isChildOfSameType(String child) {
269     int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
270     String suffix = child.substring(idx + 1);
271     return suffix.startsWith(this.childZNode);
272   }
273 
274   /**
275    * Update state as to indicate that a lock is held
276    * @param createdZNode The lock znode
277    * @throws IOException If an unrecoverable ZooKeeper error occurs
278    */
279   protected void updateAcquiredLock(String createdZNode) throws IOException {
280     Stat stat = new Stat();
281     byte[] data = null;
282     Exception ex = null;
283     try {
284       data = ZKUtil.getDataNoWatch(zkWatcher, createdZNode, stat);
285     } catch (KeeperException e) {
286       LOG.warn("Cannot getData for znode:" + createdZNode, e);
287       ex = e;
288     }
289     if (data == null) {
290       LOG.error("Can't acquire a lock on a non-existent node " + createdZNode);
291       throw new IllegalStateException("ZNode " + createdZNode +
292           "no longer exists!", ex);
293     }
294     AcquiredLock newLock = new AcquiredLock(createdZNode, stat.getVersion());
295     if (!acquiredLock.compareAndSet(null, newLock)) {
296       LOG.error("The lock " + fullyQualifiedZNode +
297           " has already been acquired by another process!");
298       throw new IllegalStateException(fullyQualifiedZNode +
299           " is held by another process");
300     }
301   }
302 
303   /**
304    * {@inheritDoc}
305    */
306   @Override
307   public void release() throws IOException, InterruptedException {
308     AcquiredLock lock = acquiredLock.get();
309     if (lock == null) {
310       LOG.error("Cannot release lock" +
311           ", process does not have a lock for " + fullyQualifiedZNode);
312       throw new IllegalStateException("No lock held for " + fullyQualifiedZNode);
313     }
314     try {
315       if (ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
316         boolean ret = ZKUtil.deleteNode(zkWatcher, lock.getPath(), lock.getVersion());
317         if (!ret && ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
318           throw new IllegalStateException("Couldn't delete " + lock.getPath());
319         }
320         if (!acquiredLock.compareAndSet(lock, null)) {
321           LOG.debug("Current process no longer holds " + lock + " for " +
322               fullyQualifiedZNode);
323           throw new IllegalStateException("Not holding a lock for " +
324               fullyQualifiedZNode +"!");
325         }
326       }
327       if (LOG.isDebugEnabled()) {
328         LOG.debug("Released " + lock.getPath());
329       }
330     } catch (BadVersionException e) {
331       throw new IllegalStateException(e);
332     } catch (KeeperException e) {
333       throw new IOException(e);
334     }
335   }
336 
337   /**
338    * Process metadata stored in a ZNode using a callback
339    * <p>
340    * @param lockZNode The node holding the metadata
341    * @return True if metadata was ready and processed, false otherwise.
342    */
343   protected boolean handleLockMetadata(String lockZNode) {
344     return handleLockMetadata(lockZNode, handler);
345   }
346 
347   /**
348    * Process metadata stored in a ZNode using a callback object passed to
349    * this instance.
350    * <p>
351    * @param lockZNode The node holding the metadata
352    * @param handler the metadata handler
353    * @return True if metadata was ready and processed, false on exception.
354    */
355   protected boolean handleLockMetadata(String lockZNode, MetadataHandler handler) {
356     if (handler == null) {
357       return false;
358     }
359     try {
360       byte[] metadata = ZKUtil.getData(zkWatcher, lockZNode);
361       handler.handleMetadata(metadata);
362     } catch (KeeperException ex) {
363       LOG.warn("Error processing lock metadata in " + lockZNode);
364       return false;
365     } catch (InterruptedException e) {
366       LOG.warn("InterruptedException processing lock metadata in " + lockZNode);
367       Thread.currentThread().interrupt();
368       return false;
369     }
370     return true;
371   }
372 
373   @Override
374   public void reapAllLocks() throws IOException {
375     reapExpiredLocks(0);
376   }
377 
378   /**
379    * Will delete all lock znodes of this type (either read or write) which are "expired"
380    * according to timeout. Assumption is that the clock skew between zookeeper and this servers
381    * is negligible.
382    * Referred in zk recipe as "Revocable Shared Locks with Freaking Laser Beams".
383    * (http://zookeeper.apache.org/doc/trunk/recipes.html).
384    */
385   public void reapExpiredLocks(long timeout) throws IOException {
386     List<String> children;
387     try {
388       children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
389     } catch (KeeperException e) {
390       LOG.error("Unexpected ZooKeeper error when listing children", e);
391       throw new IOException("Unexpected ZooKeeper exception", e);
392     }
393     if (children == null) return;
394 
395     KeeperException deferred = null;
396     Stat stat = new Stat();
397     long expireDate = System.currentTimeMillis() - timeout; //we are using cTime in zookeeper
398     for (String child : children) {
399       if (isChildOfSameType(child)) {
400         String znode = ZKUtil.joinZNode(parentLockNode, child);
401         try {
402           ZKUtil.getDataNoWatch(zkWatcher, znode, stat);
403           if (stat.getCtime() < expireDate) {
404             LOG.info("Reaping lock for znode:" + znode);
405             ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
406           }
407         } catch (KeeperException ex) {
408           LOG.warn("Error reaping the znode for write lock :" + znode);
409           deferred = ex;
410         }
411       }
412     }
413     if (deferred != null) {
414       throw new IOException("ZK exception while reaping locks:", deferred);
415     }
416   }
417 
418   /**
419    * Visits the locks (both held and attempted) with the given MetadataHandler.
420    * @throws IOException If there is an unrecoverable error
421    */
422   public void visitLocks(MetadataHandler handler) throws IOException {
423     List<String> children;
424     try {
425       children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
426     } catch (KeeperException e) {
427       LOG.error("Unexpected ZooKeeper error when listing children", e);
428       throw new IOException("Unexpected ZooKeeper exception", e);
429     }
430     if (children != null && children.size() > 0) {
431       for (String child : children) {
432         if (isChildOfSameType(child)) {
433           String znode = ZKUtil.joinZNode(parentLockNode, child);
434           String childWatchesZNode = getLockPath(child, children);
435           if (childWatchesZNode == null) {
436             LOG.info("Lock is held by: " + child);
437           }
438           handleLockMetadata(znode, handler);
439         }
440       }
441     }
442   }
443 
444   /**
445    * Determine based on a list of children under a ZNode, whether or not a
446    * process which created a specified ZNode has obtained a lock. If a lock is
447    * not obtained, return the path that we should watch awaiting its deletion.
448    * Otherwise, return null.
449    * This method is abstract as the logic for determining whether or not a
450    * lock is obtained depends on the type of lock being implemented.
451    * @param myZNode The ZNode created by the process attempting to acquire
452    *                a lock
453    * @param children List of all child ZNodes under the lock's parent ZNode
454    * @return The path to watch, or null if myZNode can represent a correctly
455    *         acquired lock.
456    */
457   protected abstract String getLockPath(String myZNode, List<String> children)
458   throws IOException;
459 }