1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.classification.InterfaceStability;
25  import org.apache.hadoop.hbase.util.Bytes;
26  import org.apache.hadoop.hbase.util.RetryCounter;
27  import org.apache.hadoop.hbase.util.RetryCounterFactory;
28  import org.apache.zookeeper.AsyncCallback;
29  import org.apache.zookeeper.CreateMode;
30  import org.apache.zookeeper.KeeperException;
31  import org.apache.zookeeper.Op;
32  import org.apache.zookeeper.OpResult;
33  import org.apache.zookeeper.Watcher;
34  import org.apache.zookeeper.ZooDefs;
35  import org.apache.zookeeper.ZooKeeper;
36  import org.apache.zookeeper.ZooKeeper.States;
37  import org.apache.zookeeper.data.ACL;
38  import org.apache.zookeeper.data.Stat;
39  import org.apache.zookeeper.proto.CreateRequest;
40  import org.apache.zookeeper.proto.SetDataRequest;
41  
42  import java.io.IOException;
43  import java.lang.management.ManagementFactory;
44  import java.security.SecureRandom;
45  import java.util.ArrayList;
46  import java.util.LinkedList;
47  import java.util.List;
48  import java.util.Random;
49  
50  /**
51   * A zookeeper that can handle 'recoverable' errors.
52   * To handle recoverable errors, developers need to realize that there are two
53   * classes of requests: idempotent and non-idempotent requests. Read requests
54   * and unconditional sets and deletes are examples of idempotent requests, they
55   * can be reissued with the same results.
56   * (Although, the delete may throw a NoNodeException on reissue its effect on
57   * the ZooKeeper state is the same.) Non-idempotent requests need special
58   * handling, application and library writers need to keep in mind that they may
59   * need to encode information in the data or name of znodes to detect
60   * retries. A simple example is a create that uses a sequence flag.
61   * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
62   * loss exception, that process will reissue another
63   * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
64   * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
65   * that x-109 was the result of the previous create, so the process actually
66   * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
67   * when doing the create. If the process is using an id of 352, before reissuing
68   * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
69   * "x-352-109", x-333-110". The process will know that the original create
70   * succeeded an the znode it created is "x-352-109".
71   * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
72   */
73  @InterfaceAudience.Public
74  @InterfaceStability.Evolving
75  public class RecoverableZooKeeper {
76    private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
77    // the actual ZooKeeper client instance
78    volatile private ZooKeeper zk;
79    private final RetryCounterFactory retryCounterFactory;
80    // An identifier of this process in the cluster
81    private final String identifier;
82    private final byte[] id;
83    private Watcher watcher;
84    private int sessionTimeout;
85    private String quorumServers;
86    private final Random salter;
87  
88    // The metadata attached to each piece of data has the
89    // format:
90    //   <magic> 1-byte constant
91    //   <id length> 4-byte big-endian integer (length of next field)
92    //   <id> identifier corresponding uniquely to this process
93    // It is prepended to the data supplied by the user.
94  
95    // the magic number is to be backward compatible
96    private static final byte MAGIC =(byte) 0XFF;
97    private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
98    private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
99    private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
100 
101   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
102       Watcher watcher, int maxRetries, int retryIntervalMillis)
103   throws IOException {
104     this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
105         null);
106   }
107 
108   public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
109       Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
110   throws IOException {
111     this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
112     this.retryCounterFactory =
113       new RetryCounterFactory(maxRetries, retryIntervalMillis);
114 
115     if (identifier == null || identifier.length() == 0) {
116       // the identifier = processID@hostName
117       identifier = ManagementFactory.getRuntimeMXBean().getName();
118     }
119     LOG.info("Process identifier=" + identifier +
120       " connecting to ZooKeeper ensemble=" + quorumServers);
121     this.identifier = identifier;
122     this.id = Bytes.toBytes(identifier);
123 
124     this.watcher = watcher;
125     this.sessionTimeout = sessionTimeout;
126     this.quorumServers = quorumServers;
127     salter = new SecureRandom();
128   }
129 
130   public void reconnectAfterExpiration()
131         throws IOException, InterruptedException {
132     LOG.info("Closing dead ZooKeeper connection, session" +
133       " was: 0x"+Long.toHexString(zk.getSessionId()));
134     zk.close();
135     this.zk = new ZooKeeper(this.quorumServers,
136       this.sessionTimeout, this.watcher);
137     LOG.info("Recreated a ZooKeeper, session" +
138       " is: 0x"+Long.toHexString(zk.getSessionId()));
139   }
140 
141   /**
142    * delete is an idempotent operation. Retry before throwing exception.
143    * This function will not throw NoNodeException if the path does not
144    * exist.
145    */
146   public void delete(String path, int version)
147   throws InterruptedException, KeeperException {
148     RetryCounter retryCounter = retryCounterFactory.create();
149     boolean isRetry = false; // False for first attempt, true for all retries.
150     while (true) {
151       try {
152         zk.delete(path, version);
153         return;
154       } catch (KeeperException e) {
155         switch (e.code()) {
156           case NONODE:
157             if (isRetry) {
158               LOG.info("Node " + path + " already deleted. Assuming a " +
159                   "previous attempt succeeded.");
160               return;
161             }
162             LOG.warn("Node " + path + " already deleted, retry=" + isRetry);
163             throw e;
164 
165           case CONNECTIONLOSS:
166           case SESSIONEXPIRED:
167           case OPERATIONTIMEOUT:
168             retryOrThrow(retryCounter, e, "delete");
169             break;
170 
171           default:
172             throw e;
173         }
174       }
175       retryCounter.sleepUntilNextRetry();
176       retryCounter.useRetry();
177       isRetry = true;
178     }
179   }
180 
181   /**
182    * exists is an idempotent operation. Retry before throwing exception
183    * @return A Stat instance
184    */
185   public Stat exists(String path, Watcher watcher)
186   throws KeeperException, InterruptedException {
187     RetryCounter retryCounter = retryCounterFactory.create();
188     while (true) {
189       try {
190         return zk.exists(path, watcher);
191       } catch (KeeperException e) {
192         switch (e.code()) {
193           case CONNECTIONLOSS:
194           case SESSIONEXPIRED:
195           case OPERATIONTIMEOUT:
196             retryOrThrow(retryCounter, e, "exists");
197             break;
198 
199           default:
200             throw e;
201         }
202       }
203       retryCounter.sleepUntilNextRetry();
204       retryCounter.useRetry();
205     }
206   }
207 
208   /**
209    * exists is an idempotent operation. Retry before throwing exception
210    * @return A Stat instance
211    */
212   public Stat exists(String path, boolean watch)
213   throws KeeperException, InterruptedException {
214     RetryCounter retryCounter = retryCounterFactory.create();
215     while (true) {
216       try {
217         return zk.exists(path, watch);
218       } catch (KeeperException e) {
219         switch (e.code()) {
220           case CONNECTIONLOSS:
221           case SESSIONEXPIRED:
222           case OPERATIONTIMEOUT:
223             retryOrThrow(retryCounter, e, "exists");
224             break;
225 
226           default:
227             throw e;
228         }
229       }
230       retryCounter.sleepUntilNextRetry();
231       retryCounter.useRetry();
232     }
233   }
234 
235   private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
236       String opName) throws KeeperException {
237     LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
238     if (!retryCounter.shouldRetry()) {
239       LOG.error("ZooKeeper " + opName + " failed after "
240         + retryCounter.getMaxRetries() + " retries");
241       throw e;
242     }
243   }
244 
245   /**
246    * getChildren is an idempotent operation. Retry before throwing exception
247    * @return List of children znodes
248    */
249   public List<String> getChildren(String path, Watcher watcher)
250     throws KeeperException, InterruptedException {
251     RetryCounter retryCounter = retryCounterFactory.create();
252     while (true) {
253       try {
254         return zk.getChildren(path, watcher);
255       } catch (KeeperException e) {
256         switch (e.code()) {
257           case CONNECTIONLOSS:
258           case SESSIONEXPIRED:
259           case OPERATIONTIMEOUT:
260             retryOrThrow(retryCounter, e, "getChildren");
261             break;
262 
263           default:
264             throw e;
265         }
266       }
267       retryCounter.sleepUntilNextRetry();
268       retryCounter.useRetry();
269     }
270   }
271 
272   /**
273    * getChildren is an idempotent operation. Retry before throwing exception
274    * @return List of children znodes
275    */
276   public List<String> getChildren(String path, boolean watch)
277   throws KeeperException, InterruptedException {
278     RetryCounter retryCounter = retryCounterFactory.create();
279     while (true) {
280       try {
281         return zk.getChildren(path, watch);
282       } catch (KeeperException e) {
283         switch (e.code()) {
284           case CONNECTIONLOSS:
285           case SESSIONEXPIRED:
286           case OPERATIONTIMEOUT:
287             retryOrThrow(retryCounter, e, "getChildren");
288             break;
289 
290           default:
291             throw e;
292         }
293       }
294       retryCounter.sleepUntilNextRetry();
295       retryCounter.useRetry();
296     }
297   }
298 
299   /**
300    * getData is an idempotent operation. Retry before throwing exception
301    * @return Data
302    */
303   public byte[] getData(String path, Watcher watcher, Stat stat)
304   throws KeeperException, InterruptedException {
305     RetryCounter retryCounter = retryCounterFactory.create();
306     while (true) {
307       try {
308         byte[] revData = zk.getData(path, watcher, stat);
309         return this.removeMetaData(revData);
310       } catch (KeeperException e) {
311         switch (e.code()) {
312           case CONNECTIONLOSS:
313           case SESSIONEXPIRED:
314           case OPERATIONTIMEOUT:
315             retryOrThrow(retryCounter, e, "getData");
316             break;
317 
318           default:
319             throw e;
320         }
321       }
322       retryCounter.sleepUntilNextRetry();
323       retryCounter.useRetry();
324     }
325   }
326 
327   /**
328    * getData is an idemnpotent operation. Retry before throwing exception
329    * @return Data
330    */
331   public byte[] getData(String path, boolean watch, Stat stat)
332   throws KeeperException, InterruptedException {
333     RetryCounter retryCounter = retryCounterFactory.create();
334     while (true) {
335       try {
336         byte[] revData = zk.getData(path, watch, stat);
337         return this.removeMetaData(revData);
338       } catch (KeeperException e) {
339         switch (e.code()) {
340           case CONNECTIONLOSS:
341           case SESSIONEXPIRED:
342           case OPERATIONTIMEOUT:
343             retryOrThrow(retryCounter, e, "getData");
344             break;
345 
346           default:
347             throw e;
348         }
349       }
350       retryCounter.sleepUntilNextRetry();
351       retryCounter.useRetry();
352     }
353   }
354 
355   /**
356    * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
357    * Adding an identifier field into the data to check whether
358    * badversion is caused by the result of previous correctly setData
359    * @return Stat instance
360    */
361   public Stat setData(String path, byte[] data, int version)
362   throws KeeperException, InterruptedException {
363     RetryCounter retryCounter = retryCounterFactory.create();
364     byte[] newData = appendMetaData(data);
365     boolean isRetry = false;
366     while (true) {
367       try {
368         return zk.setData(path, newData, version);
369       } catch (KeeperException e) {
370         switch (e.code()) {
371           case CONNECTIONLOSS:
372           case SESSIONEXPIRED:
373           case OPERATIONTIMEOUT:
374             retryOrThrow(retryCounter, e, "setData");
375             break;
376           case BADVERSION:
377             if (isRetry) {
378               // try to verify whether the previous setData success or not
379               try{
380                 Stat stat = new Stat();
381                 byte[] revData = zk.getData(path, false, stat);
382                 if(Bytes.compareTo(revData, newData) == 0) {
383                   // the bad version is caused by previous successful setData
384                   return stat;
385                 }
386               } catch(KeeperException keeperException){
387                 // the ZK is not reliable at this moment. just throwing exception
388                 throw keeperException;
389               }
390             }
391           // throw other exceptions and verified bad version exceptions
392           default:
393             throw e;
394         }
395       }
396       retryCounter.sleepUntilNextRetry();
397       retryCounter.useRetry();
398       isRetry = true;
399     }
400   }
401 
402   /**
403    * <p>
404    * NONSEQUENTIAL create is idempotent operation.
405    * Retry before throwing exceptions.
406    * But this function will not throw the NodeExist exception back to the
407    * application.
408    * </p>
409    * <p>
410    * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
411    * identifier to the path to verify, whether the previous one is successful
412    * or not.
413    * </p>
414    *
415    * @return Path
416    */
417   public String create(String path, byte[] data, List<ACL> acl,
418       CreateMode createMode)
419   throws KeeperException, InterruptedException {
420     byte[] newData = appendMetaData(data);
421     switch (createMode) {
422       case EPHEMERAL:
423       case PERSISTENT:
424         return createNonSequential(path, newData, acl, createMode);
425 
426       case EPHEMERAL_SEQUENTIAL:
427       case PERSISTENT_SEQUENTIAL:
428         return createSequential(path, newData, acl, createMode);
429 
430       default:
431         throw new IllegalArgumentException("Unrecognized CreateMode: " +
432             createMode);
433     }
434   }
435 
436   private String createNonSequential(String path, byte[] data, List<ACL> acl,
437       CreateMode createMode) throws KeeperException, InterruptedException {
438     RetryCounter retryCounter = retryCounterFactory.create();
439     boolean isRetry = false; // False for first attempt, true for all retries.
440     while (true) {
441       try {
442         return zk.create(path, data, acl, createMode);
443       } catch (KeeperException e) {
444         switch (e.code()) {
445           case NODEEXISTS:
446             if (isRetry) {
447               // If the connection was lost, there is still a possibility that
448               // we have successfully created the node at our previous attempt,
449               // so we read the node and compare.
450               byte[] currentData = zk.getData(path, false, null);
451               if (currentData != null &&
452                   Bytes.compareTo(currentData, data) == 0) {
453                 // We successfully created a non-sequential node
454                 return path;
455               }
456               LOG.error("Node " + path + " already exists with " +
457                   Bytes.toStringBinary(currentData) + ", could not write " +
458                   Bytes.toStringBinary(data));
459               throw e;
460             }
461             LOG.info("Node " + path + " already exists and this is not a " +
462                 "retry");
463             throw e;
464 
465           case CONNECTIONLOSS:
466           case SESSIONEXPIRED:
467           case OPERATIONTIMEOUT:
468             retryOrThrow(retryCounter, e, "create");
469             break;
470 
471           default:
472             throw e;
473         }
474       }
475       retryCounter.sleepUntilNextRetry();
476       retryCounter.useRetry();
477       isRetry = true;
478     }
479   }
480 
481   private String createSequential(String path, byte[] data,
482       List<ACL> acl, CreateMode createMode)
483   throws KeeperException, InterruptedException {
484     RetryCounter retryCounter = retryCounterFactory.create();
485     boolean first = true;
486     String newPath = path+this.identifier;
487     while (true) {
488       try {
489         if (!first) {
490           // Check if we succeeded on a previous attempt
491           String previousResult = findPreviousSequentialNode(newPath);
492           if (previousResult != null) {
493             return previousResult;
494           }
495         }
496         first = false;
497         return zk.create(newPath, data, acl, createMode);
498       } catch (KeeperException e) {
499         switch (e.code()) {
500           case CONNECTIONLOSS:
501           case SESSIONEXPIRED:
502           case OPERATIONTIMEOUT:
503             retryOrThrow(retryCounter, e, "create");
504             break;
505 
506           default:
507             throw e;
508         }
509       }
510       retryCounter.sleepUntilNextRetry();
511       retryCounter.useRetry();
512     }
513   }
514   /**
515    * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
516    * instances to actually pass to multi (need to do this in order to appendMetaData).
517    */
518   private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
519   throws UnsupportedOperationException {
520     if(ops == null) return null;
521 
522     List<Op> preparedOps = new LinkedList<Op>();
523     for (Op op : ops) {
524       if (op.getType() == ZooDefs.OpCode.create) {
525         CreateRequest create = (CreateRequest)op.toRequestRecord();
526         preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
527           create.getAcl(), create.getFlags()));
528       } else if (op.getType() == ZooDefs.OpCode.delete) {
529         // no need to appendMetaData for delete
530         preparedOps.add(op);
531       } else if (op.getType() == ZooDefs.OpCode.setData) {
532         SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
533         preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
534           setData.getVersion()));
535       } else {
536         throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
537       }
538     }
539     return preparedOps;
540   }
541 
542   /**
543    * Run multiple operations in a transactional manner. Retry before throwing exception
544    */
545   public List<OpResult> multi(Iterable<Op> ops)
546   throws KeeperException, InterruptedException {
547     RetryCounter retryCounter = retryCounterFactory.create();
548     Iterable<Op> multiOps = prepareZKMulti(ops);
549     while (true) {
550       try {
551         return zk.multi(multiOps);
552       } catch (KeeperException e) {
553         switch (e.code()) {
554           case CONNECTIONLOSS:
555           case SESSIONEXPIRED:
556           case OPERATIONTIMEOUT:
557             retryOrThrow(retryCounter, e, "multi");
558             break;
559 
560           default:
561             throw e;
562         }
563       }
564       retryCounter.sleepUntilNextRetry();
565       retryCounter.useRetry();
566     }
567   }
568 
569   private String findPreviousSequentialNode(String path)
570     throws KeeperException, InterruptedException {
571     int lastSlashIdx = path.lastIndexOf('/');
572     assert(lastSlashIdx != -1);
573     String parent = path.substring(0, lastSlashIdx);
574     String nodePrefix = path.substring(lastSlashIdx+1);
575 
576     List<String> nodes = zk.getChildren(parent, false);
577     List<String> matching = filterByPrefix(nodes, nodePrefix);
578     for (String node : matching) {
579       String nodePath = parent + "/" + node;
580       Stat stat = zk.exists(nodePath, false);
581       if (stat != null) {
582         return nodePath;
583       }
584     }
585     return null;
586   }
587 
588   public byte[] removeMetaData(byte[] data) {
589     if(data == null || data.length == 0) {
590       return data;
591     }
592     // check the magic data; to be backward compatible
593     byte magic = data[0];
594     if(magic != MAGIC) {
595       return data;
596     }
597 
598     int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
599     int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
600     int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
601 
602     byte[] newData = new byte[dataLength];
603     System.arraycopy(data, dataOffset, newData, 0, dataLength);
604     return newData;
605   }
606 
607   private byte[] appendMetaData(byte[] data) {
608     if(data == null || data.length == 0){
609       return data;
610     }
611     byte[] salt = Bytes.toBytes(salter.nextLong());
612     int idLength = id.length + salt.length;
613     byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
614     int pos = 0;
615     pos = Bytes.putByte(newData, pos, MAGIC);
616     pos = Bytes.putInt(newData, pos, idLength);
617     pos = Bytes.putBytes(newData, pos, id, 0, id.length);
618     pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
619     pos = Bytes.putBytes(newData, pos, data, 0, data.length);
620     return newData;
621   }
622 
623   public long getSessionId() {
624     return zk.getSessionId();
625   }
626 
627   public void close() throws InterruptedException {
628     zk.close();
629   }
630 
631   public States getState() {
632     return zk.getState();
633   }
634 
635   public ZooKeeper getZooKeeper() {
636     return zk;
637   }
638 
639   public byte[] getSessionPasswd() {
640     return zk.getSessionPasswd();
641   }
642 
643   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
644     this.zk.sync(path, null, null);
645   }
646 
647   /**
648    * Filters the given node list by the given prefixes.
649    * This method is all-inclusive--if any element in the node list starts
650    * with any of the given prefixes, then it is included in the result.
651    *
652    * @param nodes the nodes to filter
653    * @param prefixes the prefixes to include in the result
654    * @return list of every element that starts with one of the prefixes
655    */
656   private static List<String> filterByPrefix(List<String> nodes,
657       String... prefixes) {
658     List<String> lockChildren = new ArrayList<String>();
659     for (String child : nodes){
660       for (String prefix : prefixes){
661         if (child.startsWith(prefix)){
662           lockChildren.add(child);
663           break;
664         }
665       }
666     }
667     return lockChildren;
668   }
669 
670   public String getIdentifier() {
671     return identifier;
672   }
673 }