001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.api.trace.StatusCode;
022import io.opentelemetry.context.Scope;
023import java.io.IOException;
024import java.lang.management.ManagementFactory;
025import java.util.ArrayList;
026import java.util.LinkedList;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.trace.TraceUtil;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033import org.apache.hadoop.hbase.util.RetryCounter;
034import org.apache.hadoop.hbase.util.RetryCounterFactory;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.zookeeper.AsyncCallback;
037import org.apache.zookeeper.CreateMode;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.Op;
040import org.apache.zookeeper.OpResult;
041import org.apache.zookeeper.Watcher;
042import org.apache.zookeeper.ZooDefs;
043import org.apache.zookeeper.ZooKeeper;
044import org.apache.zookeeper.ZooKeeper.States;
045import org.apache.zookeeper.data.ACL;
046import org.apache.zookeeper.data.Stat;
047import org.apache.zookeeper.proto.CreateRequest;
048import org.apache.zookeeper.proto.SetDataRequest;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need
054 * to realize that there are two classes of requests: idempotent and non-idempotent requests. Read
055 * requests and unconditional sets and deletes are examples of idempotent requests, they can be
 * reissued with the same results. (Although the delete may throw a NoNodeException on reissue, its
 * effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling,
058 * application and library writers need to keep in mind that they may need to encode information in
059 * the data or name of znodes to detect retries. A simple example is a create that uses a sequence
060 * flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception,
061 * that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the
062 * process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109
063 * was the result of the previous create, so the process actually owns both x-109 and x-111. An easy
064 * way around this is to use "x-process id-" when doing the create. If the process is using an id of
065 * 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
 * "x-352-109", "x-333-110". The process will know that the original create succeeded and the znode
 * it created is "x-352-109".
068 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
069 */
070@InterfaceAudience.Private
071public class RecoverableZooKeeper {
  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
  // The wrapped ZooKeeper client. Created on demand by checkZk() and reset to null by
  // reconnectAfterExpiration() before a replacement client is created.
  private ZooKeeper zk;
  // Creates a fresh RetryCounter for each retried operation.
  private final RetryCounterFactory retryCounterFactory;
  // An identifier of this process in the cluster; appended to the path of sequential znodes.
  private final String identifier;
  // Byte form of the identifier, handed to ZKMetadata.appendMetaData for data that is written.
  private final byte[] id;
  // Watcher passed to every ZooKeeper client this instance creates.
  private final Watcher watcher;
  // Session timeout passed to the ZooKeeper constructor.
  private final int sessionTimeout;
  // Quorum string passed to the ZooKeeper constructor.
  private final String quorumServers;
  // Value reported by getMaxMultiSizeLimit().
  private final int maxMultiSize;
083
084  /**
085   * See {@link #connect(Configuration, String, Watcher, String)}
086   */
087  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
088    throws IOException {
089    String ensemble = ZKConfig.getZKQuorumServersString(conf);
090    return connect(conf, ensemble, watcher);
091  }
092
093  /**
094   * See {@link #connect(Configuration, String, Watcher, String)}
095   */
096  public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher)
097    throws IOException {
098    return connect(conf, ensemble, watcher, null);
099  }
100
101  /**
102   * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
103   * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
104   * watcher to the specified watcher.
105   * @param conf       configuration to pull ensemble and other settings from
106   * @param watcher    watcher to monitor connection changes
107   * @param ensemble   ZooKeeper servers quorum string
108   * @param identifier value used to identify this client instance.
109   * @return connection to zookeeper
110   * @throws IOException if unable to connect to zk or config problem
111   */
112  public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
113    final String identifier) throws IOException {
114    if (ensemble == null) {
115      throw new IOException("Unable to determine ZooKeeper ensemble");
116    }
117    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
118    if (LOG.isTraceEnabled()) {
119      LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble);
120    }
121    int retry = conf.getInt("zookeeper.recovery.retry", 3);
122    int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
123    int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
124    int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
125    return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
126      maxSleepTime, identifier, multiMaxSize);
127  }
128
129  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
130      justification = "None. Its always been this way.")
131  public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
132    int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize)
133    throws IOException {
134    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
135    this.retryCounterFactory =
136      new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);
137
138    if (identifier == null || identifier.length() == 0) {
139      // the identifier = processID@hostName
140      identifier = ManagementFactory.getRuntimeMXBean().getName();
141    }
142    LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
143      quorumServers);
144    this.identifier = identifier;
145    this.id = Bytes.toBytes(identifier);
146
147    this.watcher = watcher;
148    this.sessionTimeout = sessionTimeout;
149    this.quorumServers = quorumServers;
150    this.maxMultiSize = maxMultiSize;
151
152    try {
153      checkZk();
154    } catch (Exception x) {
155      /* ignore */
156    }
157  }
158
159  /**
160   * Returns the maximum size (in bytes) that should be included in any single multi() call. NB:
161   * This is an approximation, so there may be variance in the msg actually sent over the wire.
162   * Please be sure to set this approximately, with respect to your ZK server configuration for
163   * jute.maxbuffer.
164   */
165  public int getMaxMultiSizeLimit() {
166    return maxMultiSize;
167  }
168
169  /**
170   * Try to create a ZooKeeper connection. Turns any exception encountered into a
171   * KeeperException.OperationTimeoutException so it can retried.
172   * @return The created ZooKeeper connection object
173   * @throws KeeperException if a ZooKeeper operation fails
174   */
175  protected synchronized ZooKeeper checkZk() throws KeeperException {
176    if (this.zk == null) {
177      try {
178        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
179      } catch (IOException ex) {
180        LOG.warn("Unable to create ZooKeeper Connection", ex);
181        throw new KeeperException.OperationTimeoutException();
182      }
183    }
184    return zk;
185  }
186
187  public synchronized void reconnectAfterExpiration()
188    throws IOException, KeeperException, InterruptedException {
189    if (zk != null) {
190      LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x"
191        + Long.toHexString(zk.getSessionId()));
192      zk.close();
193      // reset the ZooKeeper connection
194      zk = null;
195    }
196    checkZk();
197    LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId()));
198  }
199
200  /**
201   * delete is an idempotent operation. Retry before throwing exception. This function will not
202   * throw NoNodeException if the path does not exist.
203   */
204  public void delete(String path, int version) throws InterruptedException, KeeperException {
205    final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete");
206    try (Scope ignored = span.makeCurrent()) {
207      RetryCounter retryCounter = retryCounterFactory.create();
208      boolean isRetry = false; // False for first attempt, true for all retries.
209      while (true) {
210        try {
211          long startTime = EnvironmentEdgeManager.currentTime();
212          checkZk().delete(path, version);
213          span.setStatus(StatusCode.OK);
214          return;
215        } catch (KeeperException e) {
216          switch (e.code()) {
217            case NONODE:
218              if (isRetry) {
219                LOG.debug(
220                  "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded.");
221                span.setStatus(StatusCode.OK);
222                return;
223              }
224              LOG.debug("Node {} already deleted, retry={}", path, isRetry);
225              TraceUtil.setError(span, e);
226              throw e;
227
228            case CONNECTIONLOSS:
229            case OPERATIONTIMEOUT:
230            case REQUESTTIMEOUT:
231              TraceUtil.setError(span, e);
232              retryOrThrow(retryCounter, e, "delete");
233              break;
234
235            default:
236              TraceUtil.setError(span, e);
237              throw e;
238          }
239        }
240        retryCounter.sleepUntilNextRetry();
241        isRetry = true;
242      }
243    } finally {
244      span.end();
245    }
246  }
247
248  /**
249   * exists is an idempotent operation. Retry before throwing exception
250   * @return A Stat instance
251   */
252  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
253    final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
254    try (Scope ignored = span.makeCurrent()) {
255      RetryCounter retryCounter = retryCounterFactory.create();
256      while (true) {
257        try {
258          long startTime = EnvironmentEdgeManager.currentTime();
259          Stat nodeStat = checkZk().exists(path, watcher);
260          span.setStatus(StatusCode.OK);
261          return nodeStat;
262        } catch (KeeperException e) {
263          switch (e.code()) {
264            case CONNECTIONLOSS:
265            case OPERATIONTIMEOUT:
266            case REQUESTTIMEOUT:
267              TraceUtil.setError(span, e);
268              retryOrThrow(retryCounter, e, "exists");
269              break;
270
271            default:
272              TraceUtil.setError(span, e);
273              throw e;
274          }
275        }
276        retryCounter.sleepUntilNextRetry();
277      }
278    } finally {
279      span.end();
280    }
281  }
282
283  /**
284   * exists is an idempotent operation. Retry before throwing exception
285   * @return A Stat instance
286   */
287  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
288    Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
289    try (Scope ignored = span.makeCurrent()) {
290      RetryCounter retryCounter = retryCounterFactory.create();
291      while (true) {
292        try {
293          long startTime = EnvironmentEdgeManager.currentTime();
294          Stat nodeStat = checkZk().exists(path, watch);
295          span.setStatus(StatusCode.OK);
296          return nodeStat;
297        } catch (KeeperException e) {
298          switch (e.code()) {
299            case CONNECTIONLOSS:
300              TraceUtil.setError(span, e);
301              retryOrThrow(retryCounter, e, "exists");
302              break;
303            case OPERATIONTIMEOUT:
304              TraceUtil.setError(span, e);
305              retryOrThrow(retryCounter, e, "exists");
306              break;
307
308            default:
309              TraceUtil.setError(span, e);
310              throw e;
311          }
312        }
313        retryCounter.sleepUntilNextRetry();
314      }
315    } finally {
316      span.end();
317    }
318  }
319
320  private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName)
321    throws KeeperException {
322    if (!retryCounter.shouldRetry()) {
323      LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
324      throw e;
325    }
326    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e);
327  }
328
329  /**
330   * getChildren is an idempotent operation. Retry before throwing exception
331   * @return List of children znodes
332   */
333  public List<String> getChildren(String path, Watcher watcher)
334    throws KeeperException, InterruptedException {
335    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
336    try (Scope ignored = span.makeCurrent()) {
337      RetryCounter retryCounter = retryCounterFactory.create();
338      while (true) {
339        try {
340          long startTime = EnvironmentEdgeManager.currentTime();
341          List<String> children = checkZk().getChildren(path, watcher);
342          span.setStatus(StatusCode.OK);
343          return children;
344        } catch (KeeperException e) {
345          switch (e.code()) {
346            case CONNECTIONLOSS:
347            case OPERATIONTIMEOUT:
348            case REQUESTTIMEOUT:
349              TraceUtil.setError(span, e);
350              retryOrThrow(retryCounter, e, "getChildren");
351              break;
352
353            default:
354              TraceUtil.setError(span, e);
355              throw e;
356          }
357        }
358        retryCounter.sleepUntilNextRetry();
359      }
360    } finally {
361      span.end();
362    }
363  }
364
365  /**
366   * getChildren is an idempotent operation. Retry before throwing exception
367   * @return List of children znodes
368   */
369  public List<String> getChildren(String path, boolean watch)
370    throws KeeperException, InterruptedException {
371    Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
372    try (Scope ignored = span.makeCurrent()) {
373      RetryCounter retryCounter = retryCounterFactory.create();
374      while (true) {
375        try {
376          long startTime = EnvironmentEdgeManager.currentTime();
377          List<String> children = checkZk().getChildren(path, watch);
378          span.setStatus(StatusCode.OK);
379          return children;
380        } catch (KeeperException e) {
381          switch (e.code()) {
382            case CONNECTIONLOSS:
383              TraceUtil.setError(span, e);
384              retryOrThrow(retryCounter, e, "getChildren");
385              break;
386            case OPERATIONTIMEOUT:
387              TraceUtil.setError(span, e);
388              retryOrThrow(retryCounter, e, "getChildren");
389              break;
390
391            default:
392              TraceUtil.setError(span, e);
393              throw e;
394          }
395        }
396        retryCounter.sleepUntilNextRetry();
397      }
398    } finally {
399      span.end();
400    }
401  }
402
403  /**
404   * getData is an idempotent operation. Retry before throwing exception n
405   */
406  public byte[] getData(String path, Watcher watcher, Stat stat)
407    throws KeeperException, InterruptedException {
408    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData");
409    try (Scope ignored = span.makeCurrent()) {
410      RetryCounter retryCounter = retryCounterFactory.create();
411      while (true) {
412        try {
413          long startTime = EnvironmentEdgeManager.currentTime();
414          byte[] revData = checkZk().getData(path, watcher, stat);
415          span.setStatus(StatusCode.OK);
416          return ZKMetadata.removeMetaData(revData);
417        } catch (KeeperException e) {
418          switch (e.code()) {
419            case CONNECTIONLOSS:
420            case OPERATIONTIMEOUT:
421            case REQUESTTIMEOUT:
422              TraceUtil.setError(span, e);
423              retryOrThrow(retryCounter, e, "getData");
424              break;
425
426            default:
427              TraceUtil.setError(span, e);
428              throw e;
429          }
430        }
431        retryCounter.sleepUntilNextRetry();
432      }
433    } finally {
434      span.end();
435    }
436  }
437
438  /**
439   * getData is an idempotent operation. Retry before throwing exception n
440   */
441  public byte[] getData(String path, boolean watch, Stat stat)
442    throws KeeperException, InterruptedException {
443    Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan();
444    try (Scope scope = span.makeCurrent()) {
445      RetryCounter retryCounter = retryCounterFactory.create();
446      while (true) {
447        try {
448          long startTime = EnvironmentEdgeManager.currentTime();
449          byte[] revData = checkZk().getData(path, watch, stat);
450          span.setStatus(StatusCode.OK);
451          return ZKMetadata.removeMetaData(revData);
452        } catch (KeeperException e) {
453          switch (e.code()) {
454            case CONNECTIONLOSS:
455              TraceUtil.setError(span, e);
456              retryOrThrow(retryCounter, e, "getData");
457              break;
458            case OPERATIONTIMEOUT:
459              TraceUtil.setError(span, e);
460              retryOrThrow(retryCounter, e, "getData");
461              break;
462
463            default:
464              TraceUtil.setError(span, e);
465              throw e;
466          }
467        }
468        retryCounter.sleepUntilNextRetry();
469      }
470    } finally {
471      span.end();
472    }
473  }
474
475  /**
476   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception Adding an
477   * identifier field into the data to check whether badversion is caused by the result of previous
478   * correctly setData
479   * @return Stat instance
480   */
481  public Stat setData(String path, byte[] data, int version)
482    throws KeeperException, InterruptedException {
483    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData");
484    try (Scope ignored = span.makeCurrent()) {
485      RetryCounter retryCounter = retryCounterFactory.create();
486      byte[] newData = ZKMetadata.appendMetaData(id, data);
487      boolean isRetry = false;
488      long startTime;
489      while (true) {
490        try {
491          startTime = EnvironmentEdgeManager.currentTime();
492          Stat nodeStat = checkZk().setData(path, newData, version);
493          span.setStatus(StatusCode.OK);
494          return nodeStat;
495        } catch (KeeperException e) {
496          switch (e.code()) {
497            case CONNECTIONLOSS:
498            case OPERATIONTIMEOUT:
499            case REQUESTTIMEOUT:
500              TraceUtil.setError(span, e);
501              retryOrThrow(retryCounter, e, "setData");
502              break;
503            case BADVERSION:
504              if (isRetry) {
505                // try to verify whether the previous setData success or not
506                try {
507                  Stat stat = new Stat();
508                  byte[] revData = checkZk().getData(path, false, stat);
509                  if (Bytes.compareTo(revData, newData) == 0) {
510                    // the bad version is caused by previous successful setData
511                    span.setStatus(StatusCode.OK);
512                    return stat;
513                  }
514                } catch (KeeperException keeperException) {
515                  // the ZK is not reliable at this moment. just throwing exception
516                  TraceUtil.setError(span, keeperException);
517                  throw keeperException;
518                }
519              }
520              // throw other exceptions and verified bad version exceptions
521            default:
522              TraceUtil.setError(span, e);
523              throw e;
524          }
525        }
526        retryCounter.sleepUntilNextRetry();
527        isRetry = true;
528      }
529    } finally {
530      span.end();
531    }
532  }
533
534  /**
535   * getAcl is an idempotent operation. Retry before throwing exception
536   * @return list of ACLs
537   */
538  public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException {
539    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl");
540    try (Scope ignored = span.makeCurrent()) {
541      RetryCounter retryCounter = retryCounterFactory.create();
542      while (true) {
543        try {
544          long startTime = EnvironmentEdgeManager.currentTime();
545          List<ACL> nodeACL = checkZk().getACL(path, stat);
546          span.setStatus(StatusCode.OK);
547          return nodeACL;
548        } catch (KeeperException e) {
549          switch (e.code()) {
550            case CONNECTIONLOSS:
551            case OPERATIONTIMEOUT:
552            case REQUESTTIMEOUT:
553              TraceUtil.setError(span, e);
554              retryOrThrow(retryCounter, e, "getAcl");
555              break;
556
557            default:
558              TraceUtil.setError(span, e);
559              throw e;
560          }
561        }
562        retryCounter.sleepUntilNextRetry();
563      }
564    } finally {
565      span.end();
566    }
567  }
568
569  /**
570   * setAcl is an idempotent operation. Retry before throwing exception
571   * @return list of ACLs
572   */
573  public Stat setAcl(String path, List<ACL> acls, int version)
574    throws KeeperException, InterruptedException {
575    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl");
576    try (Scope ignored = span.makeCurrent()) {
577      RetryCounter retryCounter = retryCounterFactory.create();
578      while (true) {
579        try {
580          long startTime = EnvironmentEdgeManager.currentTime();
581          Stat nodeStat = checkZk().setACL(path, acls, version);
582          span.setStatus(StatusCode.OK);
583          return nodeStat;
584        } catch (KeeperException e) {
585          switch (e.code()) {
586            case CONNECTIONLOSS:
587            case OPERATIONTIMEOUT:
588              TraceUtil.setError(span, e);
589              retryOrThrow(retryCounter, e, "setAcl");
590              break;
591
592            default:
593              TraceUtil.setError(span, e);
594              throw e;
595          }
596        }
597        retryCounter.sleepUntilNextRetry();
598      }
599    } finally {
600      span.end();
601    }
602  }
603
604  /**
605   * <p>
606   * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. But this
607   * function will not throw the NodeExist exception back to the application.
608   * </p>
609   * <p>
610   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to
611   * verify, whether the previous one is successful or not.
612   * </p>
613   * n
614   */
615  public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
616    throws KeeperException, InterruptedException {
617    final Span span = TraceUtil.createSpan("RecoverableZookeeper.create");
618    try (Scope ignored = span.makeCurrent()) {
619      byte[] newData = ZKMetadata.appendMetaData(id, data);
620      switch (createMode) {
621        case EPHEMERAL:
622        case PERSISTENT:
623          span.setStatus(StatusCode.OK);
624          return createNonSequential(path, newData, acl, createMode);
625
626        case EPHEMERAL_SEQUENTIAL:
627        case PERSISTENT_SEQUENTIAL:
628          span.setStatus(StatusCode.OK);
629          return createSequential(path, newData, acl, createMode);
630
631        default:
632          final IllegalArgumentException e =
633            new IllegalArgumentException("Unrecognized CreateMode: " + createMode);
634          TraceUtil.setError(span, e);
635          throw e;
636      }
637    } finally {
638      span.end();
639    }
640  }
641
642  private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
643    throws KeeperException, InterruptedException {
644    RetryCounter retryCounter = retryCounterFactory.create();
645    boolean isRetry = false; // False for first attempt, true for all retries.
646    long startTime;
647    while (true) {
648      try {
649        startTime = EnvironmentEdgeManager.currentTime();
650        String nodePath = checkZk().create(path, data, acl, createMode);
651        return nodePath;
652      } catch (KeeperException e) {
653        switch (e.code()) {
654          case NODEEXISTS:
655            if (isRetry) {
656              // If the connection was lost, there is still a possibility that
657              // we have successfully created the node at our previous attempt,
658              // so we read the node and compare.
659              byte[] currentData = checkZk().getData(path, false, null);
660              if (currentData != null && Bytes.compareTo(currentData, data) == 0) {
661                // We successfully created a non-sequential node
662                return path;
663              }
664              LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData)
665                + ", could not write " + Bytes.toStringBinary(data));
666              throw e;
667            }
668            LOG.trace("Node {} already exists", path);
669            throw e;
670
671          case CONNECTIONLOSS:
672          case OPERATIONTIMEOUT:
673          case REQUESTTIMEOUT:
674            retryOrThrow(retryCounter, e, "create");
675            break;
676
677          default:
678            throw e;
679        }
680      }
681      retryCounter.sleepUntilNextRetry();
682      isRetry = true;
683    }
684  }
685
686  private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
687    throws KeeperException, InterruptedException {
688    RetryCounter retryCounter = retryCounterFactory.create();
689    boolean first = true;
690    String newPath = path + this.identifier;
691    while (true) {
692      try {
693        if (!first) {
694          // Check if we succeeded on a previous attempt
695          String previousResult = findPreviousSequentialNode(newPath);
696          if (previousResult != null) {
697            return previousResult;
698          }
699        }
700        first = false;
701        long startTime = EnvironmentEdgeManager.currentTime();
702        String nodePath = checkZk().create(newPath, data, acl, createMode);
703        return nodePath;
704      } catch (KeeperException e) {
705        switch (e.code()) {
706          case CONNECTIONLOSS:
707          case OPERATIONTIMEOUT:
708          case REQUESTTIMEOUT:
709            retryOrThrow(retryCounter, e, "create");
710            break;
711
712          default:
713            throw e;
714        }
715      }
716      retryCounter.sleepUntilNextRetry();
717    }
718  }
719
720  /**
721   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to
722   * actually pass to multi (need to do this in order to appendMetaData).
723   */
724  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
725    if (ops == null) {
726      return null;
727    }
728
729    List<Op> preparedOps = new LinkedList<>();
730    for (Op op : ops) {
731      if (op.getType() == ZooDefs.OpCode.create) {
732        CreateRequest create = (CreateRequest) op.toRequestRecord();
733        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
734          create.getAcl(), create.getFlags()));
735      } else if (op.getType() == ZooDefs.OpCode.delete) {
736        // no need to appendMetaData for delete
737        preparedOps.add(op);
738      } else if (op.getType() == ZooDefs.OpCode.setData) {
739        SetDataRequest setData = (SetDataRequest) op.toRequestRecord();
740        preparedOps.add(Op.setData(setData.getPath(),
741          ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
742      } else {
743        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
744      }
745    }
746    return preparedOps;
747  }
748
749  /**
750   * Run multiple operations in a transactional manner. Retry before throwing exception
751   */
752  public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
753    final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi");
754    try (Scope ignored = span.makeCurrent()) {
755      RetryCounter retryCounter = retryCounterFactory.create();
756      Iterable<Op> multiOps = prepareZKMulti(ops);
757      while (true) {
758        try {
759          long startTime = EnvironmentEdgeManager.currentTime();
760          List<OpResult> opResults = checkZk().multi(multiOps);
761          span.setStatus(StatusCode.OK);
762          return opResults;
763        } catch (KeeperException e) {
764          switch (e.code()) {
765            case CONNECTIONLOSS:
766            case OPERATIONTIMEOUT:
767            case REQUESTTIMEOUT:
768              TraceUtil.setError(span, e);
769              retryOrThrow(retryCounter, e, "multi");
770              break;
771
772            default:
773              TraceUtil.setError(span, e);
774              throw e;
775          }
776        }
777        retryCounter.sleepUntilNextRetry();
778      }
779    } finally {
780      span.end();
781    }
782  }
783
784  private String findPreviousSequentialNode(String path)
785    throws KeeperException, InterruptedException {
786    int lastSlashIdx = path.lastIndexOf('/');
787    assert (lastSlashIdx != -1);
788    String parent = path.substring(0, lastSlashIdx);
789    String nodePrefix = path.substring(lastSlashIdx + 1);
790    long startTime = EnvironmentEdgeManager.currentTime();
791    List<String> nodes = checkZk().getChildren(parent, false);
792    List<String> matching = filterByPrefix(nodes, nodePrefix);
793    for (String node : matching) {
794      String nodePath = parent + "/" + node;
795      startTime = EnvironmentEdgeManager.currentTime();
796      Stat stat = checkZk().exists(nodePath, false);
797      if (stat != null) {
798        return nodePath;
799      }
800    }
801    return null;
802  }
803
804  public synchronized long getSessionId() {
805    return zk == null ? -1 : zk.getSessionId();
806  }
807
808  public synchronized void close() throws InterruptedException {
809    if (zk != null) {
810      zk.close();
811    }
812  }
813
814  public synchronized States getState() {
815    return zk == null ? null : zk.getState();
816  }
817
818  public synchronized ZooKeeper getZooKeeper() {
819    return zk;
820  }
821
822  public synchronized byte[] getSessionPasswd() {
823    return zk == null ? null : zk.getSessionPasswd();
824  }
825
826  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
827    checkZk().sync(path, cb, ctx);
828  }
829
830  /**
831   * Filters the given node list by the given prefixes. This method is all-inclusive--if any element
832   * in the node list starts with any of the given prefixes, then it is included in the result.
833   * @param nodes    the nodes to filter
834   * @param prefixes the prefixes to include in the result
835   * @return list of every element that starts with one of the prefixes
836   */
837  private static List<String> filterByPrefix(List<String> nodes, String... prefixes) {
838    List<String> lockChildren = new ArrayList<>();
839    for (String child : nodes) {
840      for (String prefix : prefixes) {
841        if (child.startsWith(prefix)) {
842          lockChildren.add(child);
843          break;
844        }
845      }
846    }
847    return lockChildren;
848  }
849
850  public String getIdentifier() {
851    return identifier;
852  }
853}