View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.IOException;
22  import java.lang.management.ManagementFactory;
23  import java.util.ArrayList;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Random;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.RetryCounter;
33  import org.apache.hadoop.hbase.util.RetryCounterFactory;
34  import org.apache.zookeeper.AsyncCallback;
35  import org.apache.zookeeper.CreateMode;
36  import org.apache.zookeeper.KeeperException;
37  import org.apache.zookeeper.Op;
38  import org.apache.zookeeper.OpResult;
39  import org.apache.zookeeper.Watcher;
40  import org.apache.zookeeper.ZooDefs;
41  import org.apache.zookeeper.ZooKeeper;
42  import org.apache.zookeeper.ZooKeeper.States;
43  import org.apache.zookeeper.data.ACL;
44  import org.apache.zookeeper.data.Stat;
45  import org.apache.zookeeper.proto.CreateRequest;
46  import org.apache.zookeeper.proto.SetDataRequest;
47  import org.apache.htrace.Trace;
48  import org.apache.htrace.TraceScope;
49  
50  /**
51   * A zookeeper that can handle 'recoverable' errors.
52   * To handle recoverable errors, developers need to realize that there are two
53   * classes of requests: idempotent and non-idempotent requests. Read requests
54   * and unconditional sets and deletes are examples of idempotent requests, they
55   * can be reissued with the same results.
56   * (Although, the delete may throw a NoNodeException on reissue its effect on
57   * the ZooKeeper state is the same.) Non-idempotent requests need special
58   * handling, application and library writers need to keep in mind that they may
59   * need to encode information in the data or name of znodes to detect
60   * retries. A simple example is a create that uses a sequence flag.
61   * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
62   * loss exception, that process will reissue another
63   * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
64   * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
65   * that x-109 was the result of the previous create, so the process actually
66   * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
67   * when doing the create. If the process is using an id of 352, before reissuing
68   * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
69   * "x-352-109", "x-333-110". The process will know that the original create
70   * succeeded and the znode it created is "x-352-109".
71   * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
72   */
73  @InterfaceAudience.Private
74  public class RecoverableZooKeeper {
75    private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
76    // the actual ZooKeeper client instance
77    private ZooKeeper zk;
78    private final RetryCounterFactory retryCounterFactory;
79    // An identifier of this process in the cluster
80    private final String identifier;
81    private final byte[] id;
82    private Watcher watcher;
83    private int sessionTimeout;
84    private String quorumServers;
85    private final Random salter;
86  
87    // The metadata attached to each piece of data has the
88    // format:
89    //   <magic> 1-byte constant
90    //   <id length> 4-byte big-endian integer (length of next field)
91    //   <id> identifier corresponding uniquely to this process
92    // It is prepended to the data supplied by the user.
93  
94    // the magic number is to be backward compatible
95    private static final byte MAGIC =(byte) 0XFF;
96    private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
97    private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
98    private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
99  
100   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
101       Watcher watcher, int maxRetries, int retryIntervalMillis)
102   throws IOException {
103     this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
104         null);
105   }
106 
107   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
108       justification="None. Its always been this way.")
109   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
110       Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
111   throws IOException {
112     // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
113     this.retryCounterFactory =
114       new RetryCounterFactory(maxRetries+1, retryIntervalMillis);
115 
116     if (identifier == null || identifier.length() == 0) {
117       // the identifier = processID@hostName
118       identifier = ManagementFactory.getRuntimeMXBean().getName();
119     }
120     LOG.info("Process identifier=" + identifier +
121       " connecting to ZooKeeper ensemble=" + quorumServers);
122     this.identifier = identifier;
123     this.id = Bytes.toBytes(identifier);
124 
125     this.watcher = watcher;
126     this.sessionTimeout = sessionTimeout;
127     this.quorumServers = quorumServers;
128     try {checkZk();} catch (Exception x) {/* ignore */}
129     salter = new Random();
130   }
131 
132   /**
133    * Try to create a Zookeeper connection. Turns any exception encountered into a
134    * KeeperException.OperationTimeoutException so it can retried.
135    * @return The created Zookeeper connection object
136    * @throws KeeperException
137    */
138   protected synchronized ZooKeeper checkZk() throws KeeperException {
139     if (this.zk == null) {
140       try {
141         this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
142       } catch (IOException ex) {
143         LOG.warn("Unable to create ZooKeeper Connection", ex);
144         throw new KeeperException.OperationTimeoutException();
145       }
146     }
147     return zk;
148   }
149 
150   public synchronized void reconnectAfterExpiration()
151         throws IOException, KeeperException, InterruptedException {
152     if (zk != null) {
153       LOG.info("Closing dead ZooKeeper connection, session" +
154         " was: 0x"+Long.toHexString(zk.getSessionId()));
155       zk.close();
156       // reset the Zookeeper connection
157       zk = null;
158     }
159     checkZk();
160     LOG.info("Recreated a ZooKeeper, session" +
161       " is: 0x"+Long.toHexString(zk.getSessionId()));
162   }
163 
164   /**
165    * delete is an idempotent operation. Retry before throwing exception.
166    * This function will not throw NoNodeException if the path does not
167    * exist.
168    */
169   public void delete(String path, int version)
170   throws InterruptedException, KeeperException {
171     TraceScope traceScope = null;
172     try {
173       traceScope = Trace.startSpan("RecoverableZookeeper.delete");
174       RetryCounter retryCounter = retryCounterFactory.create();
175       boolean isRetry = false; // False for first attempt, true for all retries.
176       while (true) {
177         try {
178           checkZk().delete(path, version);
179           return;
180         } catch (KeeperException e) {
181           switch (e.code()) {
182             case NONODE:
183               if (isRetry) {
184                 LOG.debug("Node " + path + " already deleted. Assuming a " +
185                     "previous attempt succeeded.");
186                 return;
187               }
188               LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
189               throw e;
190 
191             case CONNECTIONLOSS:
192             case OPERATIONTIMEOUT:
193               retryOrThrow(retryCounter, e, "delete");
194               break;
195 
196             default:
197               throw e;
198           }
199         }
200         retryCounter.sleepUntilNextRetry();
201         isRetry = true;
202       }
203     } finally {
204       if (traceScope != null) traceScope.close();
205     }
206   }
207 
208   /**
209    * exists is an idempotent operation. Retry before throwing exception
210    * @return A Stat instance
211    */
212   public Stat exists(String path, Watcher watcher)
213   throws KeeperException, InterruptedException {
214     TraceScope traceScope = null;
215     try {
216       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
217       RetryCounter retryCounter = retryCounterFactory.create();
218       while (true) {
219         try {
220           return checkZk().exists(path, watcher);
221         } catch (KeeperException e) {
222           switch (e.code()) {
223             case CONNECTIONLOSS:
224             case OPERATIONTIMEOUT:
225               retryOrThrow(retryCounter, e, "exists");
226               break;
227 
228             default:
229               throw e;
230           }
231         }
232         retryCounter.sleepUntilNextRetry();
233       }
234     } finally {
235       if (traceScope != null) traceScope.close();
236     }
237   }
238 
239   /**
240    * exists is an idempotent operation. Retry before throwing exception
241    * @return A Stat instance
242    */
243   public Stat exists(String path, boolean watch)
244   throws KeeperException, InterruptedException {
245     TraceScope traceScope = null;
246     try {
247       traceScope = Trace.startSpan("RecoverableZookeeper.exists");
248       RetryCounter retryCounter = retryCounterFactory.create();
249       while (true) {
250         try {
251           return checkZk().exists(path, watch);
252         } catch (KeeperException e) {
253           switch (e.code()) {
254             case CONNECTIONLOSS:
255             case OPERATIONTIMEOUT:
256               retryOrThrow(retryCounter, e, "exists");
257               break;
258 
259             default:
260               throw e;
261           }
262         }
263         retryCounter.sleepUntilNextRetry();
264       }
265     } finally {
266       if (traceScope != null) traceScope.close();
267     }
268   }
269 
270   private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
271       String opName) throws KeeperException {
272     LOG.debug("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
273     if (!retryCounter.shouldRetry()) {
274       LOG.error("ZooKeeper " + opName + " failed after "
275         + retryCounter.getMaxAttempts() + " attempts");
276       throw e;
277     }
278   }
279 
280   /**
281    * getChildren is an idempotent operation. Retry before throwing exception
282    * @return List of children znodes
283    */
284   public List<String> getChildren(String path, Watcher watcher)
285     throws KeeperException, InterruptedException {
286     TraceScope traceScope = null;
287     try {
288       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
289       RetryCounter retryCounter = retryCounterFactory.create();
290       while (true) {
291         try {
292           return checkZk().getChildren(path, watcher);
293         } catch (KeeperException e) {
294           switch (e.code()) {
295             case CONNECTIONLOSS:
296             case OPERATIONTIMEOUT:
297               retryOrThrow(retryCounter, e, "getChildren");
298               break;
299 
300             default:
301               throw e;
302           }
303         }
304         retryCounter.sleepUntilNextRetry();
305       }
306     } finally {
307       if (traceScope != null) traceScope.close();
308     }
309   }
310 
311   /**
312    * getChildren is an idempotent operation. Retry before throwing exception
313    * @return List of children znodes
314    */
315   public List<String> getChildren(String path, boolean watch)
316   throws KeeperException, InterruptedException {
317     TraceScope traceScope = null;
318     try {
319       traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
320       RetryCounter retryCounter = retryCounterFactory.create();
321       while (true) {
322         try {
323           return checkZk().getChildren(path, watch);
324         } catch (KeeperException e) {
325           switch (e.code()) {
326             case CONNECTIONLOSS:
327             case OPERATIONTIMEOUT:
328               retryOrThrow(retryCounter, e, "getChildren");
329               break;
330 
331             default:
332               throw e;
333           }
334         }
335         retryCounter.sleepUntilNextRetry();
336       }
337     } finally {
338       if (traceScope != null) traceScope.close();
339     }
340   }
341 
342   /**
343    * getData is an idempotent operation. Retry before throwing exception
344    * @return Data
345    */
346   public byte[] getData(String path, Watcher watcher, Stat stat)
347   throws KeeperException, InterruptedException {
348     TraceScope traceScope = null;
349     try {
350       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
351       RetryCounter retryCounter = retryCounterFactory.create();
352       while (true) {
353         try {
354           byte[] revData = checkZk().getData(path, watcher, stat);
355           return this.removeMetaData(revData);
356         } catch (KeeperException e) {
357           switch (e.code()) {
358             case CONNECTIONLOSS:
359             case OPERATIONTIMEOUT:
360               retryOrThrow(retryCounter, e, "getData");
361               break;
362 
363             default:
364               throw e;
365           }
366         }
367         retryCounter.sleepUntilNextRetry();
368       }
369     } finally {
370       if (traceScope != null) traceScope.close();
371     }
372   }
373 
374   /**
375    * getData is an idemnpotent operation. Retry before throwing exception
376    * @return Data
377    */
378   public byte[] getData(String path, boolean watch, Stat stat)
379   throws KeeperException, InterruptedException {
380     TraceScope traceScope = null;
381     try {
382       traceScope = Trace.startSpan("RecoverableZookeeper.getData");
383       RetryCounter retryCounter = retryCounterFactory.create();
384       while (true) {
385         try {
386           byte[] revData = checkZk().getData(path, watch, stat);
387           return this.removeMetaData(revData);
388         } catch (KeeperException e) {
389           switch (e.code()) {
390             case CONNECTIONLOSS:
391             case OPERATIONTIMEOUT:
392               retryOrThrow(retryCounter, e, "getData");
393               break;
394 
395             default:
396               throw e;
397           }
398         }
399         retryCounter.sleepUntilNextRetry();
400       }
401     } finally {
402       if (traceScope != null) traceScope.close();
403     }
404   }
405 
406   /**
407    * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
408    * Adding an identifier field into the data to check whether
409    * badversion is caused by the result of previous correctly setData
410    * @return Stat instance
411    */
412   public Stat setData(String path, byte[] data, int version)
413   throws KeeperException, InterruptedException {
414     TraceScope traceScope = null;
415     try {
416       traceScope = Trace.startSpan("RecoverableZookeeper.setData");
417       RetryCounter retryCounter = retryCounterFactory.create();
418       byte[] newData = appendMetaData(data);
419       boolean isRetry = false;
420       while (true) {
421         try {
422           return checkZk().setData(path, newData, version);
423         } catch (KeeperException e) {
424           switch (e.code()) {
425             case CONNECTIONLOSS:
426             case OPERATIONTIMEOUT:
427               retryOrThrow(retryCounter, e, "setData");
428               break;
429             case BADVERSION:
430               if (isRetry) {
431                 // try to verify whether the previous setData success or not
432                 try{
433                   Stat stat = new Stat();
434                   byte[] revData = checkZk().getData(path, false, stat);
435                   if(Bytes.compareTo(revData, newData) == 0) {
436                     // the bad version is caused by previous successful setData
437                     return stat;
438                   }
439                 } catch(KeeperException keeperException){
440                   // the ZK is not reliable at this moment. just throwing exception
441                   throw keeperException;
442                 }
443               }
444             // throw other exceptions and verified bad version exceptions
445             default:
446               throw e;
447           }
448         }
449         retryCounter.sleepUntilNextRetry();
450         isRetry = true;
451       }
452     } finally {
453       if (traceScope != null) traceScope.close();
454     }
455   }
456 
457   /**
458    * getAcl is an idempotent operation. Retry before throwing exception
459    * @return list of ACLs
460    */
461   public List<ACL> getAcl(String path, Stat stat)
462   throws KeeperException, InterruptedException {
463     TraceScope traceScope = null;
464     try {
465       traceScope = Trace.startSpan("RecoverableZookeeper.getAcl");
466       RetryCounter retryCounter = retryCounterFactory.create();
467       while (true) {
468         try {
469           return checkZk().getACL(path, stat);
470         } catch (KeeperException e) {
471           switch (e.code()) {
472             case CONNECTIONLOSS:
473             case OPERATIONTIMEOUT:
474               retryOrThrow(retryCounter, e, "getAcl");
475               break;
476 
477             default:
478               throw e;
479           }
480         }
481         retryCounter.sleepUntilNextRetry();
482       }
483     } finally {
484       if (traceScope != null) traceScope.close();
485     }
486   }
487 
488   /**
489    * setAcl is an idempotent operation. Retry before throwing exception
490    * @return list of ACLs
491    */
492   public Stat setAcl(String path, List<ACL> acls, int version)
493   throws KeeperException, InterruptedException {
494     TraceScope traceScope = null;
495     try {
496       traceScope = Trace.startSpan("RecoverableZookeeper.setAcl");
497       RetryCounter retryCounter = retryCounterFactory.create();
498       while (true) {
499         try {
500           return checkZk().setACL(path, acls, version);
501         } catch (KeeperException e) {
502           switch (e.code()) {
503             case CONNECTIONLOSS:
504             case OPERATIONTIMEOUT:
505               retryOrThrow(retryCounter, e, "setAcl");
506               break;
507 
508             default:
509               throw e;
510           }
511         }
512         retryCounter.sleepUntilNextRetry();
513       }
514     } finally {
515       if (traceScope != null) traceScope.close();
516     }
517   }
518 
519   /**
520    * <p>
521    * NONSEQUENTIAL create is idempotent operation.
522    * Retry before throwing exceptions.
523    * But this function will not throw the NodeExist exception back to the
524    * application.
525    * </p>
526    * <p>
527    * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
528    * identifier to the path to verify, whether the previous one is successful
529    * or not.
530    * </p>
531    *
532    * @return Path
533    */
534   public String create(String path, byte[] data, List<ACL> acl,
535       CreateMode createMode)
536   throws KeeperException, InterruptedException {
537     TraceScope traceScope = null;
538     try {
539       traceScope = Trace.startSpan("RecoverableZookeeper.create");
540       byte[] newData = appendMetaData(data);
541       switch (createMode) {
542         case EPHEMERAL:
543         case PERSISTENT:
544           return createNonSequential(path, newData, acl, createMode);
545 
546         case EPHEMERAL_SEQUENTIAL:
547         case PERSISTENT_SEQUENTIAL:
548           return createSequential(path, newData, acl, createMode);
549 
550         default:
551           throw new IllegalArgumentException("Unrecognized CreateMode: " +
552               createMode);
553       }
554     } finally {
555       if (traceScope != null) traceScope.close();
556     }
557   }
558 
559   private String createNonSequential(String path, byte[] data, List<ACL> acl,
560       CreateMode createMode) throws KeeperException, InterruptedException {
561     RetryCounter retryCounter = retryCounterFactory.create();
562     boolean isRetry = false; // False for first attempt, true for all retries.
563     while (true) {
564       try {
565         return checkZk().create(path, data, acl, createMode);
566       } catch (KeeperException e) {
567         switch (e.code()) {
568           case NODEEXISTS:
569             if (isRetry) {
570               // If the connection was lost, there is still a possibility that
571               // we have successfully created the node at our previous attempt,
572               // so we read the node and compare.
573               byte[] currentData = checkZk().getData(path, false, null);
574               if (currentData != null &&
575                   Bytes.compareTo(currentData, data) == 0) {
576                 // We successfully created a non-sequential node
577                 return path;
578               }
579               LOG.error("Node " + path + " already exists with " +
580                   Bytes.toStringBinary(currentData) + ", could not write " +
581                   Bytes.toStringBinary(data));
582               throw e;
583             }
584             LOG.debug("Node " + path + " already exists");
585             throw e;
586 
587           case CONNECTIONLOSS:
588           case OPERATIONTIMEOUT:
589             retryOrThrow(retryCounter, e, "create");
590             break;
591 
592           default:
593             throw e;
594         }
595       }
596       retryCounter.sleepUntilNextRetry();
597       isRetry = true;
598     }
599   }
600 
601   private String createSequential(String path, byte[] data,
602       List<ACL> acl, CreateMode createMode)
603   throws KeeperException, InterruptedException {
604     RetryCounter retryCounter = retryCounterFactory.create();
605     boolean first = true;
606     String newPath = path+this.identifier;
607     while (true) {
608       try {
609         if (!first) {
610           // Check if we succeeded on a previous attempt
611           String previousResult = findPreviousSequentialNode(newPath);
612           if (previousResult != null) {
613             return previousResult;
614           }
615         }
616         first = false;
617         return checkZk().create(newPath, data, acl, createMode);
618       } catch (KeeperException e) {
619         switch (e.code()) {
620           case CONNECTIONLOSS:
621           case OPERATIONTIMEOUT:
622             retryOrThrow(retryCounter, e, "create");
623             break;
624 
625           default:
626             throw e;
627         }
628       }
629       retryCounter.sleepUntilNextRetry();
630     }
631   }
632   /**
633    * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
634    * instances to actually pass to multi (need to do this in order to appendMetaData).
635    */
636   private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
637   throws UnsupportedOperationException {
638     if(ops == null) return null;
639 
640     List<Op> preparedOps = new LinkedList<Op>();
641     for (Op op : ops) {
642       if (op.getType() == ZooDefs.OpCode.create) {
643         CreateRequest create = (CreateRequest)op.toRequestRecord();
644         preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
645           create.getAcl(), create.getFlags()));
646       } else if (op.getType() == ZooDefs.OpCode.delete) {
647         // no need to appendMetaData for delete
648         preparedOps.add(op);
649       } else if (op.getType() == ZooDefs.OpCode.setData) {
650         SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
651         preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
652           setData.getVersion()));
653       } else {
654         throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
655       }
656     }
657     return preparedOps;
658   }
659 
660   /**
661    * Run multiple operations in a transactional manner. Retry before throwing exception
662    */
663   public List<OpResult> multi(Iterable<Op> ops)
664   throws KeeperException, InterruptedException {
665     TraceScope traceScope = null;
666     try {
667       traceScope = Trace.startSpan("RecoverableZookeeper.multi");
668       RetryCounter retryCounter = retryCounterFactory.create();
669       Iterable<Op> multiOps = prepareZKMulti(ops);
670       while (true) {
671         try {
672           return checkZk().multi(multiOps);
673         } catch (KeeperException e) {
674           switch (e.code()) {
675             case CONNECTIONLOSS:
676             case OPERATIONTIMEOUT:
677               retryOrThrow(retryCounter, e, "multi");
678               break;
679 
680             default:
681               throw e;
682           }
683         }
684         retryCounter.sleepUntilNextRetry();
685     }
686     } finally {
687       if (traceScope != null) traceScope.close();
688     }
689   }
690 
691   private String findPreviousSequentialNode(String path)
692     throws KeeperException, InterruptedException {
693     int lastSlashIdx = path.lastIndexOf('/');
694     assert(lastSlashIdx != -1);
695     String parent = path.substring(0, lastSlashIdx);
696     String nodePrefix = path.substring(lastSlashIdx+1);
697 
698     List<String> nodes = checkZk().getChildren(parent, false);
699     List<String> matching = filterByPrefix(nodes, nodePrefix);
700     for (String node : matching) {
701       String nodePath = parent + "/" + node;
702       Stat stat = checkZk().exists(nodePath, false);
703       if (stat != null) {
704         return nodePath;
705       }
706     }
707     return null;
708   }
709 
710   public byte[] removeMetaData(byte[] data) {
711     if(data == null || data.length == 0) {
712       return data;
713     }
714     // check the magic data; to be backward compatible
715     byte magic = data[0];
716     if(magic != MAGIC) {
717       return data;
718     }
719 
720     int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
721     int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
722     int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
723 
724     byte[] newData = new byte[dataLength];
725     System.arraycopy(data, dataOffset, newData, 0, dataLength);
726     return newData;
727   }
728 
729   private byte[] appendMetaData(byte[] data) {
730     if(data == null || data.length == 0){
731       return data;
732     }
733     byte[] salt = Bytes.toBytes(salter.nextLong());
734     int idLength = id.length + salt.length;
735     byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
736     int pos = 0;
737     pos = Bytes.putByte(newData, pos, MAGIC);
738     pos = Bytes.putInt(newData, pos, idLength);
739     pos = Bytes.putBytes(newData, pos, id, 0, id.length);
740     pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
741     pos = Bytes.putBytes(newData, pos, data, 0, data.length);
742     return newData;
743   }
744 
745   public synchronized long getSessionId() {
746     return zk == null ? -1 : zk.getSessionId();
747   }
748 
749   public synchronized void close() throws InterruptedException {
750     if (zk != null) zk.close();
751   }
752 
753   public synchronized States getState() {
754     return zk == null ? null : zk.getState();
755   }
756 
757   public synchronized ZooKeeper getZooKeeper() {
758     return zk;
759   }
760 
761   public synchronized byte[] getSessionPasswd() {
762     return zk == null ? null : zk.getSessionPasswd();
763   }
764 
765   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
766     checkZk().sync(path, null, null);
767   }
768 
769   /**
770    * Filters the given node list by the given prefixes.
771    * This method is all-inclusive--if any element in the node list starts
772    * with any of the given prefixes, then it is included in the result.
773    *
774    * @param nodes the nodes to filter
775    * @param prefixes the prefixes to include in the result
776    * @return list of every element that starts with one of the prefixes
777    */
778   private static List<String> filterByPrefix(List<String> nodes,
779       String... prefixes) {
780     List<String> lockChildren = new ArrayList<String>();
781     for (String child : nodes){
782       for (String prefix : prefixes){
783         if (child.startsWith(prefix)){
784           lockChildren.add(child);
785           break;
786         }
787       }
788     }
789     return lockChildren;
790   }
791 
792   public String getIdentifier() {
793     return identifier;
794   }
795 }