001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.api.trace.StatusCode;
022import io.opentelemetry.context.Scope;
023import java.io.IOException;
024import java.lang.management.ManagementFactory;
025import java.util.ArrayList;
026import java.util.LinkedList;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.trace.TraceUtil;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.RetryCounter;
033import org.apache.hadoop.hbase.util.RetryCounterFactory;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.zookeeper.AsyncCallback;
036import org.apache.zookeeper.CreateMode;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.Op;
039import org.apache.zookeeper.OpResult;
040import org.apache.zookeeper.Watcher;
041import org.apache.zookeeper.ZooDefs;
042import org.apache.zookeeper.ZooKeeper;
043import org.apache.zookeeper.ZooKeeper.States;
044import org.apache.zookeeper.data.ACL;
045import org.apache.zookeeper.data.Stat;
046import org.apache.zookeeper.proto.CreateRequest;
047import org.apache.zookeeper.proto.SetDataRequest;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need
053 * to realize that there are two classes of requests: idempotent and non-idempotent requests. Read
054 * requests and unconditional sets and deletes are examples of idempotent requests, they can be
055 * reissued with the same results. (Although, the delete may throw a NoNodeException on reissue its
056 * effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling,
057 * application and library writers need to keep in mind that they may need to encode information in
058 * the data or name of znodes to detect retries. A simple example is a create that uses a sequence
059 * flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception,
060 * that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the
061 * process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109
062 * was the result of the previous create, so the process actually owns both x-109 and x-111. An easy
063 * way around this is to use "x-process id-" when doing the create. If the process is using an id of
064 * 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
 * "x-352-109", "x-333-110". The process will know that the original create succeeded and the
 * znode it created is "x-352-109".
067 * @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling"
068 */
069@InterfaceAudience.Private
070public class RecoverableZooKeeper {
071  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
072  // the actual ZooKeeper client instance
073  private ZooKeeper zk;
074  private final RetryCounterFactory retryCounterFactory;
075  // An identifier of this process in the cluster
076  private final String identifier;
077  private final byte[] id;
078  private final Watcher watcher;
079  private final int sessionTimeout;
080  private final String quorumServers;
081  private final int maxMultiSize;
082
083  /**
084   * See {@link #connect(Configuration, String, Watcher, String)}
085   */
086  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
087    throws IOException {
088    String ensemble = ZKConfig.getZKQuorumServersString(conf);
089    return connect(conf, ensemble, watcher);
090  }
091
092  /**
093   * See {@link #connect(Configuration, String, Watcher, String)}
094   */
095  public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher)
096    throws IOException {
097    return connect(conf, ensemble, watcher, null);
098  }
099
100  /**
101   * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
102   * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
103   * watcher to the specified watcher.
104   * @param conf       configuration to pull ensemble and other settings from
105   * @param watcher    watcher to monitor connection changes
106   * @param ensemble   ZooKeeper servers quorum string
107   * @param identifier value used to identify this client instance.
108   * @return connection to zookeeper
109   * @throws IOException if unable to connect to zk or config problem
110   */
111  public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
112    final String identifier) throws IOException {
113    if (ensemble == null) {
114      throw new IOException("Unable to determine ZooKeeper ensemble");
115    }
116    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
117    if (LOG.isTraceEnabled()) {
118      LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble);
119    }
120    int retry = conf.getInt("zookeeper.recovery.retry", 3);
121    int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
122    int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
123    int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
124    return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
125      maxSleepTime, identifier, multiMaxSize);
126  }
127
128  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
129      justification = "None. Its always been this way.")
130  public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
131    int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize)
132    throws IOException {
133    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
134    this.retryCounterFactory =
135      new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);
136
137    if (identifier == null || identifier.length() == 0) {
138      // the identifier = processID@hostName
139      identifier = ManagementFactory.getRuntimeMXBean().getName();
140    }
141    LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
142      quorumServers);
143    this.identifier = identifier;
144    this.id = Bytes.toBytes(identifier);
145
146    this.watcher = watcher;
147    this.sessionTimeout = sessionTimeout;
148    this.quorumServers = quorumServers;
149    this.maxMultiSize = maxMultiSize;
150
151    try {
152      checkZk();
153    } catch (Exception x) {
154      /* ignore */
155    }
156  }
157
158  /**
159   * Returns the maximum size (in bytes) that should be included in any single multi() call. NB:
160   * This is an approximation, so there may be variance in the msg actually sent over the wire.
161   * Please be sure to set this approximately, with respect to your ZK server configuration for
162   * jute.maxbuffer.
163   */
164  public int getMaxMultiSizeLimit() {
165    return maxMultiSize;
166  }
167
168  /**
169   * Try to create a ZooKeeper connection. Turns any exception encountered into a
170   * KeeperException.OperationTimeoutException so it can retried.
171   * @return The created ZooKeeper connection object
172   * @throws KeeperException if a ZooKeeper operation fails
173   */
174  protected synchronized ZooKeeper checkZk() throws KeeperException {
175    if (this.zk == null) {
176      try {
177        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
178      } catch (IOException ex) {
179        LOG.warn("Unable to create ZooKeeper Connection", ex);
180        throw new KeeperException.OperationTimeoutException();
181      }
182    }
183    return zk;
184  }
185
186  public synchronized void reconnectAfterExpiration()
187    throws IOException, KeeperException, InterruptedException {
188    if (zk != null) {
189      LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x"
190        + Long.toHexString(zk.getSessionId()));
191      zk.close();
192      // reset the ZooKeeper connection
193      zk = null;
194    }
195    checkZk();
196    LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId()));
197  }
198
199  /**
200   * delete is an idempotent operation. Retry before throwing exception. This function will not
201   * throw NoNodeException if the path does not exist.
202   */
203  public void delete(String path, int version) throws InterruptedException, KeeperException {
204    final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete");
205    try (Scope ignored = span.makeCurrent()) {
206      RetryCounter retryCounter = retryCounterFactory.create();
207      boolean isRetry = false; // False for first attempt, true for all retries.
208      while (true) {
209        try {
210          checkZk().delete(path, version);
211          span.setStatus(StatusCode.OK);
212          return;
213        } catch (KeeperException e) {
214          switch (e.code()) {
215            case NONODE:
216              if (isRetry) {
217                LOG.debug(
218                  "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded.");
219                return;
220              }
221              LOG.debug("Node {} already deleted, retry={}", path, isRetry);
222              TraceUtil.setError(span, e);
223              throw e;
224
225            case CONNECTIONLOSS:
226            case OPERATIONTIMEOUT:
227            case REQUESTTIMEOUT:
228              TraceUtil.setError(span, e);
229              retryOrThrow(retryCounter, e, "delete");
230              break;
231
232            default:
233              TraceUtil.setError(span, e);
234              throw e;
235          }
236        }
237        retryCounter.sleepUntilNextRetry();
238        isRetry = true;
239      }
240    } finally {
241      span.end();
242    }
243  }
244
245  /**
246   * exists is an idempotent operation. Retry before throwing exception
247   * @return A Stat instance
248   */
249  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
250    return exists(path, watcher, null);
251  }
252
253  private Stat exists(String path, Watcher watcher, Boolean watch)
254    throws InterruptedException, KeeperException {
255    final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
256    try (Scope ignored = span.makeCurrent()) {
257      RetryCounter retryCounter = retryCounterFactory.create();
258      while (true) {
259        try {
260          Stat nodeStat;
261          if (watch == null) {
262            nodeStat = checkZk().exists(path, watcher);
263          } else {
264            nodeStat = checkZk().exists(path, watch);
265          }
266          span.setStatus(StatusCode.OK);
267          return nodeStat;
268        } catch (KeeperException e) {
269          switch (e.code()) {
270            case CONNECTIONLOSS:
271            case OPERATIONTIMEOUT:
272            case REQUESTTIMEOUT:
273              TraceUtil.setError(span, e);
274              retryOrThrow(retryCounter, e, "exists");
275              break;
276
277            default:
278              TraceUtil.setError(span, e);
279              throw e;
280          }
281        }
282        retryCounter.sleepUntilNextRetry();
283      }
284    } finally {
285      span.end();
286    }
287  }
288
289  /**
290   * exists is an idempotent operation. Retry before throwing exception
291   * @return A Stat instance
292   */
293  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
294    return exists(path, null, watch);
295  }
296
297  private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName)
298    throws KeeperException {
299    if (!retryCounter.shouldRetry()) {
300      LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
301      throw e;
302    }
303    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e);
304  }
305
306  /**
307   * getChildren is an idempotent operation. Retry before throwing exception
308   * @return List of children znodes
309   */
310  public List<String> getChildren(String path, Watcher watcher)
311    throws KeeperException, InterruptedException {
312    return getChildren(path, watcher, null);
313  }
314
315  private List<String> getChildren(String path, Watcher watcher, Boolean watch)
316    throws InterruptedException, KeeperException {
317    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
318    try (Scope ignored = span.makeCurrent()) {
319      RetryCounter retryCounter = retryCounterFactory.create();
320      while (true) {
321        try {
322          List<String> children;
323          if (watch == null) {
324            children = checkZk().getChildren(path, watcher);
325          } else {
326            children = checkZk().getChildren(path, watch);
327          }
328          span.setStatus(StatusCode.OK);
329          return children;
330        } catch (KeeperException e) {
331          switch (e.code()) {
332            case CONNECTIONLOSS:
333            case OPERATIONTIMEOUT:
334            case REQUESTTIMEOUT:
335              TraceUtil.setError(span, e);
336              retryOrThrow(retryCounter, e, "getChildren");
337              break;
338
339            default:
340              TraceUtil.setError(span, e);
341              throw e;
342          }
343        }
344        retryCounter.sleepUntilNextRetry();
345      }
346    } finally {
347      span.end();
348    }
349  }
350
351  /**
352   * getChildren is an idempotent operation. Retry before throwing exception
353   * @return List of children znodes
354   */
355  public List<String> getChildren(String path, boolean watch)
356    throws KeeperException, InterruptedException {
357    return getChildren(path, null, watch);
358  }
359
360  /**
361   * getData is an idempotent operation. Retry before throwing exception
362   */
363  public byte[] getData(String path, Watcher watcher, Stat stat)
364    throws KeeperException, InterruptedException {
365    return getData(path, watcher, null, stat);
366  }
367
368  private byte[] getData(String path, Watcher watcher, Boolean watch, Stat stat)
369    throws InterruptedException, KeeperException {
370    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData");
371    try (Scope ignored = span.makeCurrent()) {
372      RetryCounter retryCounter = retryCounterFactory.create();
373      while (true) {
374        try {
375          byte[] revData;
376          if (watch == null) {
377            revData = checkZk().getData(path, watcher, stat);
378          } else {
379            revData = checkZk().getData(path, watch, stat);
380          }
381          span.setStatus(StatusCode.OK);
382          return ZKMetadata.removeMetaData(revData);
383        } catch (KeeperException e) {
384          switch (e.code()) {
385            case CONNECTIONLOSS:
386            case OPERATIONTIMEOUT:
387            case REQUESTTIMEOUT:
388              TraceUtil.setError(span, e);
389              retryOrThrow(retryCounter, e, "getData");
390              break;
391
392            default:
393              TraceUtil.setError(span, e);
394              throw e;
395          }
396        }
397        retryCounter.sleepUntilNextRetry();
398      }
399    } finally {
400      span.end();
401    }
402  }
403
404  /**
405   * getData is an idempotent operation. Retry before throwing exception
406   */
407  public byte[] getData(String path, boolean watch, Stat stat)
408    throws KeeperException, InterruptedException {
409    return getData(path, null, watch, stat);
410  }
411
412  /**
413   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception Adding an
414   * identifier field into the data to check whether badversion is caused by the result of previous
415   * correctly setData
416   * @return Stat instance
417   */
418  public Stat setData(String path, byte[] data, int version)
419    throws KeeperException, InterruptedException {
420    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData");
421    try (Scope ignored = span.makeCurrent()) {
422      RetryCounter retryCounter = retryCounterFactory.create();
423      byte[] newData = ZKMetadata.appendMetaData(id, data);
424      boolean isRetry = false;
425      while (true) {
426        try {
427          span.setStatus(StatusCode.OK);
428          return checkZk().setData(path, newData, version);
429        } catch (KeeperException e) {
430          switch (e.code()) {
431            case CONNECTIONLOSS:
432            case OPERATIONTIMEOUT:
433            case REQUESTTIMEOUT:
434              TraceUtil.setError(span, e);
435              retryOrThrow(retryCounter, e, "setData");
436              break;
437            case BADVERSION:
438              if (isRetry) {
439                // try to verify whether the previous setData success or not
440                try {
441                  Stat stat = new Stat();
442                  byte[] revData = checkZk().getData(path, false, stat);
443                  if (Bytes.compareTo(revData, newData) == 0) {
444                    // the bad version is caused by previous successful setData
445                    return stat;
446                  }
447                } catch (KeeperException keeperException) {
448                  // the ZK is not reliable at this moment. just throwing exception
449                  TraceUtil.setError(span, e);
450                  throw keeperException;
451                }
452              }
453              // throw other exceptions and verified bad version exceptions
454            default:
455              TraceUtil.setError(span, e);
456              throw e;
457          }
458        }
459        retryCounter.sleepUntilNextRetry();
460        isRetry = true;
461      }
462    } finally {
463      span.end();
464    }
465  }
466
467  /**
468   * getAcl is an idempotent operation. Retry before throwing exception
469   * @return list of ACLs
470   */
471  public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException {
472    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl");
473    try (Scope ignored = span.makeCurrent()) {
474      RetryCounter retryCounter = retryCounterFactory.create();
475      while (true) {
476        try {
477          span.setStatus(StatusCode.OK);
478          return checkZk().getACL(path, stat);
479        } catch (KeeperException e) {
480          switch (e.code()) {
481            case CONNECTIONLOSS:
482            case OPERATIONTIMEOUT:
483            case REQUESTTIMEOUT:
484              TraceUtil.setError(span, e);
485              retryOrThrow(retryCounter, e, "getAcl");
486              break;
487
488            default:
489              TraceUtil.setError(span, e);
490              throw e;
491          }
492        }
493        retryCounter.sleepUntilNextRetry();
494      }
495    } finally {
496      span.end();
497    }
498  }
499
500  /**
501   * setAcl is an idempotent operation. Retry before throwing exception
502   * @return list of ACLs
503   */
504  public Stat setAcl(String path, List<ACL> acls, int version)
505    throws KeeperException, InterruptedException {
506    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl");
507    try (Scope ignored = span.makeCurrent()) {
508      RetryCounter retryCounter = retryCounterFactory.create();
509      while (true) {
510        try {
511          span.setStatus(StatusCode.OK);
512          return checkZk().setACL(path, acls, version);
513        } catch (KeeperException e) {
514          switch (e.code()) {
515            case CONNECTIONLOSS:
516            case OPERATIONTIMEOUT:
517              TraceUtil.setError(span, e);
518              retryOrThrow(retryCounter, e, "setAcl");
519              break;
520
521            default:
522              TraceUtil.setError(span, e);
523              throw e;
524          }
525        }
526        retryCounter.sleepUntilNextRetry();
527      }
528    } finally {
529      span.end();
530    }
531  }
532
533  /**
534   * <p>
535   * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. But this
536   * function will not throw the NodeExist exception back to the application.
537   * </p>
538   * <p>
539   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to
540   * verify, whether the previous one is successful or not.
541   * </p>
542   */
543  public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
544    throws KeeperException, InterruptedException {
545    final Span span = TraceUtil.createSpan("RecoverableZookeeper.create");
546    try (Scope ignored = span.makeCurrent()) {
547      byte[] newData = ZKMetadata.appendMetaData(id, data);
548      switch (createMode) {
549        case EPHEMERAL:
550        case PERSISTENT:
551          span.setStatus(StatusCode.OK);
552          return createNonSequential(path, newData, acl, createMode);
553
554        case EPHEMERAL_SEQUENTIAL:
555        case PERSISTENT_SEQUENTIAL:
556          span.setStatus(StatusCode.OK);
557          return createSequential(path, newData, acl, createMode);
558
559        default:
560          final IllegalArgumentException e =
561            new IllegalArgumentException("Unrecognized CreateMode: " + createMode);
562          TraceUtil.setError(span, e);
563          throw e;
564      }
565    } finally {
566      span.end();
567    }
568  }
569
570  private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
571    throws KeeperException, InterruptedException {
572    RetryCounter retryCounter = retryCounterFactory.create();
573    boolean isRetry = false; // False for first attempt, true for all retries.
574    while (true) {
575      try {
576        return checkZk().create(path, data, acl, createMode);
577      } catch (KeeperException e) {
578        switch (e.code()) {
579          case NODEEXISTS:
580            if (isRetry) {
581              // If the connection was lost, there is still a possibility that
582              // we have successfully created the node at our previous attempt,
583              // so we read the node and compare.
584              byte[] currentData = checkZk().getData(path, false, null);
585              if (currentData != null && Bytes.compareTo(currentData, data) == 0) {
586                // We successfully created a non-sequential node
587                return path;
588              }
589              LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData)
590                + ", could not write " + Bytes.toStringBinary(data));
591              throw e;
592            }
593            LOG.trace("Node {} already exists", path);
594            throw e;
595
596          case CONNECTIONLOSS:
597          case OPERATIONTIMEOUT:
598          case REQUESTTIMEOUT:
599            retryOrThrow(retryCounter, e, "create");
600            break;
601
602          default:
603            throw e;
604        }
605      }
606      retryCounter.sleepUntilNextRetry();
607      isRetry = true;
608    }
609  }
610
611  private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
612    throws KeeperException, InterruptedException {
613    RetryCounter retryCounter = retryCounterFactory.create();
614    boolean first = true;
615    String newPath = path + this.identifier;
616    while (true) {
617      try {
618        if (!first) {
619          // Check if we succeeded on a previous attempt
620          String previousResult = findPreviousSequentialNode(newPath);
621          if (previousResult != null) {
622            return previousResult;
623          }
624        }
625        first = false;
626        return checkZk().create(newPath, data, acl, createMode);
627      } catch (KeeperException e) {
628        switch (e.code()) {
629          case CONNECTIONLOSS:
630          case OPERATIONTIMEOUT:
631          case REQUESTTIMEOUT:
632            retryOrThrow(retryCounter, e, "create");
633            break;
634
635          default:
636            throw e;
637        }
638      }
639      retryCounter.sleepUntilNextRetry();
640    }
641  }
642
643  /**
644   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to
645   * actually pass to multi (need to do this in order to appendMetaData).
646   */
647  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
648    if (ops == null) {
649      return null;
650    }
651
652    List<Op> preparedOps = new LinkedList<>();
653    for (Op op : ops) {
654      if (op.getType() == ZooDefs.OpCode.create) {
655        CreateRequest create = (CreateRequest) op.toRequestRecord();
656        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
657          create.getAcl(), create.getFlags()));
658      } else if (op.getType() == ZooDefs.OpCode.delete) {
659        // no need to appendMetaData for delete
660        preparedOps.add(op);
661      } else if (op.getType() == ZooDefs.OpCode.setData) {
662        SetDataRequest setData = (SetDataRequest) op.toRequestRecord();
663        preparedOps.add(Op.setData(setData.getPath(),
664          ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
665      } else {
666        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
667      }
668    }
669    return preparedOps;
670  }
671
672  /**
673   * Run multiple operations in a transactional manner. Retry before throwing exception
674   */
675  public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
676    final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi");
677    try (Scope ignored = span.makeCurrent()) {
678      RetryCounter retryCounter = retryCounterFactory.create();
679      Iterable<Op> multiOps = prepareZKMulti(ops);
680      while (true) {
681        try {
682          span.setStatus(StatusCode.OK);
683          return checkZk().multi(multiOps);
684        } catch (KeeperException e) {
685          switch (e.code()) {
686            case CONNECTIONLOSS:
687            case OPERATIONTIMEOUT:
688            case REQUESTTIMEOUT:
689              TraceUtil.setError(span, e);
690              retryOrThrow(retryCounter, e, "multi");
691              break;
692
693            default:
694              TraceUtil.setError(span, e);
695              throw e;
696          }
697        }
698        retryCounter.sleepUntilNextRetry();
699      }
700    } finally {
701      span.end();
702    }
703  }
704
705  private String findPreviousSequentialNode(String path)
706    throws KeeperException, InterruptedException {
707    int lastSlashIdx = path.lastIndexOf('/');
708    assert (lastSlashIdx != -1);
709    String parent = path.substring(0, lastSlashIdx);
710    String nodePrefix = path.substring(lastSlashIdx + 1);
711    List<String> nodes = checkZk().getChildren(parent, false);
712    List<String> matching = filterByPrefix(nodes, nodePrefix);
713    for (String node : matching) {
714      String nodePath = parent + "/" + node;
715      Stat stat = checkZk().exists(nodePath, false);
716      if (stat != null) {
717        return nodePath;
718      }
719    }
720    return null;
721  }
722
723  public synchronized long getSessionId() {
724    return zk == null ? -1 : zk.getSessionId();
725  }
726
727  public synchronized void close() throws InterruptedException {
728    if (zk != null) {
729      zk.close();
730    }
731  }
732
733  public synchronized States getState() {
734    return zk == null ? null : zk.getState();
735  }
736
737  public synchronized ZooKeeper getZooKeeper() {
738    return zk;
739  }
740
741  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
742    checkZk().sync(path, cb, ctx);
743  }
744
745  /**
746   * Filters the given node list by the given prefixes. This method is all-inclusive--if any element
747   * in the node list starts with any of the given prefixes, then it is included in the result.
748   * @param nodes    the nodes to filter
749   * @param prefixes the prefixes to include in the result
750   * @return list of every element that starts with one of the prefixes
751   */
752  private static List<String> filterByPrefix(List<String> nodes, String... prefixes) {
753    List<String> lockChildren = new ArrayList<>();
754    for (String child : nodes) {
755      for (String prefix : prefixes) {
756        if (child.startsWith(prefix)) {
757          lockChildren.add(child);
758          break;
759        }
760      }
761    }
762    return lockChildren;
763  }
764
765  public String getIdentifier() {
766    return identifier;
767  }
768}