View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.IOException;
22  import java.lang.management.ManagementFactory;
23  import java.util.ArrayList;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Random;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.RetryCounter;
33  import org.apache.hadoop.hbase.util.RetryCounterFactory;
34  import org.apache.zookeeper.AsyncCallback;
35  import org.apache.zookeeper.CreateMode;
36  import org.apache.zookeeper.KeeperException;
37  import org.apache.zookeeper.Op;
38  import org.apache.zookeeper.OpResult;
39  import org.apache.zookeeper.Watcher;
40  import org.apache.zookeeper.ZooDefs;
41  import org.apache.zookeeper.ZooKeeper;
42  import org.apache.zookeeper.ZooKeeper.States;
43  import org.apache.zookeeper.data.ACL;
44  import org.apache.zookeeper.data.Stat;
45  import org.apache.zookeeper.proto.CreateRequest;
46  import org.apache.zookeeper.proto.SetDataRequest;
47  import org.apache.htrace.Trace;
48  import org.apache.htrace.TraceScope;
49  
50  /**
51   * A zookeeper that can handle 'recoverable' errors.
52   * To handle recoverable errors, developers need to realize that there are two
53   * classes of requests: idempotent and non-idempotent requests. Read requests
54   * and unconditional sets and deletes are examples of idempotent requests, they
55   * can be reissued with the same results.
56   * (Although the delete may throw a NoNodeException on reissue, its effect on
57   * the ZooKeeper state is the same.) Non-idempotent requests need special
58   * handling, application and library writers need to keep in mind that they may
59   * need to encode information in the data or name of znodes to detect
60   * retries. A simple example is a create that uses a sequence flag.
61   * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
62   * loss exception, that process will reissue another
63   * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
64   * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
65   * that x-109 was the result of the previous create, so the process actually
66   * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
67   * when doing the create. If the process is using an id of 352, before reissuing
68   * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
69   * "x-352-109", "x-333-110". The process will know that the original create
70   * succeeded and the znode it created is "x-352-109".
71   * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
72   */
73  @InterfaceAudience.Private
74  public class RecoverableZooKeeper {
75    private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
76    // the actual ZooKeeper client instance
77    private ZooKeeper zk;
78    private final RetryCounterFactory retryCounterFactory;
79    // An identifier of this process in the cluster
80    private final String identifier;
81    private final byte[] id;
82    private Watcher watcher;
83    private int sessionTimeout;
84    private String quorumServers;
85    private final Random salter;
86  
87    // The metadata attached to each piece of data has the
88    // format:
89    //   <magic> 1-byte constant
90    //   <id length> 4-byte big-endian integer (length of next field)
91    //   <id> identifier corresponding uniquely to this process
92    // It is prepended to the data supplied by the user.
93  
94    // the magic number is to be backward compatible
95    private static final byte MAGIC =(byte) 0XFF;
96    private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
97    private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
98    private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
99  
100   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
101       Watcher watcher, int maxRetries, int retryIntervalMillis)
102   throws IOException {
103     this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
104         null);
105   }
106 
107   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
108       justification="None. Its always been this way.")
109   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
110       Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
111   throws IOException {
112     // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
113     this.retryCounterFactory =
114       new RetryCounterFactory(maxRetries+1, retryIntervalMillis);
115 
116     if (identifier == null || identifier.length() == 0) {
117       // the identifier = processID@hostName
118       identifier = ManagementFactory.getRuntimeMXBean().getName();
119     }
120     LOG.info("Process identifier=" + identifier +
121       " connecting to ZooKeeper ensemble=" + quorumServers);
122     this.identifier = identifier;
123     this.id = Bytes.toBytes(identifier);
124 
125     this.watcher = watcher;
126     this.sessionTimeout = sessionTimeout;
127     this.quorumServers = quorumServers;
128     try {checkZk();} catch (Exception x) {/* ignore */}
129     salter = new Random();
130   }
131 
132   /**
133    * Try to create a Zookeeper connection. Turns any exception encountered into a
134    * KeeperException.OperationTimeoutException so it can retried.
135    * @return The created Zookeeper connection object
136    * @throws KeeperException
137    */
138   protected synchronized ZooKeeper checkZk() throws KeeperException {
139     if (this.zk == null) {
140       try {
141         this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
142       } catch (IOException ex) {
143         LOG.warn("Unable to create ZooKeeper Connection", ex);
144         throw new KeeperException.OperationTimeoutException();
145       }
146     }
147     return zk;
148   }
149 
150   public synchronized void reconnectAfterExpiration()
151         throws IOException, KeeperException, InterruptedException {
152     if (zk != null) {
153       LOG.info("Closing dead ZooKeeper connection, session" +
154         " was: 0x"+Long.toHexString(zk.getSessionId()));
155       zk.close();
156       // reset the Zookeeper connection
157       zk = null;
158     }
159     checkZk();
160     LOG.info("Recreated a ZooKeeper, session" +
161       " is: 0x"+Long.toHexString(zk.getSessionId()));
162   }
163 
164   /**
165    * delete is an idempotent operation. Retry before throwing exception.
166    * This function will not throw NoNodeException if the path does not
167    * exist.
168    */
169   public void delete(String path, int version)
170   throws InterruptedException, KeeperException {
171     TraceScope traceScope = null;
172     try {
173       traceScope = Trace.startSpan("RecoverableZookeeper.delete");
174       RetryCounter retryCounter = retryCounterFactory.create();
175       boolean isRetry = false; // False for first attempt, true for all retries.
176       while (true) {
177         try {
178           checkZk().delete(path, version);
179           return;
180         } catch (KeeperException e) {
181           switch (e.code()) {
182             case NONODE:
183               if (isRetry) {
184                 LOG.debug("Node " + path + " already deleted. Assuming a " +
185                     "previous attempt succeeded.");
186                 return;
187               }
188               LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
189               throw e;
190 
191             case CONNECTIONLOSS:
192             case SESSIONEXPIRED:
193             case OPERATIONTIMEOUT:
194               retryOrThrow(retryCounter, e, "delete");
195               break;
196 
197             default:
198               throw e;
199           }
200         }
201         retryCounter.sleepUntilNextRetry();
202         isRetry = true;
203       }
204     } finally {
205       if (traceScope != null) traceScope.close();
206     }
207   }
208 
209   /**
210    * exists is an idempotent operation. Retry before throwing exception
211    * @return A Stat instance
212    */
213   public Stat exists(String path, Watcher watcher)
214   throws KeeperException, InterruptedException {
215     TraceScope traceScope = null;
216     try {
217       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
218       RetryCounter retryCounter = retryCounterFactory.create();
219       while (true) {
220         try {
221           return checkZk().exists(path, watcher);
222         } catch (KeeperException e) {
223           switch (e.code()) {
224             case CONNECTIONLOSS:
225             case SESSIONEXPIRED:
226             case OPERATIONTIMEOUT:
227               retryOrThrow(retryCounter, e, "exists");
228               break;
229 
230             default:
231               throw e;
232           }
233         }
234         retryCounter.sleepUntilNextRetry();
235       }
236     } finally {
237       if (traceScope != null) traceScope.close();
238     }
239   }
240 
241   /**
242    * exists is an idempotent operation. Retry before throwing exception
243    * @return A Stat instance
244    */
245   public Stat exists(String path, boolean watch)
246   throws KeeperException, InterruptedException {
247     TraceScope traceScope = null;
248     try {
249       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
250       RetryCounter retryCounter = retryCounterFactory.create();
251       while (true) {
252         try {
253           return checkZk().exists(path, watch);
254         } catch (KeeperException e) {
255           switch (e.code()) {
256             case CONNECTIONLOSS:
257             case SESSIONEXPIRED:
258             case OPERATIONTIMEOUT:
259               retryOrThrow(retryCounter, e, "exists");
260               break;
261 
262             default:
263               throw e;
264           }
265         }
266         retryCounter.sleepUntilNextRetry();
267       }
268     } finally {
269       if (traceScope != null) traceScope.close();
270     }
271   }
272 
273   private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
274       String opName) throws KeeperException {
275     LOG.debug("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
276     if (!retryCounter.shouldRetry()) {
277       LOG.error("ZooKeeper " + opName + " failed after "
278         + retryCounter.getMaxAttempts() + " attempts");
279       throw e;
280     }
281   }
282 
283   /**
284    * getChildren is an idempotent operation. Retry before throwing exception
285    * @return List of children znodes
286    */
287   public List<String> getChildren(String path, Watcher watcher)
288     throws KeeperException, InterruptedException {
289     TraceScope traceScope = null;
290     try {
291       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
292       RetryCounter retryCounter = retryCounterFactory.create();
293       while (true) {
294         try {
295           return checkZk().getChildren(path, watcher);
296         } catch (KeeperException e) {
297           switch (e.code()) {
298             case CONNECTIONLOSS:
299             case SESSIONEXPIRED:
300             case OPERATIONTIMEOUT:
301               retryOrThrow(retryCounter, e, "getChildren");
302               break;
303 
304             default:
305               throw e;
306           }
307         }
308         retryCounter.sleepUntilNextRetry();
309       }
310     } finally {
311       if (traceScope != null) traceScope.close();
312     }
313   }
314 
315   /**
316    * getChildren is an idempotent operation. Retry before throwing exception
317    * @return List of children znodes
318    */
319   public List<String> getChildren(String path, boolean watch)
320   throws KeeperException, InterruptedException {
321     TraceScope traceScope = null;
322     try {
323       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
324       RetryCounter retryCounter = retryCounterFactory.create();
325       while (true) {
326         try {
327           return checkZk().getChildren(path, watch);
328         } catch (KeeperException e) {
329           switch (e.code()) {
330             case CONNECTIONLOSS:
331             case SESSIONEXPIRED:
332             case OPERATIONTIMEOUT:
333               retryOrThrow(retryCounter, e, "getChildren");
334               break;
335 
336             default:
337               throw e;
338           }
339         }
340         retryCounter.sleepUntilNextRetry();
341       }
342     } finally {
343       if (traceScope != null) traceScope.close();
344     }
345   }
346 
347   /**
348    * getData is an idempotent operation. Retry before throwing exception
349    * @return Data
350    */
351   public byte[] getData(String path, Watcher watcher, Stat stat)
352   throws KeeperException, InterruptedException {
353     TraceScope traceScope = null;
354     try {
355       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
356       RetryCounter retryCounter = retryCounterFactory.create();
357       while (true) {
358         try {
359           byte[] revData = checkZk().getData(path, watcher, stat);
360           return this.removeMetaData(revData);
361         } catch (KeeperException e) {
362           switch (e.code()) {
363             case CONNECTIONLOSS:
364             case SESSIONEXPIRED:
365             case OPERATIONTIMEOUT:
366               retryOrThrow(retryCounter, e, "getData");
367               break;
368 
369             default:
370               throw e;
371           }
372         }
373         retryCounter.sleepUntilNextRetry();
374       }
375     } finally {
376       if (traceScope != null) traceScope.close();
377     }
378   }
379 
380   /**
381    * getData is an idemnpotent operation. Retry before throwing exception
382    * @return Data
383    */
384   public byte[] getData(String path, boolean watch, Stat stat)
385   throws KeeperException, InterruptedException {
386     TraceScope traceScope = null;
387     try {
388       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
389       RetryCounter retryCounter = retryCounterFactory.create();
390       while (true) {
391         try {
392           byte[] revData = checkZk().getData(path, watch, stat);
393           return this.removeMetaData(revData);
394         } catch (KeeperException e) {
395           switch (e.code()) {
396             case CONNECTIONLOSS:
397             case SESSIONEXPIRED:
398             case OPERATIONTIMEOUT:
399               retryOrThrow(retryCounter, e, "getData");
400               break;
401 
402             default:
403               throw e;
404           }
405         }
406         retryCounter.sleepUntilNextRetry();
407       }
408     } finally {
409       if (traceScope != null) traceScope.close();
410     }
411   }
412 
413   /**
414    * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
415    * Adding an identifier field into the data to check whether
416    * badversion is caused by the result of previous correctly setData
417    * @return Stat instance
418    */
419   public Stat setData(String path, byte[] data, int version)
420   throws KeeperException, InterruptedException {
421     TraceScope traceScope = null;
422     try {
423       traceScope = Trace.startSpan("RecoverableZookeeper.setData");
424       RetryCounter retryCounter = retryCounterFactory.create();
425       byte[] newData = appendMetaData(data);
426       boolean isRetry = false;
427       while (true) {
428         try {
429           return checkZk().setData(path, newData, version);
430         } catch (KeeperException e) {
431           switch (e.code()) {
432             case CONNECTIONLOSS:
433             case SESSIONEXPIRED:
434             case OPERATIONTIMEOUT:
435               retryOrThrow(retryCounter, e, "setData");
436               break;
437             case BADVERSION:
438               if (isRetry) {
439                 // try to verify whether the previous setData success or not
440                 try{
441                   Stat stat = new Stat();
442                   byte[] revData = checkZk().getData(path, false, stat);
443                   if(Bytes.compareTo(revData, newData) == 0) {
444                     // the bad version is caused by previous successful setData
445                     return stat;
446                   }
447                 } catch(KeeperException keeperException){
448                   // the ZK is not reliable at this moment. just throwing exception
449                   throw keeperException;
450                 }
451               }
452             // throw other exceptions and verified bad version exceptions
453             default:
454               throw e;
455           }
456         }
457         retryCounter.sleepUntilNextRetry();
458         isRetry = true;
459       }
460     } finally {
461       if (traceScope != null) traceScope.close();
462     }
463   }
464 
465   /**
466    * getAcl is an idempotent operation. Retry before throwing exception
467    * @return list of ACLs
468    */
469   public List<ACL> getAcl(String path, Stat stat)
470   throws KeeperException, InterruptedException {
471     TraceScope traceScope = null;
472     try {
473       traceScope = Trace.startSpan("RecoverableZookeeper.getAcl");
474       RetryCounter retryCounter = retryCounterFactory.create();
475       while (true) {
476         try {
477           return checkZk().getACL(path, stat);
478         } catch (KeeperException e) {
479           switch (e.code()) {
480             case CONNECTIONLOSS:
481             case SESSIONEXPIRED:
482             case OPERATIONTIMEOUT:
483               retryOrThrow(retryCounter, e, "getAcl");
484               break;
485 
486             default:
487               throw e;
488           }
489         }
490         retryCounter.sleepUntilNextRetry();
491       }
492     } finally {
493       if (traceScope != null) traceScope.close();
494     }
495   }
496 
497   /**
498    * setAcl is an idempotent operation. Retry before throwing exception
499    * @return list of ACLs
500    */
501   public Stat setAcl(String path, List<ACL> acls, int version)
502   throws KeeperException, InterruptedException {
503     TraceScope traceScope = null;
504     try {
505       traceScope = Trace.startSpan("RecoverableZookeeper.setAcl");
506       RetryCounter retryCounter = retryCounterFactory.create();
507       while (true) {
508         try {
509           return checkZk().setACL(path, acls, version);
510         } catch (KeeperException e) {
511           switch (e.code()) {
512             case CONNECTIONLOSS:
513             case SESSIONEXPIRED:
514             case OPERATIONTIMEOUT:
515               retryOrThrow(retryCounter, e, "setAcl");
516               break;
517 
518             default:
519               throw e;
520           }
521         }
522         retryCounter.sleepUntilNextRetry();
523       }
524     } finally {
525       if (traceScope != null) traceScope.close();
526     }
527   }
528 
529   /**
530    * <p>
531    * NONSEQUENTIAL create is idempotent operation.
532    * Retry before throwing exceptions.
533    * But this function will not throw the NodeExist exception back to the
534    * application.
535    * </p>
536    * <p>
537    * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
538    * identifier to the path to verify, whether the previous one is successful
539    * or not.
540    * </p>
541    *
542    * @return Path
543    */
544   public String create(String path, byte[] data, List<ACL> acl,
545       CreateMode createMode)
546   throws KeeperException, InterruptedException {
547     TraceScope traceScope = null;
548     try {
549       traceScope = Trace.startSpan("RecoverableZookeeper.create");
550       byte[] newData = appendMetaData(data);
551       switch (createMode) {
552         case EPHEMERAL:
553         case PERSISTENT:
554           return createNonSequential(path, newData, acl, createMode);
555 
556         case EPHEMERAL_SEQUENTIAL:
557         case PERSISTENT_SEQUENTIAL:
558           return createSequential(path, newData, acl, createMode);
559 
560         default:
561           throw new IllegalArgumentException("Unrecognized CreateMode: " +
562               createMode);
563       }
564     } finally {
565       if (traceScope != null) traceScope.close();
566     }
567   }
568 
569   private String createNonSequential(String path, byte[] data, List<ACL> acl,
570       CreateMode createMode) throws KeeperException, InterruptedException {
571     RetryCounter retryCounter = retryCounterFactory.create();
572     boolean isRetry = false; // False for first attempt, true for all retries.
573     while (true) {
574       try {
575         return checkZk().create(path, data, acl, createMode);
576       } catch (KeeperException e) {
577         switch (e.code()) {
578           case NODEEXISTS:
579             if (isRetry) {
580               // If the connection was lost, there is still a possibility that
581               // we have successfully created the node at our previous attempt,
582               // so we read the node and compare.
583               byte[] currentData = checkZk().getData(path, false, null);
584               if (currentData != null &&
585                   Bytes.compareTo(currentData, data) == 0) {
586                 // We successfully created a non-sequential node
587                 return path;
588               }
589               LOG.error("Node " + path + " already exists with " +
590                   Bytes.toStringBinary(currentData) + ", could not write " +
591                   Bytes.toStringBinary(data));
592               throw e;
593             }
594             LOG.debug("Node " + path + " already exists");
595             throw e;
596 
597           case CONNECTIONLOSS:
598           case SESSIONEXPIRED:
599           case OPERATIONTIMEOUT:
600             retryOrThrow(retryCounter, e, "create");
601             break;
602 
603           default:
604             throw e;
605         }
606       }
607       retryCounter.sleepUntilNextRetry();
608       isRetry = true;
609     }
610   }
611 
612   private String createSequential(String path, byte[] data,
613       List<ACL> acl, CreateMode createMode)
614   throws KeeperException, InterruptedException {
615     RetryCounter retryCounter = retryCounterFactory.create();
616     boolean first = true;
617     String newPath = path+this.identifier;
618     while (true) {
619       try {
620         if (!first) {
621           // Check if we succeeded on a previous attempt
622           String previousResult = findPreviousSequentialNode(newPath);
623           if (previousResult != null) {
624             return previousResult;
625           }
626         }
627         first = false;
628         return checkZk().create(newPath, data, acl, createMode);
629       } catch (KeeperException e) {
630         switch (e.code()) {
631           case CONNECTIONLOSS:
632           case SESSIONEXPIRED:
633           case OPERATIONTIMEOUT:
634             retryOrThrow(retryCounter, e, "create");
635             break;
636 
637           default:
638             throw e;
639         }
640       }
641       retryCounter.sleepUntilNextRetry();
642     }
643   }
644   /**
645    * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
646    * instances to actually pass to multi (need to do this in order to appendMetaData).
647    */
648   private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
649   throws UnsupportedOperationException {
650     if(ops == null) return null;
651 
652     List<Op> preparedOps = new LinkedList<Op>();
653     for (Op op : ops) {
654       if (op.getType() == ZooDefs.OpCode.create) {
655         CreateRequest create = (CreateRequest)op.toRequestRecord();
656         preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
657           create.getAcl(), create.getFlags()));
658       } else if (op.getType() == ZooDefs.OpCode.delete) {
659         // no need to appendMetaData for delete
660         preparedOps.add(op);
661       } else if (op.getType() == ZooDefs.OpCode.setData) {
662         SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
663         preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
664           setData.getVersion()));
665       } else {
666         throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
667       }
668     }
669     return preparedOps;
670   }
671 
672   /**
673    * Run multiple operations in a transactional manner. Retry before throwing exception
674    */
675   public List<OpResult> multi(Iterable<Op> ops)
676   throws KeeperException, InterruptedException {
677     TraceScope traceScope = null;
678     try {
679       traceScope = Trace.startSpan("RecoverableZookeeper.multi");
680       RetryCounter retryCounter = retryCounterFactory.create();
681       Iterable<Op> multiOps = prepareZKMulti(ops);
682       while (true) {
683         try {
684           return checkZk().multi(multiOps);
685         } catch (KeeperException e) {
686           switch (e.code()) {
687             case CONNECTIONLOSS:
688             case SESSIONEXPIRED:
689             case OPERATIONTIMEOUT:
690               retryOrThrow(retryCounter, e, "multi");
691               break;
692 
693             default:
694               throw e;
695           }
696         }
697         retryCounter.sleepUntilNextRetry();
698     }
699     } finally {
700       if (traceScope != null) traceScope.close();
701     }
702   }
703 
704   private String findPreviousSequentialNode(String path)
705     throws KeeperException, InterruptedException {
706     int lastSlashIdx = path.lastIndexOf('/');
707     assert(lastSlashIdx != -1);
708     String parent = path.substring(0, lastSlashIdx);
709     String nodePrefix = path.substring(lastSlashIdx+1);
710 
711     List<String> nodes = checkZk().getChildren(parent, false);
712     List<String> matching = filterByPrefix(nodes, nodePrefix);
713     for (String node : matching) {
714       String nodePath = parent + "/" + node;
715       Stat stat = checkZk().exists(nodePath, false);
716       if (stat != null) {
717         return nodePath;
718       }
719     }
720     return null;
721   }
722 
723   public byte[] removeMetaData(byte[] data) {
724     if(data == null || data.length == 0) {
725       return data;
726     }
727     // check the magic data; to be backward compatible
728     byte magic = data[0];
729     if(magic != MAGIC) {
730       return data;
731     }
732 
733     int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
734     int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
735     int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
736 
737     byte[] newData = new byte[dataLength];
738     System.arraycopy(data, dataOffset, newData, 0, dataLength);
739     return newData;
740   }
741 
742   private byte[] appendMetaData(byte[] data) {
743     if(data == null || data.length == 0){
744       return data;
745     }
746     byte[] salt = Bytes.toBytes(salter.nextLong());
747     int idLength = id.length + salt.length;
748     byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
749     int pos = 0;
750     pos = Bytes.putByte(newData, pos, MAGIC);
751     pos = Bytes.putInt(newData, pos, idLength);
752     pos = Bytes.putBytes(newData, pos, id, 0, id.length);
753     pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
754     pos = Bytes.putBytes(newData, pos, data, 0, data.length);
755     return newData;
756   }
757 
758   public synchronized long getSessionId() {
759     return zk == null ? -1 : zk.getSessionId();
760   }
761 
762   public synchronized void close() throws InterruptedException {
763     if (zk != null) zk.close();
764   }
765 
766   public synchronized States getState() {
767     return zk == null ? null : zk.getState();
768   }
769 
770   public synchronized ZooKeeper getZooKeeper() {
771     return zk;
772   }
773 
774   public synchronized byte[] getSessionPasswd() {
775     return zk == null ? null : zk.getSessionPasswd();
776   }
777 
778   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
779     checkZk().sync(path, null, null);
780   }
781 
782   /**
783    * Filters the given node list by the given prefixes.
784    * This method is all-inclusive--if any element in the node list starts
785    * with any of the given prefixes, then it is included in the result.
786    *
787    * @param nodes the nodes to filter
788    * @param prefixes the prefixes to include in the result
789    * @return list of every element that starts with one of the prefixes
790    */
791   private static List<String> filterByPrefix(List<String> nodes,
792       String... prefixes) {
793     List<String> lockChildren = new ArrayList<String>();
794     for (String child : nodes){
795       for (String prefix : prefixes){
796         if (child.startsWith(prefix)){
797           lockChildren.add(child);
798           break;
799         }
800       }
801     }
802     return lockChildren;
803   }
804 
805   public String getIdentifier() {
806     return identifier;
807   }
808 }