View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.IOException;
22  import java.lang.management.ManagementFactory;
23  import java.security.SecureRandom;
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Random;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.RetryCounter;
34  import org.apache.hadoop.hbase.util.RetryCounterFactory;
35  import org.apache.zookeeper.AsyncCallback;
36  import org.apache.zookeeper.CreateMode;
37  import org.apache.zookeeper.KeeperException;
38  import org.apache.zookeeper.Op;
39  import org.apache.zookeeper.OpResult;
40  import org.apache.zookeeper.Watcher;
41  import org.apache.zookeeper.ZooDefs;
42  import org.apache.zookeeper.ZooKeeper;
43  import org.apache.zookeeper.ZooKeeper.States;
44  import org.apache.zookeeper.data.ACL;
45  import org.apache.zookeeper.data.Stat;
46  import org.apache.zookeeper.proto.CreateRequest;
47  import org.apache.zookeeper.proto.SetDataRequest;
48  import org.htrace.Trace;
49  import org.htrace.TraceScope;
50  
51  /**
52   * A zookeeper that can handle 'recoverable' errors.
53   * To handle recoverable errors, developers need to realize that there are two
54   * classes of requests: idempotent and non-idempotent requests. Read requests
55   * and unconditional sets and deletes are examples of idempotent requests, they
56   * can be reissued with the same results.
57   * (Although, the delete may throw a NoNodeException on reissue its effect on
58   * the ZooKeeper state is the same.) Non-idempotent requests need special
59   * handling, application and library writers need to keep in mind that they may
60   * need to encode information in the data or name of znodes to detect
61   * retries. A simple example is a create that uses a sequence flag.
62   * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
63   * loss exception, that process will reissue another
64   * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
65   * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
66   * that x-109 was the result of the previous create, so the process actually
67   * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
68   * when doing the create. If the process is using an id of 352, before reissuing
69   * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
70   * "x-352-109", "x-333-110". The process will know that the original create
71   * succeeded and the znode it created is "x-352-109".
72   * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
73   */
74  @InterfaceAudience.Private
75  public class RecoverableZooKeeper {
76    private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
77    // the actual ZooKeeper client instance
78    volatile private ZooKeeper zk;
79    private final RetryCounterFactory retryCounterFactory;
80    // An identifier of this process in the cluster
81    private final String identifier;
82    private final byte[] id;
83    private Watcher watcher;
84    private int sessionTimeout;
85    private String quorumServers;
86    private final Random salter;
87  
88    // The metadata attached to each piece of data has the
89    // format:
90    //   <magic> 1-byte constant
91    //   <id length> 4-byte big-endian integer (length of next field)
92    //   <id> identifier corresponding uniquely to this process
93    // It is prepended to the data supplied by the user.
94  
95    // the magic number is to be backward compatible
96    private static final byte MAGIC =(byte) 0XFF;
97    private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
98    private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
99    private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
100 
101   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
102       Watcher watcher, int maxRetries, int retryIntervalMillis)
103   throws IOException {
104     this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
105         null);
106   }
107 
108   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
109       Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
110   throws IOException {
111     // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
112     this.retryCounterFactory =
113       new RetryCounterFactory(maxRetries+1, retryIntervalMillis);
114 
115     if (identifier == null || identifier.length() == 0) {
116       // the identifier = processID@hostName
117       identifier = ManagementFactory.getRuntimeMXBean().getName();
118     }
119     LOG.info("Process identifier=" + identifier +
120       " connecting to ZooKeeper ensemble=" + quorumServers);
121     this.identifier = identifier;
122     this.id = Bytes.toBytes(identifier);
123 
124     this.watcher = watcher;
125     this.sessionTimeout = sessionTimeout;
126     this.quorumServers = quorumServers;
127     try {checkZk();} catch (Exception x) {/* ignore */}
128     salter = new SecureRandom();
129   }
130 
131   /**
132    * Try to create a Zookeeper connection. Turns any exception encountered into a
133    * {@link KeeperException.OperationTimeoutException} so it can retried.
134    * @return The created Zookeeper connection object
135    * @throws KeeperException
136    */
137   protected ZooKeeper checkZk() throws KeeperException {
138     if (this.zk == null) {
139       try {
140         this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
141       } catch (Exception uhe) {
142         LOG.warn("Unable to create ZooKeeper Connection", uhe);
143         throw new KeeperException.OperationTimeoutException();
144       }
145     }
146     return zk;
147   }
148 
149   public void reconnectAfterExpiration()
150         throws IOException, KeeperException, InterruptedException {
151     if (zk != null) {
152       LOG.info("Closing dead ZooKeeper connection, session" +
153         " was: 0x"+Long.toHexString(zk.getSessionId()));
154       zk.close();
155     }
156     checkZk();
157     LOG.info("Recreated a ZooKeeper, session" +
158       " is: 0x"+Long.toHexString(zk.getSessionId()));
159   }
160 
161   /**
162    * delete is an idempotent operation. Retry before throwing exception.
163    * This function will not throw NoNodeException if the path does not
164    * exist.
165    */
166   public void delete(String path, int version)
167   throws InterruptedException, KeeperException {
168     TraceScope traceScope = null;
169     try {
170       traceScope = Trace.startSpan("RecoverableZookeeper.delete");
171       RetryCounter retryCounter = retryCounterFactory.create();
172       boolean isRetry = false; // False for first attempt, true for all retries.
173       while (true) {
174         try {
175           checkZk().delete(path, version);
176           return;
177         } catch (KeeperException e) {
178           switch (e.code()) {
179             case NONODE:
180               if (isRetry) {
181                 LOG.info("Node " + path + " already deleted. Assuming a " +
182                     "previous attempt succeeded.");
183                 return;
184               }
185               LOG.info("Node " + path + " already deleted, retry=" + isRetry);
186               throw e;
187 
188             case CONNECTIONLOSS:
189             case SESSIONEXPIRED:
190             case OPERATIONTIMEOUT:
191               retryOrThrow(retryCounter, e, "delete");
192               break;
193 
194             default:
195               throw e;
196           }
197         }
198         retryCounter.sleepUntilNextRetry();
199         isRetry = true;
200       }
201     } finally {
202       if (traceScope != null) traceScope.close();
203     }
204   }
205 
206   /**
207    * exists is an idempotent operation. Retry before throwing exception
208    * @return A Stat instance
209    */
210   public Stat exists(String path, Watcher watcher)
211   throws KeeperException, InterruptedException {
212     TraceScope traceScope = null;
213     try {
214       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
215       RetryCounter retryCounter = retryCounterFactory.create();
216       while (true) {
217         try {
218           return checkZk().exists(path, watcher);
219         } catch (KeeperException e) {
220           switch (e.code()) {
221             case CONNECTIONLOSS:
222             case SESSIONEXPIRED:
223             case OPERATIONTIMEOUT:
224               retryOrThrow(retryCounter, e, "exists");
225               break;
226 
227             default:
228               throw e;
229           }
230         }
231         retryCounter.sleepUntilNextRetry();
232       }
233     } finally {
234       if (traceScope != null) traceScope.close();
235     }
236   }
237 
238   /**
239    * exists is an idempotent operation. Retry before throwing exception
240    * @return A Stat instance
241    */
242   public Stat exists(String path, boolean watch)
243   throws KeeperException, InterruptedException {
244     TraceScope traceScope = null;
245     try {
246       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
247       RetryCounter retryCounter = retryCounterFactory.create();
248       while (true) {
249         try {
250           return checkZk().exists(path, watch);
251         } catch (KeeperException e) {
252           switch (e.code()) {
253             case CONNECTIONLOSS:
254             case SESSIONEXPIRED:
255             case OPERATIONTIMEOUT:
256               retryOrThrow(retryCounter, e, "exists");
257               break;
258 
259             default:
260               throw e;
261           }
262         }
263         retryCounter.sleepUntilNextRetry();
264       }
265     } finally {
266       if (traceScope != null) traceScope.close();
267     }
268   }
269 
270   private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
271       String opName) throws KeeperException {
272     LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
273     if (!retryCounter.shouldRetry()) {
274       LOG.error("ZooKeeper " + opName + " failed after "
275         + retryCounter.getMaxAttempts() + " attempts");
276       throw e;
277     }
278   }
279 
280   /**
281    * getChildren is an idempotent operation. Retry before throwing exception
282    * @return List of children znodes
283    */
284   public List<String> getChildren(String path, Watcher watcher)
285     throws KeeperException, InterruptedException {
286     TraceScope traceScope = null;
287     try {
288       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
289       RetryCounter retryCounter = retryCounterFactory.create();
290       while (true) {
291         try {
292           return checkZk().getChildren(path, watcher);
293         } catch (KeeperException e) {
294           switch (e.code()) {
295             case CONNECTIONLOSS:
296             case SESSIONEXPIRED:
297             case OPERATIONTIMEOUT:
298               retryOrThrow(retryCounter, e, "getChildren");
299               break;
300 
301             default:
302               throw e;
303           }
304         }
305         retryCounter.sleepUntilNextRetry();
306       }
307     } finally {
308       if (traceScope != null) traceScope.close();
309     }
310   }
311 
312   /**
313    * getChildren is an idempotent operation. Retry before throwing exception
314    * @return List of children znodes
315    */
316   public List<String> getChildren(String path, boolean watch)
317   throws KeeperException, InterruptedException {
318     TraceScope traceScope = null;
319     try {
320       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
321       RetryCounter retryCounter = retryCounterFactory.create();
322       while (true) {
323         try {
324           return checkZk().getChildren(path, watch);
325         } catch (KeeperException e) {
326           switch (e.code()) {
327             case CONNECTIONLOSS:
328             case SESSIONEXPIRED:
329             case OPERATIONTIMEOUT:
330               retryOrThrow(retryCounter, e, "getChildren");
331               break;
332 
333             default:
334               throw e;
335           }
336         }
337         retryCounter.sleepUntilNextRetry();
338       }
339     } finally {
340       if (traceScope != null) traceScope.close();
341     }
342   }
343 
344   /**
345    * getData is an idempotent operation. Retry before throwing exception
346    * @return Data
347    */
348   public byte[] getData(String path, Watcher watcher, Stat stat)
349   throws KeeperException, InterruptedException {
350     TraceScope traceScope = null;
351     try {
352       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
353       RetryCounter retryCounter = retryCounterFactory.create();
354       while (true) {
355         try {
356           byte[] revData = checkZk().getData(path, watcher, stat);
357           return this.removeMetaData(revData);
358         } catch (KeeperException e) {
359           switch (e.code()) {
360             case CONNECTIONLOSS:
361             case SESSIONEXPIRED:
362             case OPERATIONTIMEOUT:
363               retryOrThrow(retryCounter, e, "getData");
364               break;
365 
366             default:
367               throw e;
368           }
369         }
370         retryCounter.sleepUntilNextRetry();
371       }
372     } finally {
373       if (traceScope != null) traceScope.close();
374     }
375   }
376 
377   /**
378    * getData is an idemnpotent operation. Retry before throwing exception
379    * @return Data
380    */
381   public byte[] getData(String path, boolean watch, Stat stat)
382   throws KeeperException, InterruptedException {
383     TraceScope traceScope = null;
384     try {
385       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
386       RetryCounter retryCounter = retryCounterFactory.create();
387       while (true) {
388         try {
389           byte[] revData = checkZk().getData(path, watch, stat);
390           return this.removeMetaData(revData);
391         } catch (KeeperException e) {
392           switch (e.code()) {
393             case CONNECTIONLOSS:
394             case SESSIONEXPIRED:
395             case OPERATIONTIMEOUT:
396               retryOrThrow(retryCounter, e, "getData");
397               break;
398 
399             default:
400               throw e;
401           }
402         }
403         retryCounter.sleepUntilNextRetry();
404       }
405     } finally {
406       if (traceScope != null) traceScope.close();
407     }
408   }
409 
410   /**
411    * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
412    * Adding an identifier field into the data to check whether
413    * badversion is caused by the result of previous correctly setData
414    * @return Stat instance
415    */
416   public Stat setData(String path, byte[] data, int version)
417   throws KeeperException, InterruptedException {
418     TraceScope traceScope = null;
419     try {
420       traceScope = Trace.startSpan("RecoverableZookeeper.setData");
421       RetryCounter retryCounter = retryCounterFactory.create();
422       byte[] newData = appendMetaData(data);
423       boolean isRetry = false;
424       while (true) {
425         try {
426           return checkZk().setData(path, newData, version);
427         } catch (KeeperException e) {
428           switch (e.code()) {
429             case CONNECTIONLOSS:
430             case SESSIONEXPIRED:
431             case OPERATIONTIMEOUT:
432               retryOrThrow(retryCounter, e, "setData");
433               break;
434             case BADVERSION:
435               if (isRetry) {
436                 // try to verify whether the previous setData success or not
437                 try{
438                   Stat stat = new Stat();
439                   byte[] revData = checkZk().getData(path, false, stat);
440                   if(Bytes.compareTo(revData, newData) == 0) {
441                     // the bad version is caused by previous successful setData
442                     return stat;
443                   }
444                 } catch(KeeperException keeperException){
445                   // the ZK is not reliable at this moment. just throwing exception
446                   throw keeperException;
447                 }
448               }
449             // throw other exceptions and verified bad version exceptions
450             default:
451               throw e;
452           }
453         }
454         retryCounter.sleepUntilNextRetry();
455         isRetry = true;
456       }
457     } finally {
458       if (traceScope != null) traceScope.close();
459     }
460   }
461 
462   /**
463    * <p>
464    * NONSEQUENTIAL create is idempotent operation.
465    * Retry before throwing exceptions.
466    * But this function will not throw the NodeExist exception back to the
467    * application.
468    * </p>
469    * <p>
470    * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
471    * identifier to the path to verify, whether the previous one is successful
472    * or not.
473    * </p>
474    *
475    * @return Path
476    */
477   public String create(String path, byte[] data, List<ACL> acl,
478       CreateMode createMode)
479   throws KeeperException, InterruptedException {
480     TraceScope traceScope = null;
481     try {
482       traceScope = Trace.startSpan("RecoverableZookeeper.create");
483       byte[] newData = appendMetaData(data);
484       switch (createMode) {
485         case EPHEMERAL:
486         case PERSISTENT:
487           return createNonSequential(path, newData, acl, createMode);
488 
489         case EPHEMERAL_SEQUENTIAL:
490         case PERSISTENT_SEQUENTIAL:
491           return createSequential(path, newData, acl, createMode);
492 
493         default:
494           throw new IllegalArgumentException("Unrecognized CreateMode: " +
495               createMode);
496       }
497     } finally {
498       if (traceScope != null) traceScope.close();
499     }
500   }
501 
502   private String createNonSequential(String path, byte[] data, List<ACL> acl,
503       CreateMode createMode) throws KeeperException, InterruptedException {
504     RetryCounter retryCounter = retryCounterFactory.create();
505     boolean isRetry = false; // False for first attempt, true for all retries.
506     while (true) {
507       try {
508         return checkZk().create(path, data, acl, createMode);
509       } catch (KeeperException e) {
510         switch (e.code()) {
511           case NODEEXISTS:
512             if (isRetry) {
513               // If the connection was lost, there is still a possibility that
514               // we have successfully created the node at our previous attempt,
515               // so we read the node and compare.
516               byte[] currentData = checkZk().getData(path, false, null);
517               if (currentData != null &&
518                   Bytes.compareTo(currentData, data) == 0) {
519                 // We successfully created a non-sequential node
520                 return path;
521               }
522               LOG.error("Node " + path + " already exists with " +
523                   Bytes.toStringBinary(currentData) + ", could not write " +
524                   Bytes.toStringBinary(data));
525               throw e;
526             }
527             LOG.debug("Node " + path + " already exists");
528             throw e;
529 
530           case CONNECTIONLOSS:
531           case SESSIONEXPIRED:
532           case OPERATIONTIMEOUT:
533             retryOrThrow(retryCounter, e, "create");
534             break;
535 
536           default:
537             throw e;
538         }
539       }
540       retryCounter.sleepUntilNextRetry();
541       isRetry = true;
542     }
543   }
544 
545   private String createSequential(String path, byte[] data,
546       List<ACL> acl, CreateMode createMode)
547   throws KeeperException, InterruptedException {
548     RetryCounter retryCounter = retryCounterFactory.create();
549     boolean first = true;
550     String newPath = path+this.identifier;
551     while (true) {
552       try {
553         if (!first) {
554           // Check if we succeeded on a previous attempt
555           String previousResult = findPreviousSequentialNode(newPath);
556           if (previousResult != null) {
557             return previousResult;
558           }
559         }
560         first = false;
561         return checkZk().create(newPath, data, acl, createMode);
562       } catch (KeeperException e) {
563         switch (e.code()) {
564           case CONNECTIONLOSS:
565           case SESSIONEXPIRED:
566           case OPERATIONTIMEOUT:
567             retryOrThrow(retryCounter, e, "create");
568             break;
569 
570           default:
571             throw e;
572         }
573       }
574       retryCounter.sleepUntilNextRetry();
575     }
576   }
577   /**
578    * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
579    * instances to actually pass to multi (need to do this in order to appendMetaData).
580    */
581   private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
582   throws UnsupportedOperationException {
583     if(ops == null) return null;
584 
585     List<Op> preparedOps = new LinkedList<Op>();
586     for (Op op : ops) {
587       if (op.getType() == ZooDefs.OpCode.create) {
588         CreateRequest create = (CreateRequest)op.toRequestRecord();
589         preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
590           create.getAcl(), create.getFlags()));
591       } else if (op.getType() == ZooDefs.OpCode.delete) {
592         // no need to appendMetaData for delete
593         preparedOps.add(op);
594       } else if (op.getType() == ZooDefs.OpCode.setData) {
595         SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
596         preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
597           setData.getVersion()));
598       } else {
599         throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
600       }
601     }
602     return preparedOps;
603   }
604 
605   /**
606    * Run multiple operations in a transactional manner. Retry before throwing exception
607    */
608   public List<OpResult> multi(Iterable<Op> ops)
609   throws KeeperException, InterruptedException {
610     TraceScope traceScope = null;
611     try {
612       traceScope = Trace.startSpan("RecoverableZookeeper.multi");
613       RetryCounter retryCounter = retryCounterFactory.create();
614       Iterable<Op> multiOps = prepareZKMulti(ops);
615       while (true) {
616         try {
617           return checkZk().multi(multiOps);
618         } catch (KeeperException e) {
619           switch (e.code()) {
620             case CONNECTIONLOSS:
621             case SESSIONEXPIRED:
622             case OPERATIONTIMEOUT:
623               retryOrThrow(retryCounter, e, "multi");
624               break;
625 
626             default:
627               throw e;
628           }
629         }
630         retryCounter.sleepUntilNextRetry();
631     }
632     } finally {
633       if (traceScope != null) traceScope.close();
634     }
635   }
636 
637   private String findPreviousSequentialNode(String path)
638     throws KeeperException, InterruptedException {
639     int lastSlashIdx = path.lastIndexOf('/');
640     assert(lastSlashIdx != -1);
641     String parent = path.substring(0, lastSlashIdx);
642     String nodePrefix = path.substring(lastSlashIdx+1);
643 
644     List<String> nodes = checkZk().getChildren(parent, false);
645     List<String> matching = filterByPrefix(nodes, nodePrefix);
646     for (String node : matching) {
647       String nodePath = parent + "/" + node;
648       Stat stat = checkZk().exists(nodePath, false);
649       if (stat != null) {
650         return nodePath;
651       }
652     }
653     return null;
654   }
655 
656   public byte[] removeMetaData(byte[] data) {
657     if(data == null || data.length == 0) {
658       return data;
659     }
660     // check the magic data; to be backward compatible
661     byte magic = data[0];
662     if(magic != MAGIC) {
663       return data;
664     }
665 
666     int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
667     int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
668     int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
669 
670     byte[] newData = new byte[dataLength];
671     System.arraycopy(data, dataOffset, newData, 0, dataLength);
672     return newData;
673   }
674 
675   private byte[] appendMetaData(byte[] data) {
676     if(data == null || data.length == 0){
677       return data;
678     }
679     byte[] salt = Bytes.toBytes(salter.nextLong());
680     int idLength = id.length + salt.length;
681     byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
682     int pos = 0;
683     pos = Bytes.putByte(newData, pos, MAGIC);
684     pos = Bytes.putInt(newData, pos, idLength);
685     pos = Bytes.putBytes(newData, pos, id, 0, id.length);
686     pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
687     pos = Bytes.putBytes(newData, pos, data, 0, data.length);
688     return newData;
689   }
690 
691   public long getSessionId() {
692     return zk == null ? null : zk.getSessionId();
693   }
694 
695   public void close() throws InterruptedException {
696     if (zk != null) zk.close();
697   }
698 
699   public States getState() {
700     return zk == null ? null : zk.getState();
701   }
702 
703   public ZooKeeper getZooKeeper() {
704     return zk;
705   }
706 
707   public byte[] getSessionPasswd() {
708     return zk == null ? null : zk.getSessionPasswd();
709   }
710 
711   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
712     checkZk().sync(path, null, null);
713   }
714 
715   /**
716    * Filters the given node list by the given prefixes.
717    * This method is all-inclusive--if any element in the node list starts
718    * with any of the given prefixes, then it is included in the result.
719    *
720    * @param nodes the nodes to filter
721    * @param prefixes the prefixes to include in the result
722    * @return list of every element that starts with one of the prefixes
723    */
724   private static List<String> filterByPrefix(List<String> nodes,
725       String... prefixes) {
726     List<String> lockChildren = new ArrayList<String>();
727     for (String child : nodes){
728       for (String prefix : prefixes){
729         if (child.startsWith(prefix)){
730           lockChildren.add(child);
731           break;
732         }
733       }
734     }
735     return lockChildren;
736   }
737 
738   public String getIdentifier() {
739     return identifier;
740   }
741 }