View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.IOException;
22  import java.lang.management.ManagementFactory;
23  import java.security.SecureRandom;
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Random;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.RetryCounter;
34  import org.apache.hadoop.hbase.util.RetryCounterFactory;
35  import org.apache.zookeeper.AsyncCallback;
36  import org.apache.zookeeper.CreateMode;
37  import org.apache.zookeeper.KeeperException;
38  import org.apache.zookeeper.Op;
39  import org.apache.zookeeper.OpResult;
40  import org.apache.zookeeper.Watcher;
41  import org.apache.zookeeper.ZooDefs;
42  import org.apache.zookeeper.ZooKeeper;
43  import org.apache.zookeeper.ZooKeeper.States;
44  import org.apache.zookeeper.data.ACL;
45  import org.apache.zookeeper.data.Stat;
46  import org.apache.zookeeper.proto.CreateRequest;
47  import org.apache.zookeeper.proto.SetDataRequest;
48  import org.htrace.Trace;
49  import org.htrace.TraceScope;
50  
51  /**
52   * A zookeeper that can handle 'recoverable' errors.
53   * To handle recoverable errors, developers need to realize that there are two
54   * classes of requests: idempotent and non-idempotent requests. Read requests
55   * and unconditional sets and deletes are examples of idempotent requests, they
56   * can be reissued with the same results.
57   * (Although, the delete may throw a NoNodeException on reissue its effect on
58   * the ZooKeeper state is the same.) Non-idempotent requests need special
59   * handling, application and library writers need to keep in mind that they may
60   * need to encode information in the data or name of znodes to detect
61   * retries. A simple example is a create that uses a sequence flag.
62   * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
63   * loss exception, that process will reissue another
64   * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
65   * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
66   * that x-109 was the result of the previous create, so the process actually
67   * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
68   * when doing the create. If the process is using an id of 352, before reissuing
69   * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
70   * "x-352-109", x-333-110". The process will know that the original create
71   * succeeded an the znode it created is "x-352-109".
72   * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
73   */
74  @InterfaceAudience.Private
75  public class RecoverableZooKeeper {
76    private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
77    // the actual ZooKeeper client instance
78    private ZooKeeper zk;
79    private final RetryCounterFactory retryCounterFactory;
80    // An identifier of this process in the cluster
81    private final String identifier;
82    private final byte[] id;
83    private Watcher watcher;
84    private int sessionTimeout;
85    private String quorumServers;
86    private final Random salter;
87  
88    // The metadata attached to each piece of data has the
89    // format:
90    //   <magic> 1-byte constant
91    //   <id length> 4-byte big-endian integer (length of next field)
92    //   <id> identifier corresponding uniquely to this process
93    // It is prepended to the data supplied by the user.
94  
95    // the magic number is to be backward compatible
96    private static final byte MAGIC =(byte) 0XFF;
97    private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
98    private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
99    private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
100 
101   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
102       Watcher watcher, int maxRetries, int retryIntervalMillis)
103   throws IOException {
104     this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
105         null);
106   }
107 
108   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
109       Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
110   throws IOException {
111     // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
112     this.retryCounterFactory =
113       new RetryCounterFactory(maxRetries+1, retryIntervalMillis);
114 
115     if (identifier == null || identifier.length() == 0) {
116       // the identifier = processID@hostName
117       identifier = ManagementFactory.getRuntimeMXBean().getName();
118     }
119     LOG.info("Process identifier=" + identifier +
120       " connecting to ZooKeeper ensemble=" + quorumServers);
121     this.identifier = identifier;
122     this.id = Bytes.toBytes(identifier);
123 
124     this.watcher = watcher;
125     this.sessionTimeout = sessionTimeout;
126     this.quorumServers = quorumServers;
127     try {checkZk();} catch (Exception x) {/* ignore */}
128     salter = new SecureRandom();
129   }
130 
131   /**
132    * Try to create a Zookeeper connection. Turns any exception encountered into a
133    * KeeperException.OperationTimeoutException so it can retried.
134    * @return The created Zookeeper connection object
135    * @throws KeeperException
136    */
137   protected synchronized ZooKeeper checkZk() throws KeeperException {
138     if (this.zk == null) {
139       try {
140         this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
141       } catch (IOException ex) {
142         LOG.warn("Unable to create ZooKeeper Connection", ex);
143         throw new KeeperException.OperationTimeoutException();
144       }
145     }
146     return zk;
147   }
148 
149   public synchronized void reconnectAfterExpiration()
150         throws IOException, KeeperException, InterruptedException {
151     if (zk != null) {
152       LOG.info("Closing dead ZooKeeper connection, session" +
153         " was: 0x"+Long.toHexString(zk.getSessionId()));
154       zk.close();
155       // reset the Zookeeper connection
156       zk = null;
157     }
158     checkZk();
159     LOG.info("Recreated a ZooKeeper, session" +
160       " is: 0x"+Long.toHexString(zk.getSessionId()));
161   }
162 
163   /**
164    * delete is an idempotent operation. Retry before throwing exception.
165    * This function will not throw NoNodeException if the path does not
166    * exist.
167    */
168   public void delete(String path, int version)
169   throws InterruptedException, KeeperException {
170     TraceScope traceScope = null;
171     try {
172       traceScope = Trace.startSpan("RecoverableZookeeper.delete");
173       RetryCounter retryCounter = retryCounterFactory.create();
174       boolean isRetry = false; // False for first attempt, true for all retries.
175       while (true) {
176         try {
177           checkZk().delete(path, version);
178           return;
179         } catch (KeeperException e) {
180           switch (e.code()) {
181             case NONODE:
182               if (isRetry) {
183                 LOG.info("Node " + path + " already deleted. Assuming a " +
184                     "previous attempt succeeded.");
185                 return;
186               }
187               LOG.info("Node " + path + " already deleted, retry=" + isRetry);
188               throw e;
189 
190             case CONNECTIONLOSS:
191             case SESSIONEXPIRED:
192             case OPERATIONTIMEOUT:
193               retryOrThrow(retryCounter, e, "delete");
194               break;
195 
196             default:
197               throw e;
198           }
199         }
200         retryCounter.sleepUntilNextRetry();
201         isRetry = true;
202       }
203     } finally {
204       if (traceScope != null) traceScope.close();
205     }
206   }
207 
208   /**
209    * exists is an idempotent operation. Retry before throwing exception
210    * @return A Stat instance
211    */
212   public Stat exists(String path, Watcher watcher)
213   throws KeeperException, InterruptedException {
214     TraceScope traceScope = null;
215     try {
216       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
217       RetryCounter retryCounter = retryCounterFactory.create();
218       while (true) {
219         try {
220           return checkZk().exists(path, watcher);
221         } catch (KeeperException e) {
222           switch (e.code()) {
223             case CONNECTIONLOSS:
224             case SESSIONEXPIRED:
225             case OPERATIONTIMEOUT:
226               retryOrThrow(retryCounter, e, "exists");
227               break;
228 
229             default:
230               throw e;
231           }
232         }
233         retryCounter.sleepUntilNextRetry();
234       }
235     } finally {
236       if (traceScope != null) traceScope.close();
237     }
238   }
239 
240   /**
241    * exists is an idempotent operation. Retry before throwing exception
242    * @return A Stat instance
243    */
244   public Stat exists(String path, boolean watch)
245   throws KeeperException, InterruptedException {
246     TraceScope traceScope = null;
247     try {
248       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
249       RetryCounter retryCounter = retryCounterFactory.create();
250       while (true) {
251         try {
252           return checkZk().exists(path, watch);
253         } catch (KeeperException e) {
254           switch (e.code()) {
255             case CONNECTIONLOSS:
256             case SESSIONEXPIRED:
257             case OPERATIONTIMEOUT:
258               retryOrThrow(retryCounter, e, "exists");
259               break;
260 
261             default:
262               throw e;
263           }
264         }
265         retryCounter.sleepUntilNextRetry();
266       }
267     } finally {
268       if (traceScope != null) traceScope.close();
269     }
270   }
271 
272   private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
273       String opName) throws KeeperException {
274     LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
275     if (!retryCounter.shouldRetry()) {
276       LOG.error("ZooKeeper " + opName + " failed after "
277         + retryCounter.getMaxAttempts() + " attempts");
278       throw e;
279     }
280   }
281 
282   /**
283    * getChildren is an idempotent operation. Retry before throwing exception
284    * @return List of children znodes
285    */
286   public List<String> getChildren(String path, Watcher watcher)
287     throws KeeperException, InterruptedException {
288     TraceScope traceScope = null;
289     try {
290       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
291       RetryCounter retryCounter = retryCounterFactory.create();
292       while (true) {
293         try {
294           return checkZk().getChildren(path, watcher);
295         } catch (KeeperException e) {
296           switch (e.code()) {
297             case CONNECTIONLOSS:
298             case SESSIONEXPIRED:
299             case OPERATIONTIMEOUT:
300               retryOrThrow(retryCounter, e, "getChildren");
301               break;
302 
303             default:
304               throw e;
305           }
306         }
307         retryCounter.sleepUntilNextRetry();
308       }
309     } finally {
310       if (traceScope != null) traceScope.close();
311     }
312   }
313 
314   /**
315    * getChildren is an idempotent operation. Retry before throwing exception
316    * @return List of children znodes
317    */
318   public List<String> getChildren(String path, boolean watch)
319   throws KeeperException, InterruptedException {
320     TraceScope traceScope = null;
321     try {
322       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
323       RetryCounter retryCounter = retryCounterFactory.create();
324       while (true) {
325         try {
326           return checkZk().getChildren(path, watch);
327         } catch (KeeperException e) {
328           switch (e.code()) {
329             case CONNECTIONLOSS:
330             case SESSIONEXPIRED:
331             case OPERATIONTIMEOUT:
332               retryOrThrow(retryCounter, e, "getChildren");
333               break;
334 
335             default:
336               throw e;
337           }
338         }
339         retryCounter.sleepUntilNextRetry();
340       }
341     } finally {
342       if (traceScope != null) traceScope.close();
343     }
344   }
345 
346   /**
347    * getData is an idempotent operation. Retry before throwing exception
348    * @return Data
349    */
350   public byte[] getData(String path, Watcher watcher, Stat stat)
351   throws KeeperException, InterruptedException {
352     TraceScope traceScope = null;
353     try {
354       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
355       RetryCounter retryCounter = retryCounterFactory.create();
356       while (true) {
357         try {
358           byte[] revData = checkZk().getData(path, watcher, stat);
359           return this.removeMetaData(revData);
360         } catch (KeeperException e) {
361           switch (e.code()) {
362             case CONNECTIONLOSS:
363             case SESSIONEXPIRED:
364             case OPERATIONTIMEOUT:
365               retryOrThrow(retryCounter, e, "getData");
366               break;
367 
368             default:
369               throw e;
370           }
371         }
372         retryCounter.sleepUntilNextRetry();
373       }
374     } finally {
375       if (traceScope != null) traceScope.close();
376     }
377   }
378 
379   /**
380    * getData is an idemnpotent operation. Retry before throwing exception
381    * @return Data
382    */
383   public byte[] getData(String path, boolean watch, Stat stat)
384   throws KeeperException, InterruptedException {
385     TraceScope traceScope = null;
386     try {
387       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
388       RetryCounter retryCounter = retryCounterFactory.create();
389       while (true) {
390         try {
391           byte[] revData = checkZk().getData(path, watch, stat);
392           return this.removeMetaData(revData);
393         } catch (KeeperException e) {
394           switch (e.code()) {
395             case CONNECTIONLOSS:
396             case SESSIONEXPIRED:
397             case OPERATIONTIMEOUT:
398               retryOrThrow(retryCounter, e, "getData");
399               break;
400 
401             default:
402               throw e;
403           }
404         }
405         retryCounter.sleepUntilNextRetry();
406       }
407     } finally {
408       if (traceScope != null) traceScope.close();
409     }
410   }
411 
412   /**
413    * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
414    * Adding an identifier field into the data to check whether
415    * badversion is caused by the result of previous correctly setData
416    * @return Stat instance
417    */
418   public Stat setData(String path, byte[] data, int version)
419   throws KeeperException, InterruptedException {
420     TraceScope traceScope = null;
421     try {
422       traceScope = Trace.startSpan("RecoverableZookeeper.setData");
423       RetryCounter retryCounter = retryCounterFactory.create();
424       byte[] newData = appendMetaData(data);
425       boolean isRetry = false;
426       while (true) {
427         try {
428           return checkZk().setData(path, newData, version);
429         } catch (KeeperException e) {
430           switch (e.code()) {
431             case CONNECTIONLOSS:
432             case SESSIONEXPIRED:
433             case OPERATIONTIMEOUT:
434               retryOrThrow(retryCounter, e, "setData");
435               break;
436             case BADVERSION:
437               if (isRetry) {
438                 // try to verify whether the previous setData success or not
439                 try{
440                   Stat stat = new Stat();
441                   byte[] revData = checkZk().getData(path, false, stat);
442                   if(Bytes.compareTo(revData, newData) == 0) {
443                     // the bad version is caused by previous successful setData
444                     return stat;
445                   }
446                 } catch(KeeperException keeperException){
447                   // the ZK is not reliable at this moment. just throwing exception
448                   throw keeperException;
449                 }
450               }
451             // throw other exceptions and verified bad version exceptions
452             default:
453               throw e;
454           }
455         }
456         retryCounter.sleepUntilNextRetry();
457         isRetry = true;
458       }
459     } finally {
460       if (traceScope != null) traceScope.close();
461     }
462   }
463 
464   /**
465    * <p>
466    * NONSEQUENTIAL create is idempotent operation.
467    * Retry before throwing exceptions.
468    * But this function will not throw the NodeExist exception back to the
469    * application.
470    * </p>
471    * <p>
472    * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
473    * identifier to the path to verify, whether the previous one is successful
474    * or not.
475    * </p>
476    *
477    * @return Path
478    */
479   public String create(String path, byte[] data, List<ACL> acl,
480       CreateMode createMode)
481   throws KeeperException, InterruptedException {
482     TraceScope traceScope = null;
483     try {
484       traceScope = Trace.startSpan("RecoverableZookeeper.create");
485       byte[] newData = appendMetaData(data);
486       switch (createMode) {
487         case EPHEMERAL:
488         case PERSISTENT:
489           return createNonSequential(path, newData, acl, createMode);
490 
491         case EPHEMERAL_SEQUENTIAL:
492         case PERSISTENT_SEQUENTIAL:
493           return createSequential(path, newData, acl, createMode);
494 
495         default:
496           throw new IllegalArgumentException("Unrecognized CreateMode: " +
497               createMode);
498       }
499     } finally {
500       if (traceScope != null) traceScope.close();
501     }
502   }
503 
504   private String createNonSequential(String path, byte[] data, List<ACL> acl,
505       CreateMode createMode) throws KeeperException, InterruptedException {
506     RetryCounter retryCounter = retryCounterFactory.create();
507     boolean isRetry = false; // False for first attempt, true for all retries.
508     while (true) {
509       try {
510         return checkZk().create(path, data, acl, createMode);
511       } catch (KeeperException e) {
512         switch (e.code()) {
513           case NODEEXISTS:
514             if (isRetry) {
515               // If the connection was lost, there is still a possibility that
516               // we have successfully created the node at our previous attempt,
517               // so we read the node and compare.
518               byte[] currentData = checkZk().getData(path, false, null);
519               if (currentData != null &&
520                   Bytes.compareTo(currentData, data) == 0) {
521                 // We successfully created a non-sequential node
522                 return path;
523               }
524               LOG.error("Node " + path + " already exists with " +
525                   Bytes.toStringBinary(currentData) + ", could not write " +
526                   Bytes.toStringBinary(data));
527               throw e;
528             }
529             LOG.debug("Node " + path + " already exists");
530             throw e;
531 
532           case CONNECTIONLOSS:
533           case SESSIONEXPIRED:
534           case OPERATIONTIMEOUT:
535             retryOrThrow(retryCounter, e, "create");
536             break;
537 
538           default:
539             throw e;
540         }
541       }
542       retryCounter.sleepUntilNextRetry();
543       isRetry = true;
544     }
545   }
546 
547   private String createSequential(String path, byte[] data,
548       List<ACL> acl, CreateMode createMode)
549   throws KeeperException, InterruptedException {
550     RetryCounter retryCounter = retryCounterFactory.create();
551     boolean first = true;
552     String newPath = path+this.identifier;
553     while (true) {
554       try {
555         if (!first) {
556           // Check if we succeeded on a previous attempt
557           String previousResult = findPreviousSequentialNode(newPath);
558           if (previousResult != null) {
559             return previousResult;
560           }
561         }
562         first = false;
563         return checkZk().create(newPath, data, acl, createMode);
564       } catch (KeeperException e) {
565         switch (e.code()) {
566           case CONNECTIONLOSS:
567           case SESSIONEXPIRED:
568           case OPERATIONTIMEOUT:
569             retryOrThrow(retryCounter, e, "create");
570             break;
571 
572           default:
573             throw e;
574         }
575       }
576       retryCounter.sleepUntilNextRetry();
577     }
578   }
579   /**
580    * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
581    * instances to actually pass to multi (need to do this in order to appendMetaData).
582    */
583   private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
584   throws UnsupportedOperationException {
585     if(ops == null) return null;
586 
587     List<Op> preparedOps = new LinkedList<Op>();
588     for (Op op : ops) {
589       if (op.getType() == ZooDefs.OpCode.create) {
590         CreateRequest create = (CreateRequest)op.toRequestRecord();
591         preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
592           create.getAcl(), create.getFlags()));
593       } else if (op.getType() == ZooDefs.OpCode.delete) {
594         // no need to appendMetaData for delete
595         preparedOps.add(op);
596       } else if (op.getType() == ZooDefs.OpCode.setData) {
597         SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
598         preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
599           setData.getVersion()));
600       } else {
601         throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
602       }
603     }
604     return preparedOps;
605   }
606 
607   /**
608    * Run multiple operations in a transactional manner. Retry before throwing exception
609    */
610   public List<OpResult> multi(Iterable<Op> ops)
611   throws KeeperException, InterruptedException {
612     TraceScope traceScope = null;
613     try {
614       traceScope = Trace.startSpan("RecoverableZookeeper.multi");
615       RetryCounter retryCounter = retryCounterFactory.create();
616       Iterable<Op> multiOps = prepareZKMulti(ops);
617       while (true) {
618         try {
619           return checkZk().multi(multiOps);
620         } catch (KeeperException e) {
621           switch (e.code()) {
622             case CONNECTIONLOSS:
623             case SESSIONEXPIRED:
624             case OPERATIONTIMEOUT:
625               retryOrThrow(retryCounter, e, "multi");
626               break;
627 
628             default:
629               throw e;
630           }
631         }
632         retryCounter.sleepUntilNextRetry();
633     }
634     } finally {
635       if (traceScope != null) traceScope.close();
636     }
637   }
638 
639   private String findPreviousSequentialNode(String path)
640     throws KeeperException, InterruptedException {
641     int lastSlashIdx = path.lastIndexOf('/');
642     assert(lastSlashIdx != -1);
643     String parent = path.substring(0, lastSlashIdx);
644     String nodePrefix = path.substring(lastSlashIdx+1);
645 
646     List<String> nodes = checkZk().getChildren(parent, false);
647     List<String> matching = filterByPrefix(nodes, nodePrefix);
648     for (String node : matching) {
649       String nodePath = parent + "/" + node;
650       Stat stat = checkZk().exists(nodePath, false);
651       if (stat != null) {
652         return nodePath;
653       }
654     }
655     return null;
656   }
657 
658   public byte[] removeMetaData(byte[] data) {
659     if(data == null || data.length == 0) {
660       return data;
661     }
662     // check the magic data; to be backward compatible
663     byte magic = data[0];
664     if(magic != MAGIC) {
665       return data;
666     }
667 
668     int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
669     int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
670     int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
671 
672     byte[] newData = new byte[dataLength];
673     System.arraycopy(data, dataOffset, newData, 0, dataLength);
674     return newData;
675   }
676 
677   private byte[] appendMetaData(byte[] data) {
678     if(data == null || data.length == 0){
679       return data;
680     }
681     byte[] salt = Bytes.toBytes(salter.nextLong());
682     int idLength = id.length + salt.length;
683     byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
684     int pos = 0;
685     pos = Bytes.putByte(newData, pos, MAGIC);
686     pos = Bytes.putInt(newData, pos, idLength);
687     pos = Bytes.putBytes(newData, pos, id, 0, id.length);
688     pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
689     pos = Bytes.putBytes(newData, pos, data, 0, data.length);
690     return newData;
691   }
692 
693   public long getSessionId() {
694     return zk == null ? null : zk.getSessionId();
695   }
696 
697   public void close() throws InterruptedException {
698     if (zk != null) zk.close();
699   }
700 
701   public States getState() {
702     return zk == null ? null : zk.getState();
703   }
704 
705   public ZooKeeper getZooKeeper() {
706     return zk;
707   }
708 
709   public byte[] getSessionPasswd() {
710     return zk == null ? null : zk.getSessionPasswd();
711   }
712 
713   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
714     checkZk().sync(path, null, null);
715   }
716 
717   /**
718    * Filters the given node list by the given prefixes.
719    * This method is all-inclusive--if any element in the node list starts
720    * with any of the given prefixes, then it is included in the result.
721    *
722    * @param nodes the nodes to filter
723    * @param prefixes the prefixes to include in the result
724    * @return list of every element that starts with one of the prefixes
725    */
726   private static List<String> filterByPrefix(List<String> nodes,
727       String... prefixes) {
728     List<String> lockChildren = new ArrayList<String>();
729     for (String child : nodes){
730       for (String prefix : prefixes){
731         if (child.startsWith(prefix)){
732           lockChildren.add(child);
733           break;
734         }
735       }
736     }
737     return lockChildren;
738   }
739 
740   public String getIdentifier() {
741     return identifier;
742   }
743 }