View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.IOException;
22  import java.lang.management.ManagementFactory;
23  import java.security.SecureRandom;
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Random;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.RetryCounter;
34  import org.apache.hadoop.hbase.util.RetryCounterFactory;
35  import org.apache.zookeeper.AsyncCallback;
36  import org.apache.zookeeper.CreateMode;
37  import org.apache.zookeeper.KeeperException;
38  import org.apache.zookeeper.Op;
39  import org.apache.zookeeper.OpResult;
40  import org.apache.zookeeper.Watcher;
41  import org.apache.zookeeper.ZooDefs;
42  import org.apache.zookeeper.ZooKeeper;
43  import org.apache.zookeeper.ZooKeeper.States;
44  import org.apache.zookeeper.data.ACL;
45  import org.apache.zookeeper.data.Stat;
46  import org.apache.zookeeper.proto.CreateRequest;
47  import org.apache.zookeeper.proto.SetDataRequest;
48  import org.htrace.Trace;
49  import org.htrace.TraceScope;
50  
51  /**
52   * A zookeeper that can handle 'recoverable' errors.
53   * To handle recoverable errors, developers need to realize that there are two
54   * classes of requests: idempotent and non-idempotent requests. Read requests
55   * and unconditional sets and deletes are examples of idempotent requests, they
56   * can be reissued with the same results.
57   * (Although, the delete may throw a NoNodeException on reissue its effect on
58   * the ZooKeeper state is the same.) Non-idempotent requests need special
59   * handling, application and library writers need to keep in mind that they may
60   * need to encode information in the data or name of znodes to detect
61   * retries. A simple example is a create that uses a sequence flag.
62   * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
63   * loss exception, that process will reissue another
64   * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
65   * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
66   * that x-109 was the result of the previous create, so the process actually
67   * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
68   * when doing the create. If the process is using an id of 352, before reissuing
69   * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
70   * "x-352-109", "x-333-110". The process will know that the original create
71   * succeeded and the znode it created is "x-352-109".
72   * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
73   */
74  @InterfaceAudience.Private
75  public class RecoverableZooKeeper {
76    private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
77    // the actual ZooKeeper client instance
78    private ZooKeeper zk;
79    private final RetryCounterFactory retryCounterFactory;
80    // An identifier of this process in the cluster
81    private final String identifier;
82    private final byte[] id;
83    private Watcher watcher;
84    private int sessionTimeout;
85    private String quorumServers;
86    private final Random salter;
87  
88    // The metadata attached to each piece of data has the
89    // format:
90    //   <magic> 1-byte constant
91    //   <id length> 4-byte big-endian integer (length of next field)
92    //   <id> identifier corresponding uniquely to this process
93    // It is prepended to the data supplied by the user.
94  
95    // the magic number is to be backward compatible
96    private static final byte MAGIC =(byte) 0XFF;
97    private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
98    private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
99    private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
100 
/**
 * Creates a recoverable client without an explicit process identifier;
 * delegates to the six-argument constructor with a {@code null} identifier.
 */
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
    Watcher watcher, int maxRetries, int retryIntervalMillis)
    throws IOException {
  this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, null);
}
107 
/**
 * Creates a recoverable client and makes a best-effort attempt to open the
 * underlying ZooKeeper connection right away.
 *
 * @param quorumServers connect string handed to the ZooKeeper client
 * @param sessionTimeout session timeout handed to the ZooKeeper client
 * @param watcher watcher handed to the ZooKeeper client
 * @param maxRetries number of retries; the retry counter factory is built with
 *        {@code maxRetries + 1}
 * @param retryIntervalMillis interval handed to the retry counter factory
 * @param identifier identifier of this process; when null or empty the runtime
 *        MXBean name (processID@hostName) is used instead
 * @throws IOException declared for callers; a failure of the eager connection
 *         attempt is not propagated from here
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
    justification="None. Its always been this way.")
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
    Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
    throws IOException {
  // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
  this.retryCounterFactory =
    new RetryCounterFactory(maxRetries+1, retryIntervalMillis);

  if (identifier == null || identifier.length() == 0) {
    // the identifier = processID@hostName
    identifier = ManagementFactory.getRuntimeMXBean().getName();
  }
  LOG.info("Process identifier=" + identifier +
    " connecting to ZooKeeper ensemble=" + quorumServers);
  this.identifier = identifier;
  this.id = Bytes.toBytes(identifier);

  this.watcher = watcher;
  this.sessionTimeout = sessionTimeout;
  this.quorumServers = quorumServers;
  try {
    checkZk();
  } catch (KeeperException ignored) {
    // Deliberately best-effort: checkZk() has already logged the failure and
    // every operation calls checkZk() again, so the connection is retried lazily.
    LOG.debug("Initial ZooKeeper connection attempt failed; will retry on first use");
  } catch (RuntimeException e) {
    // Still best-effort, but an unexpected failure must not vanish without
    // leaving a trace in the log.
    LOG.warn("Unexpected failure creating initial ZooKeeper connection to ensemble="
      + quorumServers + "; will retry on first use", e);
  }
  salter = new SecureRandom();
}
132 
133   /**
134    * Try to create a Zookeeper connection. Turns any exception encountered into a
135    * KeeperException.OperationTimeoutException so it can retried.
136    * @return The created Zookeeper connection object
137    * @throws KeeperException
138    */
139   protected synchronized ZooKeeper checkZk() throws KeeperException {
140     if (this.zk == null) {
141       try {
142         this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
143       } catch (IOException ex) {
144         LOG.warn("Unable to create ZooKeeper Connection", ex);
145         throw new KeeperException.OperationTimeoutException();
146       }
147     }
148     return zk;
149   }
150 
151   public synchronized void reconnectAfterExpiration()
152         throws IOException, KeeperException, InterruptedException {
153     if (zk != null) {
154       LOG.info("Closing dead ZooKeeper connection, session" +
155         " was: 0x"+Long.toHexString(zk.getSessionId()));
156       zk.close();
157       // reset the Zookeeper connection
158       zk = null;
159     }
160     checkZk();
161     LOG.info("Recreated a ZooKeeper, session" +
162       " is: 0x"+Long.toHexString(zk.getSessionId()));
163   }
164 
165   /**
166    * delete is an idempotent operation. Retry before throwing exception.
167    * This function will not throw NoNodeException if the path does not
168    * exist.
169    */
170   public void delete(String path, int version)
171   throws InterruptedException, KeeperException {
172     TraceScope traceScope = null;
173     try {
174       traceScope = Trace.startSpan("RecoverableZookeeper.delete");
175       RetryCounter retryCounter = retryCounterFactory.create();
176       boolean isRetry = false; // False for first attempt, true for all retries.
177       while (true) {
178         try {
179           checkZk().delete(path, version);
180           return;
181         } catch (KeeperException e) {
182           switch (e.code()) {
183             case NONODE:
184               if (isRetry) {
185                 LOG.info("Node " + path + " already deleted. Assuming a " +
186                     "previous attempt succeeded.");
187                 return;
188               }
189               LOG.info("Node " + path + " already deleted, retry=" + isRetry);
190               throw e;
191 
192             case CONNECTIONLOSS:
193             case SESSIONEXPIRED:
194             case OPERATIONTIMEOUT:
195               retryOrThrow(retryCounter, e, "delete");
196               break;
197 
198             default:
199               throw e;
200           }
201         }
202         retryCounter.sleepUntilNextRetry();
203         isRetry = true;
204       }
205     } finally {
206       if (traceScope != null) traceScope.close();
207     }
208   }
209 
210   /**
211    * exists is an idempotent operation. Retry before throwing exception
212    * @return A Stat instance
213    */
214   public Stat exists(String path, Watcher watcher)
215   throws KeeperException, InterruptedException {
216     TraceScope traceScope = null;
217     try {
218       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
219       RetryCounter retryCounter = retryCounterFactory.create();
220       while (true) {
221         try {
222           return checkZk().exists(path, watcher);
223         } catch (KeeperException e) {
224           switch (e.code()) {
225             case CONNECTIONLOSS:
226             case SESSIONEXPIRED:
227             case OPERATIONTIMEOUT:
228               retryOrThrow(retryCounter, e, "exists");
229               break;
230 
231             default:
232               throw e;
233           }
234         }
235         retryCounter.sleepUntilNextRetry();
236       }
237     } finally {
238       if (traceScope != null) traceScope.close();
239     }
240   }
241 
242   /**
243    * exists is an idempotent operation. Retry before throwing exception
244    * @return A Stat instance
245    */
246   public Stat exists(String path, boolean watch)
247   throws KeeperException, InterruptedException {
248     TraceScope traceScope = null;
249     try {
250       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
251       RetryCounter retryCounter = retryCounterFactory.create();
252       while (true) {
253         try {
254           return checkZk().exists(path, watch);
255         } catch (KeeperException e) {
256           switch (e.code()) {
257             case CONNECTIONLOSS:
258             case SESSIONEXPIRED:
259             case OPERATIONTIMEOUT:
260               retryOrThrow(retryCounter, e, "exists");
261               break;
262 
263             default:
264               throw e;
265           }
266         }
267         retryCounter.sleepUntilNextRetry();
268       }
269     } finally {
270       if (traceScope != null) traceScope.close();
271     }
272   }
273 
274   private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
275       String opName) throws KeeperException {
276     LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
277     if (!retryCounter.shouldRetry()) {
278       LOG.error("ZooKeeper " + opName + " failed after "
279         + retryCounter.getMaxAttempts() + " attempts");
280       throw e;
281     }
282   }
283 
284   /**
285    * getChildren is an idempotent operation. Retry before throwing exception
286    * @return List of children znodes
287    */
288   public List<String> getChildren(String path, Watcher watcher)
289     throws KeeperException, InterruptedException {
290     TraceScope traceScope = null;
291     try {
292       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
293       RetryCounter retryCounter = retryCounterFactory.create();
294       while (true) {
295         try {
296           return checkZk().getChildren(path, watcher);
297         } catch (KeeperException e) {
298           switch (e.code()) {
299             case CONNECTIONLOSS:
300             case SESSIONEXPIRED:
301             case OPERATIONTIMEOUT:
302               retryOrThrow(retryCounter, e, "getChildren");
303               break;
304 
305             default:
306               throw e;
307           }
308         }
309         retryCounter.sleepUntilNextRetry();
310       }
311     } finally {
312       if (traceScope != null) traceScope.close();
313     }
314   }
315 
316   /**
317    * getChildren is an idempotent operation. Retry before throwing exception
318    * @return List of children znodes
319    */
320   public List<String> getChildren(String path, boolean watch)
321   throws KeeperException, InterruptedException {
322     TraceScope traceScope = null;
323     try {
324       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
325       RetryCounter retryCounter = retryCounterFactory.create();
326       while (true) {
327         try {
328           return checkZk().getChildren(path, watch);
329         } catch (KeeperException e) {
330           switch (e.code()) {
331             case CONNECTIONLOSS:
332             case SESSIONEXPIRED:
333             case OPERATIONTIMEOUT:
334               retryOrThrow(retryCounter, e, "getChildren");
335               break;
336 
337             default:
338               throw e;
339           }
340         }
341         retryCounter.sleepUntilNextRetry();
342       }
343     } finally {
344       if (traceScope != null) traceScope.close();
345     }
346   }
347 
348   /**
349    * getData is an idempotent operation. Retry before throwing exception
350    * @return Data
351    */
352   public byte[] getData(String path, Watcher watcher, Stat stat)
353   throws KeeperException, InterruptedException {
354     TraceScope traceScope = null;
355     try {
356       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
357       RetryCounter retryCounter = retryCounterFactory.create();
358       while (true) {
359         try {
360           byte[] revData = checkZk().getData(path, watcher, stat);
361           return this.removeMetaData(revData);
362         } catch (KeeperException e) {
363           switch (e.code()) {
364             case CONNECTIONLOSS:
365             case SESSIONEXPIRED:
366             case OPERATIONTIMEOUT:
367               retryOrThrow(retryCounter, e, "getData");
368               break;
369 
370             default:
371               throw e;
372           }
373         }
374         retryCounter.sleepUntilNextRetry();
375       }
376     } finally {
377       if (traceScope != null) traceScope.close();
378     }
379   }
380 
381   /**
382    * getData is an idemnpotent operation. Retry before throwing exception
383    * @return Data
384    */
385   public byte[] getData(String path, boolean watch, Stat stat)
386   throws KeeperException, InterruptedException {
387     TraceScope traceScope = null;
388     try {
389       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
390       RetryCounter retryCounter = retryCounterFactory.create();
391       while (true) {
392         try {
393           byte[] revData = checkZk().getData(path, watch, stat);
394           return this.removeMetaData(revData);
395         } catch (KeeperException e) {
396           switch (e.code()) {
397             case CONNECTIONLOSS:
398             case SESSIONEXPIRED:
399             case OPERATIONTIMEOUT:
400               retryOrThrow(retryCounter, e, "getData");
401               break;
402 
403             default:
404               throw e;
405           }
406         }
407         retryCounter.sleepUntilNextRetry();
408       }
409     } finally {
410       if (traceScope != null) traceScope.close();
411     }
412   }
413 
414   /**
415    * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
416    * Adding an identifier field into the data to check whether
417    * badversion is caused by the result of previous correctly setData
418    * @return Stat instance
419    */
420   public Stat setData(String path, byte[] data, int version)
421   throws KeeperException, InterruptedException {
422     TraceScope traceScope = null;
423     try {
424       traceScope = Trace.startSpan("RecoverableZookeeper.setData");
425       RetryCounter retryCounter = retryCounterFactory.create();
426       byte[] newData = appendMetaData(data);
427       boolean isRetry = false;
428       while (true) {
429         try {
430           return checkZk().setData(path, newData, version);
431         } catch (KeeperException e) {
432           switch (e.code()) {
433             case CONNECTIONLOSS:
434             case SESSIONEXPIRED:
435             case OPERATIONTIMEOUT:
436               retryOrThrow(retryCounter, e, "setData");
437               break;
438             case BADVERSION:
439               if (isRetry) {
440                 // try to verify whether the previous setData success or not
441                 try{
442                   Stat stat = new Stat();
443                   byte[] revData = checkZk().getData(path, false, stat);
444                   if(Bytes.compareTo(revData, newData) == 0) {
445                     // the bad version is caused by previous successful setData
446                     return stat;
447                   }
448                 } catch(KeeperException keeperException){
449                   // the ZK is not reliable at this moment. just throwing exception
450                   throw keeperException;
451                 }
452               }
453             // throw other exceptions and verified bad version exceptions
454             default:
455               throw e;
456           }
457         }
458         retryCounter.sleepUntilNextRetry();
459         isRetry = true;
460       }
461     } finally {
462       if (traceScope != null) traceScope.close();
463     }
464   }
465 
466   /**
467    * <p>
468    * NONSEQUENTIAL create is idempotent operation.
469    * Retry before throwing exceptions.
470    * But this function will not throw the NodeExist exception back to the
471    * application.
472    * </p>
473    * <p>
474    * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
475    * identifier to the path to verify, whether the previous one is successful
476    * or not.
477    * </p>
478    *
479    * @return Path
480    */
481   public String create(String path, byte[] data, List<ACL> acl,
482       CreateMode createMode)
483   throws KeeperException, InterruptedException {
484     TraceScope traceScope = null;
485     try {
486       traceScope = Trace.startSpan("RecoverableZookeeper.create");
487       byte[] newData = appendMetaData(data);
488       switch (createMode) {
489         case EPHEMERAL:
490         case PERSISTENT:
491           return createNonSequential(path, newData, acl, createMode);
492 
493         case EPHEMERAL_SEQUENTIAL:
494         case PERSISTENT_SEQUENTIAL:
495           return createSequential(path, newData, acl, createMode);
496 
497         default:
498           throw new IllegalArgumentException("Unrecognized CreateMode: " +
499               createMode);
500       }
501     } finally {
502       if (traceScope != null) traceScope.close();
503     }
504   }
505 
506   private String createNonSequential(String path, byte[] data, List<ACL> acl,
507       CreateMode createMode) throws KeeperException, InterruptedException {
508     RetryCounter retryCounter = retryCounterFactory.create();
509     boolean isRetry = false; // False for first attempt, true for all retries.
510     while (true) {
511       try {
512         return checkZk().create(path, data, acl, createMode);
513       } catch (KeeperException e) {
514         switch (e.code()) {
515           case NODEEXISTS:
516             if (isRetry) {
517               // If the connection was lost, there is still a possibility that
518               // we have successfully created the node at our previous attempt,
519               // so we read the node and compare.
520               byte[] currentData = checkZk().getData(path, false, null);
521               if (currentData != null &&
522                   Bytes.compareTo(currentData, data) == 0) {
523                 // We successfully created a non-sequential node
524                 return path;
525               }
526               LOG.error("Node " + path + " already exists with " +
527                   Bytes.toStringBinary(currentData) + ", could not write " +
528                   Bytes.toStringBinary(data));
529               throw e;
530             }
531             LOG.debug("Node " + path + " already exists");
532             throw e;
533 
534           case CONNECTIONLOSS:
535           case SESSIONEXPIRED:
536           case OPERATIONTIMEOUT:
537             retryOrThrow(retryCounter, e, "create");
538             break;
539 
540           default:
541             throw e;
542         }
543       }
544       retryCounter.sleepUntilNextRetry();
545       isRetry = true;
546     }
547   }
548 
549   private String createSequential(String path, byte[] data,
550       List<ACL> acl, CreateMode createMode)
551   throws KeeperException, InterruptedException {
552     RetryCounter retryCounter = retryCounterFactory.create();
553     boolean first = true;
554     String newPath = path+this.identifier;
555     while (true) {
556       try {
557         if (!first) {
558           // Check if we succeeded on a previous attempt
559           String previousResult = findPreviousSequentialNode(newPath);
560           if (previousResult != null) {
561             return previousResult;
562           }
563         }
564         first = false;
565         return checkZk().create(newPath, data, acl, createMode);
566       } catch (KeeperException e) {
567         switch (e.code()) {
568           case CONNECTIONLOSS:
569           case SESSIONEXPIRED:
570           case OPERATIONTIMEOUT:
571             retryOrThrow(retryCounter, e, "create");
572             break;
573 
574           default:
575             throw e;
576         }
577       }
578       retryCounter.sleepUntilNextRetry();
579     }
580   }
581   /**
582    * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
583    * instances to actually pass to multi (need to do this in order to appendMetaData).
584    */
585   private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
586   throws UnsupportedOperationException {
587     if(ops == null) return null;
588 
589     List<Op> preparedOps = new LinkedList<Op>();
590     for (Op op : ops) {
591       if (op.getType() == ZooDefs.OpCode.create) {
592         CreateRequest create = (CreateRequest)op.toRequestRecord();
593         preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
594           create.getAcl(), create.getFlags()));
595       } else if (op.getType() == ZooDefs.OpCode.delete) {
596         // no need to appendMetaData for delete
597         preparedOps.add(op);
598       } else if (op.getType() == ZooDefs.OpCode.setData) {
599         SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
600         preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
601           setData.getVersion()));
602       } else {
603         throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
604       }
605     }
606     return preparedOps;
607   }
608 
609   /**
610    * Run multiple operations in a transactional manner. Retry before throwing exception
611    */
612   public List<OpResult> multi(Iterable<Op> ops)
613   throws KeeperException, InterruptedException {
614     TraceScope traceScope = null;
615     try {
616       traceScope = Trace.startSpan("RecoverableZookeeper.multi");
617       RetryCounter retryCounter = retryCounterFactory.create();
618       Iterable<Op> multiOps = prepareZKMulti(ops);
619       while (true) {
620         try {
621           return checkZk().multi(multiOps);
622         } catch (KeeperException e) {
623           switch (e.code()) {
624             case CONNECTIONLOSS:
625             case SESSIONEXPIRED:
626             case OPERATIONTIMEOUT:
627               retryOrThrow(retryCounter, e, "multi");
628               break;
629 
630             default:
631               throw e;
632           }
633         }
634         retryCounter.sleepUntilNextRetry();
635     }
636     } finally {
637       if (traceScope != null) traceScope.close();
638     }
639   }
640 
641   private String findPreviousSequentialNode(String path)
642     throws KeeperException, InterruptedException {
643     int lastSlashIdx = path.lastIndexOf('/');
644     assert(lastSlashIdx != -1);
645     String parent = path.substring(0, lastSlashIdx);
646     String nodePrefix = path.substring(lastSlashIdx+1);
647 
648     List<String> nodes = checkZk().getChildren(parent, false);
649     List<String> matching = filterByPrefix(nodes, nodePrefix);
650     for (String node : matching) {
651       String nodePath = parent + "/" + node;
652       Stat stat = checkZk().exists(nodePath, false);
653       if (stat != null) {
654         return nodePath;
655       }
656     }
657     return null;
658   }
659 
660   public byte[] removeMetaData(byte[] data) {
661     if(data == null || data.length == 0) {
662       return data;
663     }
664     // check the magic data; to be backward compatible
665     byte magic = data[0];
666     if(magic != MAGIC) {
667       return data;
668     }
669 
670     int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
671     int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
672     int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
673 
674     byte[] newData = new byte[dataLength];
675     System.arraycopy(data, dataOffset, newData, 0, dataLength);
676     return newData;
677   }
678 
679   private byte[] appendMetaData(byte[] data) {
680     if(data == null || data.length == 0){
681       return data;
682     }
683     byte[] salt = Bytes.toBytes(salter.nextLong());
684     int idLength = id.length + salt.length;
685     byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
686     int pos = 0;
687     pos = Bytes.putByte(newData, pos, MAGIC);
688     pos = Bytes.putInt(newData, pos, idLength);
689     pos = Bytes.putBytes(newData, pos, id, 0, id.length);
690     pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
691     pos = Bytes.putBytes(newData, pos, data, 0, data.length);
692     return newData;
693   }
694 
695   public synchronized long getSessionId() {
696     return zk == null ? -1 : zk.getSessionId();
697   }
698 
699   public synchronized void close() throws InterruptedException {
700     if (zk != null) zk.close();
701   }
702 
703   public synchronized States getState() {
704     return zk == null ? null : zk.getState();
705   }
706 
707   public synchronized ZooKeeper getZooKeeper() {
708     return zk;
709   }
710 
711   public synchronized byte[] getSessionPasswd() {
712     return zk == null ? null : zk.getSessionPasswd();
713   }
714 
715   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
716     checkZk().sync(path, null, null);
717   }
718 
719   /**
720    * Filters the given node list by the given prefixes.
721    * This method is all-inclusive--if any element in the node list starts
722    * with any of the given prefixes, then it is included in the result.
723    *
724    * @param nodes the nodes to filter
725    * @param prefixes the prefixes to include in the result
726    * @return list of every element that starts with one of the prefixes
727    */
728   private static List<String> filterByPrefix(List<String> nodes,
729       String... prefixes) {
730     List<String> lockChildren = new ArrayList<String>();
731     for (String child : nodes){
732       for (String prefix : prefixes){
733         if (child.startsWith(prefix)){
734           lockChildren.add(child);
735           break;
736         }
737       }
738     }
739     return lockChildren;
740   }
741 
742   public String getIdentifier() {
743     return identifier;
744   }
745 }