/**
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20  package org.apache.hadoop.hbase.zookeeper;
21  
22  import java.io.IOException;
23  import java.lang.management.ManagementFactory;
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.apache.hadoop.hbase.util.RetryCounter;
32  import org.apache.hadoop.hbase.util.RetryCounterFactory;
33  import org.apache.zookeeper.AsyncCallback;
34  import org.apache.zookeeper.CreateMode;
35  import org.apache.zookeeper.KeeperException;
36  import org.apache.zookeeper.Op;
37  import org.apache.zookeeper.OpResult;
38  import org.apache.zookeeper.Watcher;
39  import org.apache.zookeeper.ZooDefs;
40  import org.apache.zookeeper.ZooKeeper;
41  import org.apache.zookeeper.ZooKeeper.States;
42  import org.apache.zookeeper.data.ACL;
43  import org.apache.zookeeper.data.Stat;
44  import org.apache.zookeeper.proto.CreateRequest;
45  import org.apache.zookeeper.proto.SetDataRequest;
46  
47  /**
48   * A zookeeper that can handle 'recoverable' errors.
49   * To handle recoverable errors, developers need to realize that there are two 
50   * classes of requests: idempotent and non-idempotent requests. Read requests 
51   * and unconditional sets and deletes are examples of idempotent requests, they 
52   * can be reissued with the same results. 
53   * (Although, the delete may throw a NoNodeException on reissue its effect on 
54   * the ZooKeeper state is the same.) Non-idempotent requests need special 
55   * handling, application and library writers need to keep in mind that they may 
56   * need to encode information in the data or name of znodes to detect 
57   * retries. A simple example is a create that uses a sequence flag. 
58   * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection 
59   * loss exception, that process will reissue another 
60   * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a 
61   * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be 
62   * that x-109 was the result of the previous create, so the process actually 
63   * owns both x-109 and x-111. An easy way around this is to use "x-process id-" 
64   * when doing the create. If the process is using an id of 352, before reissuing
65   * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 
66   * "x-352-109", x-333-110". The process will know that the original create 
67   * succeeded an the znode it created is "x-352-109".
68   * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
69   */
70  public class RecoverableZooKeeper {
71    private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
72    // the actual ZooKeeper client instance
73    private ZooKeeper zk;
74    private final RetryCounterFactory retryCounterFactory;
75    // An identifier of this process in the cluster
76    private final String identifier;
77    private final byte[] id;
78    private Watcher watcher;
79    private int sessionTimeout;
80    private String quorumServers;
81  
82    // The metadata attached to each piece of data has the
83    // format:
84    //   <magic> 1-byte constant
85    //   <id length> 4-byte big-endian integer (length of next field)
86    //   <id> identifier corresponding uniquely to this process
87    // It is prepended to the data supplied by the user.
88  
89    // the magic number is to be backward compatible
90    private static final byte MAGIC =(byte) 0XFF;
91    private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
92    private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
93    private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
94  
95    public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
96        Watcher watcher, int maxRetries, int retryIntervalMillis) 
97    throws IOException {
98      this.retryCounterFactory =
99        new RetryCounterFactory(maxRetries, retryIntervalMillis);
100 
101     // the identifier = processID@hostName
102     this.identifier = ManagementFactory.getRuntimeMXBean().getName();
103     LOG.info("The identifier of this process is " + identifier);
104     this.id = Bytes.toBytes(identifier);
105     this.watcher = watcher;
106     this.sessionTimeout = sessionTimeout;
107     this.quorumServers = quorumServers;
108     try {checkZk();} catch (Exception x) {/* ignore */}
109   }
110 
111   /**
112    * Try to create a Zookeeper connection. Turns any exception encountered into a
113    * KeeperException.OperationTimeoutException so it can retried.
114    * @return The created Zookeeper connection object
115    * @throws KeeperException
116    */
117   protected synchronized ZooKeeper checkZk() throws KeeperException {
118     if (this.zk == null) {
119       try {
120         this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
121       } catch (IOException ex) {
122         LOG.warn("Unable to create ZooKeeper Connection", ex);
123         throw new KeeperException.OperationTimeoutException();
124       }
125     }
126     return zk;
127   }
128 
129   public synchronized void reconnectAfterExpiration()
130         throws IOException, KeeperException, InterruptedException {
131     if (zk != null) {
132       LOG.info("Closing dead ZooKeeper connection, session" +
133         " was: 0x"+Long.toHexString(zk.getSessionId()));
134       zk.close();
135       // reset the Zookeeper connection
136       zk = null;
137     }
138     checkZk();
139     LOG.info("Recreated a ZooKeeper, session" +
140       " is: 0x"+Long.toHexString(zk.getSessionId()));
141   }
142 
143   /**
144    * delete is an idempotent operation. Retry before throwing exception.
145    * This function will not throw NoNodeException if the path does not
146    * exist.
147    */
148   public void delete(String path, int version)
149   throws InterruptedException, KeeperException {
150     RetryCounter retryCounter = retryCounterFactory.create();
151     boolean isRetry = false; // False for first attempt, true for all retries.
152     while (true) {
153       try {
154         checkZk().delete(path, version);
155         return;
156       } catch (KeeperException e) {
157         switch (e.code()) {
158           case NONODE:
159             if (isRetry) {
160               LOG.info("Node " + path + " already deleted. Assuming that a " +
161                   "previous attempt succeeded.");
162               return;
163             }
164             LOG.warn("Node " + path + " already deleted, and this is not a " +
165                      "retry");
166             throw e;
167 
168           case CONNECTIONLOSS:
169           case SESSIONEXPIRED:
170           case OPERATIONTIMEOUT:
171             retryOrThrow(retryCounter, e, "delete");
172             break;
173 
174           default:
175             throw e;
176         }
177       }
178       retryCounter.sleepUntilNextRetry();
179       retryCounter.useRetry();
180       isRetry = true;
181     }
182   }
183 
184   /**
185    * exists is an idempotent operation. Retry before throwing exception
186    * @return A Stat instance
187    */
188   public Stat exists(String path, Watcher watcher)
189   throws KeeperException, InterruptedException {
190     RetryCounter retryCounter = retryCounterFactory.create();
191     while (true) {
192       try {
193         return checkZk().exists(path, watcher);
194       } catch (KeeperException e) {
195         switch (e.code()) {
196           case CONNECTIONLOSS:
197           case SESSIONEXPIRED:
198           case OPERATIONTIMEOUT:
199             retryOrThrow(retryCounter, e, "exists");
200             break;
201 
202           default:
203             throw e;
204         }
205       }
206       retryCounter.sleepUntilNextRetry();
207       retryCounter.useRetry();
208     }
209   }
210 
211   /**
212    * exists is an idempotent operation. Retry before throwing exception
213    * @return A Stat instance
214    */
215   public Stat exists(String path, boolean watch)
216   throws KeeperException, InterruptedException {
217     RetryCounter retryCounter = retryCounterFactory.create();
218     while (true) {
219       try {
220         return checkZk().exists(path, watch);
221       } catch (KeeperException e) {
222         switch (e.code()) {
223           case CONNECTIONLOSS:
224           case SESSIONEXPIRED:
225           case OPERATIONTIMEOUT:
226             retryOrThrow(retryCounter, e, "exists");
227             break;
228 
229           default:
230             throw e;
231         }
232       }
233       retryCounter.sleepUntilNextRetry();
234       retryCounter.useRetry();
235     }
236   }
237 
238   private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
239       String opName) throws KeeperException {
240     LOG.warn("Possibly transient ZooKeeper exception: " + e);
241     if (!retryCounter.shouldRetry()) {
242       LOG.error("ZooKeeper " + opName + " failed after "
243         + retryCounter.getMaxRetries() + " retries");
244       throw e;
245     }
246   }
247 
248   /**
249    * getChildren is an idempotent operation. Retry before throwing exception
250    * @return List of children znodes
251    */
252   public List<String> getChildren(String path, Watcher watcher)
253     throws KeeperException, InterruptedException {
254     RetryCounter retryCounter = retryCounterFactory.create();
255     while (true) {
256       try {
257         return checkZk().getChildren(path, watcher);
258       } catch (KeeperException e) {
259         switch (e.code()) {
260           case CONNECTIONLOSS:
261           case SESSIONEXPIRED:
262           case OPERATIONTIMEOUT:
263             retryOrThrow(retryCounter, e, "getChildren");
264             break;
265 
266           default:
267             throw e;
268         }
269       }
270       retryCounter.sleepUntilNextRetry();
271       retryCounter.useRetry();
272     }
273   }
274 
275   /**
276    * getChildren is an idempotent operation. Retry before throwing exception
277    * @return List of children znodes
278    */
279   public List<String> getChildren(String path, boolean watch)
280   throws KeeperException, InterruptedException {
281     RetryCounter retryCounter = retryCounterFactory.create();
282     while (true) {
283       try {
284         return checkZk().getChildren(path, watch);
285       } catch (KeeperException e) {
286         switch (e.code()) {
287           case CONNECTIONLOSS:
288           case SESSIONEXPIRED:
289           case OPERATIONTIMEOUT:
290             retryOrThrow(retryCounter, e, "getChildren");
291             break;
292 
293           default:
294             throw e;
295         }
296       }
297       retryCounter.sleepUntilNextRetry();
298       retryCounter.useRetry();
299     }
300   }
301 
302   /**
303    * getData is an idempotent operation. Retry before throwing exception
304    * @return Data
305    */
306   public byte[] getData(String path, Watcher watcher, Stat stat)
307   throws KeeperException, InterruptedException {
308     RetryCounter retryCounter = retryCounterFactory.create();
309     while (true) {
310       try {
311         byte[] revData = checkZk().getData(path, watcher, stat);       
312         return this.removeMetaData(revData);
313       } catch (KeeperException e) {
314         switch (e.code()) {
315           case CONNECTIONLOSS:
316           case SESSIONEXPIRED:
317           case OPERATIONTIMEOUT:
318             retryOrThrow(retryCounter, e, "getData");
319             break;
320 
321           default:
322             throw e;
323         }
324       }
325       retryCounter.sleepUntilNextRetry();
326       retryCounter.useRetry();
327     }
328   }
329 
330   /**
331    * getData is an idemnpotent operation. Retry before throwing exception
332    * @return Data
333    */
334   public byte[] getData(String path, boolean watch, Stat stat)
335   throws KeeperException, InterruptedException {
336     RetryCounter retryCounter = retryCounterFactory.create();
337     while (true) {
338       try {
339         byte[] revData = checkZk().getData(path, watch, stat);
340         return this.removeMetaData(revData);
341       } catch (KeeperException e) {
342         switch (e.code()) {
343           case CONNECTIONLOSS:
344           case SESSIONEXPIRED:
345           case OPERATIONTIMEOUT:
346             retryOrThrow(retryCounter, e, "getData");
347             break;
348 
349           default:
350             throw e;
351         }
352       }
353       retryCounter.sleepUntilNextRetry();
354       retryCounter.useRetry();
355     }
356   }
357 
358   /**
359    * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
360    * Adding an identifier field into the data to check whether 
361    * badversion is caused by the result of previous correctly setData
362    * @return Stat instance
363    */
364   public Stat setData(String path, byte[] data, int version)
365   throws KeeperException, InterruptedException {
366     RetryCounter retryCounter = retryCounterFactory.create();
367     byte[] newData = appendMetaData(data);
368     while (true) {
369       try {
370         return checkZk().setData(path, newData, version);
371       } catch (KeeperException e) {
372         switch (e.code()) {
373           case CONNECTIONLOSS:
374           case SESSIONEXPIRED:
375           case OPERATIONTIMEOUT:
376             retryOrThrow(retryCounter, e, "setData");
377             break;
378           case BADVERSION:
379             // try to verify whether the previous setData success or not
380             try{
381               Stat stat = new Stat();
382               byte[] revData = checkZk().getData(path, false, stat);
383               if (Bytes.equals(revData, newData)) {
384                 // the bad version is caused by previous successful setData
385                 return stat;
386               }
387             } catch(KeeperException keeperException){
388               // the ZK is not reliable at this moment. just throwing exception
389               throw keeperException;
390             }            
391           
392           // throw other exceptions and verified bad version exceptions
393           default:
394             throw e;
395         }
396       }
397       retryCounter.sleepUntilNextRetry();
398       retryCounter.useRetry();
399     }
400   }
401 
402   /**
403    * <p>
404    * NONSEQUENTIAL create is idempotent operation. 
405    * Retry before throwing exceptions.
406    * But this function will not throw the NodeExist exception back to the
407    * application.
408    * </p>
409    * <p>
410    * But SEQUENTIAL is NOT idempotent operation. It is necessary to add 
411    * identifier to the path to verify, whether the previous one is successful 
412    * or not.
413    * </p>
414    * 
415    * @return Path
416    */
417   public String create(String path, byte[] data, List<ACL> acl,
418       CreateMode createMode)
419   throws KeeperException, InterruptedException {
420     byte[] newData = appendMetaData(data);
421     switch (createMode) {
422       case EPHEMERAL:
423       case PERSISTENT:
424         return createNonSequential(path, newData, acl, createMode);
425 
426       case EPHEMERAL_SEQUENTIAL:
427       case PERSISTENT_SEQUENTIAL:
428         return createSequential(path, newData, acl, createMode);
429 
430       default:
431         throw new IllegalArgumentException("Unrecognized CreateMode: " + 
432             createMode);
433     }
434   }
435 
436   private String createNonSequential(String path, byte[] data, List<ACL> acl, 
437       CreateMode createMode) throws KeeperException, InterruptedException {
438     RetryCounter retryCounter = retryCounterFactory.create();
439     boolean isRetry = false; // False for first attempt, true for all retries.
440     while (true) {
441       try {
442         return checkZk().create(path, data, acl, createMode);
443       } catch (KeeperException e) {
444         switch (e.code()) {
445           case NODEEXISTS:
446             if (isRetry) {
447               // If the connection was lost, there is still a possibility that
448               // we have successfully created the node at our previous attempt,
449               // so we read the node and compare. 
450               byte[] currentData = checkZk().getData(path, false, null);
451               if (currentData != null &&
452                   Bytes.compareTo(currentData, data) == 0) { 
453                 // We successfully created a non-sequential node
454                 return path;
455               }
456               LOG.error("Node " + path + " already exists with " + 
457                   Bytes.toStringBinary(currentData) + ", could not write " +
458                   Bytes.toStringBinary(data));
459               throw e;
460             }
461             LOG.info("Node " + path + " already exists and this is not a " +
462                 "retry");
463             throw e;
464 
465           case CONNECTIONLOSS:
466           case SESSIONEXPIRED:
467           case OPERATIONTIMEOUT:
468             retryOrThrow(retryCounter, e, "create");
469             break;
470 
471           default:
472             throw e;
473         }
474       }
475       retryCounter.sleepUntilNextRetry();
476       retryCounter.useRetry();
477       isRetry = true;
478     }
479   }
480   
481   private String createSequential(String path, byte[] data, 
482       List<ACL> acl, CreateMode createMode)
483   throws KeeperException, InterruptedException {
484     RetryCounter retryCounter = retryCounterFactory.create();
485     boolean first = true;
486     String newPath = path+this.identifier;
487     while (true) {
488       try {
489         if (!first) {
490           // Check if we succeeded on a previous attempt
491           String previousResult = findPreviousSequentialNode(newPath);
492           if (previousResult != null) {
493             return previousResult;
494           }
495         }
496         first = false;
497         return checkZk().create(newPath, data, acl, createMode);
498       } catch (KeeperException e) {
499         switch (e.code()) {
500           case CONNECTIONLOSS:
501           case SESSIONEXPIRED:
502           case OPERATIONTIMEOUT:
503             retryOrThrow(retryCounter, e, "create");
504             break;
505 
506           default:
507             throw e;
508         }
509       }
510       retryCounter.sleepUntilNextRetry();
511       retryCounter.useRetry();
512     }
513   }
514 
515   /**
516    * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
517    * instances to actually pass to multi (need to do this in order to appendMetaData).
518    */
519   private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
520   throws UnsupportedOperationException {
521     if(ops == null) return null;
522 
523     List<Op> preparedOps = new LinkedList<Op>();
524     for (Op op : ops) {
525       if (op.getType() == ZooDefs.OpCode.create) {
526         CreateRequest create = (CreateRequest)op.toRequestRecord();
527         preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
528           create.getAcl(), create.getFlags()));
529       } else if (op.getType() == ZooDefs.OpCode.delete) {
530         // no need to appendMetaData for delete
531         preparedOps.add(op);
532       } else if (op.getType() == ZooDefs.OpCode.setData) {
533         SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
534         preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
535           setData.getVersion()));
536       } else {
537         throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
538       }
539     }
540     return preparedOps;
541   }
542 
543   /**
544    * Run multiple operations in a transactional manner. Retry before throwing exception
545    */
546   public List<OpResult> multi(Iterable<Op> ops)
547   throws KeeperException, InterruptedException {
548     RetryCounter retryCounter = retryCounterFactory.create();
549     Iterable<Op> multiOps = prepareZKMulti(ops);
550     while (true) {
551       try {
552         return checkZk().multi(multiOps);
553       } catch (KeeperException e) {
554         switch (e.code()) {
555           case CONNECTIONLOSS:
556           case SESSIONEXPIRED:
557           case OPERATIONTIMEOUT:
558             retryOrThrow(retryCounter, e, "multi");
559             break;
560 
561           default:
562             throw e;
563         }
564       }
565       retryCounter.sleepUntilNextRetry();
566       retryCounter.useRetry();
567     }
568   }
569 
570   private String findPreviousSequentialNode(String path)
571     throws KeeperException, InterruptedException {
572     int lastSlashIdx = path.lastIndexOf('/');
573     assert(lastSlashIdx != -1);
574     String parent = path.substring(0, lastSlashIdx);
575     String nodePrefix = path.substring(lastSlashIdx+1);
576 
577     List<String> nodes = checkZk().getChildren(parent, false);
578     List<String> matching = filterByPrefix(nodes, nodePrefix);
579     for (String node : matching) {
580       String nodePath = parent + "/" + node;
581       Stat stat = checkZk().exists(nodePath, false);
582       if (stat != null) {
583         return nodePath;
584       }
585     }
586     return null;
587   }
588   
589   public byte[] removeMetaData(byte[] data) {
590     if(data == null || data.length == 0) {
591       return data;
592     }
593     // check the magic data; to be backward compatible
594     byte magic = data[0];
595     if(magic != MAGIC) {
596       return data;
597     }
598     
599     int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
600     int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
601     int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
602 
603     byte[] newData = new byte[dataLength];
604     System.arraycopy(data, dataOffset, newData, 0, dataLength);
605     
606     return newData;
607     
608   }
609   
610   private byte[] appendMetaData(byte[] data) {
611     if(data == null || data.length == 0){
612       return data;
613     }
614     
615     byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+id.length+data.length];
616     int pos = 0;
617     pos = Bytes.putByte(newData, pos, MAGIC);
618     pos = Bytes.putInt(newData, pos, id.length);
619     pos = Bytes.putBytes(newData, pos, id, 0, id.length);
620     pos = Bytes.putBytes(newData, pos, data, 0, data.length);
621 
622     return newData;
623   }
624 
625   public long getSessionId() {
626     return zk == null ? null : zk.getSessionId();
627   }
628 
629   public void close() throws InterruptedException {
630     if (zk != null) zk.close();
631   }
632 
633   public States getState() {
634     return zk == null ? null : zk.getState();
635   }
636 
637   public ZooKeeper getZooKeeper() {
638     return zk;
639   }
640 
641   public byte[] getSessionPasswd() {
642     return zk == null ? null : zk.getSessionPasswd();
643   }
644 
645   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
646     checkZk().sync(path, null, null);
647   }
648 
649   /**
650    * Filters the given node list by the given prefixes.
651    * This method is all-inclusive--if any element in the node list starts
652    * with any of the given prefixes, then it is included in the result.
653    *
654    * @param nodes the nodes to filter
655    * @param prefixes the prefixes to include in the result
656    * @return list of every element that starts with one of the prefixes
657    */
658   private static List<String> filterByPrefix(List<String> nodes, 
659       String... prefixes) {
660     List<String> lockChildren = new ArrayList<String>();
661     for (String child : nodes){
662       for (String prefix : prefixes){
663         if (child.startsWith(prefix)){
664           lockChildren.add(child);
665           break;
666         }
667       }
668     }
669     return lockChildren;
670   }
671 }