001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.api.trace.StatusCode;
022import io.opentelemetry.context.Scope;
023import java.io.IOException;
024import java.lang.management.ManagementFactory;
025import java.util.ArrayList;
026import java.util.LinkedList;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.trace.TraceUtil;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.RetryCounter;
033import org.apache.hadoop.hbase.util.RetryCounterFactory;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.zookeeper.AsyncCallback;
036import org.apache.zookeeper.CreateMode;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.Op;
039import org.apache.zookeeper.OpResult;
040import org.apache.zookeeper.Watcher;
041import org.apache.zookeeper.ZooDefs;
042import org.apache.zookeeper.ZooKeeper;
043import org.apache.zookeeper.ZooKeeper.States;
044import org.apache.zookeeper.client.ZKClientConfig;
045import org.apache.zookeeper.data.ACL;
046import org.apache.zookeeper.data.Stat;
047import org.apache.zookeeper.proto.CreateRequest;
048import org.apache.zookeeper.proto.SetDataRequest;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * A zookeeper that can handle 'recoverable' errors.
054 * <p>
055 * To handle recoverable errors, developers need to realize that there are two classes of requests:
056 * idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are
057 * examples of idempotent requests, they can be reissued with the same results.
058 * <p>
059 * (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is
060 * the same.) Non-idempotent requests need special handling, application and library writers need to
061 * keep in mind that they may need to encode information in the data or name of znodes to detect
062 * retries. A simple example is a create that uses a sequence flag. If a process issues a
063 * create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue
064 * another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
065 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result
066 * of the previous create, so the process actually owns both x-109 and x-111. An easy way around
067 * this is to use "x-process id-" when doing the create. If the process is using an id of 352,
 * before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
 * "x-352-109", "x-333-110". The process will know that the original create succeeded and the znode
 * it created is "x-352-109".
071 * @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling"
072 */
073@InterfaceAudience.Private
074public class RecoverableZooKeeper {
075  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
076  // the actual ZooKeeper client instance
077  private ZooKeeper zk;
078  private final RetryCounterFactory retryCounterFactory;
079  // An identifier of this process in the cluster
080  private final String identifier;
081  private final byte[] id;
082  private final Watcher watcher;
083  private final int sessionTimeout;
084  private final String quorumServers;
085  private final int maxMultiSize;
086  private final ZKClientConfig zkClientConfig;
087
088  /**
089   * See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}.
090   */
091  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
092    throws IOException {
093    String ensemble = ZKConfig.getZKQuorumServersString(conf);
094    return connect(conf, ensemble, watcher, null, null);
095  }
096
097  /**
098   * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
099   * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
100   * watcher to the specified watcher.
101   * @param conf           configuration to pull ensemble and other settings from
102   * @param watcher        watcher to monitor connection changes
103   * @param ensemble       ZooKeeper servers quorum string
104   * @param identifier     value used to identify this client instance.
105   * @param zkClientConfig client specific configurations for this instance
106   * @return connection to zookeeper
107   * @throws IOException if unable to connect to zk or config problem
108   */
109  public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
110    final String identifier, ZKClientConfig zkClientConfig) throws IOException {
111    if (ensemble == null) {
112      throw new IOException("Unable to determine ZooKeeper ensemble");
113    }
114    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
115    if (LOG.isTraceEnabled()) {
116      LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble);
117    }
118    int retry = conf.getInt("zookeeper.recovery.retry", 3);
119    int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
120    int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
121    int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
122    return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
123      maxSleepTime, identifier, multiMaxSize, zkClientConfig);
124  }
125
126  RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, int maxRetries,
127    int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize,
128    ZKClientConfig zkClientConfig) throws IOException {
129    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
130    this.retryCounterFactory =
131      new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);
132
133    if (identifier == null || identifier.length() == 0) {
134      // the identifier = processID@hostName
135      identifier = ManagementFactory.getRuntimeMXBean().getName();
136    }
137    LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
138      quorumServers);
139    this.identifier = identifier;
140    this.id = Bytes.toBytes(identifier);
141
142    this.watcher = watcher;
143    this.sessionTimeout = sessionTimeout;
144    this.quorumServers = quorumServers;
145    this.maxMultiSize = maxMultiSize;
146    this.zkClientConfig = zkClientConfig;
147  }
148
149  /**
150   * Returns the maximum size (in bytes) that should be included in any single multi() call. NB:
151   * This is an approximation, so there may be variance in the msg actually sent over the wire.
152   * Please be sure to set this approximately, with respect to your ZK server configuration for
153   * jute.maxbuffer.
154   */
155  public int getMaxMultiSizeLimit() {
156    return maxMultiSize;
157  }
158
159  /**
160   * Try to create a ZooKeeper connection. Turns any exception encountered into a
161   * KeeperException.OperationTimeoutException so it can retried.
162   * @return The created ZooKeeper connection object
163   * @throws KeeperException if a ZooKeeper operation fails
164   */
165  private synchronized ZooKeeper checkZk() throws KeeperException {
166    if (this.zk == null) {
167      try {
168        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkClientConfig);
169      } catch (IOException ex) {
170        LOG.warn("Unable to create ZooKeeper Connection", ex);
171        throw new KeeperException.OperationTimeoutException();
172      }
173    }
174    return zk;
175  }
176
177  public synchronized void reconnectAfterExpiration()
178    throws IOException, KeeperException, InterruptedException {
179    if (zk != null) {
180      LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x"
181        + Long.toHexString(zk.getSessionId()));
182      zk.close();
183      // reset the ZooKeeper connection
184      zk = null;
185    }
186    checkZk();
187    LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId()));
188  }
189
190  /**
191   * delete is an idempotent operation. Retry before throwing exception. This function will not
192   * throw NoNodeException if the path does not exist.
193   */
194  public void delete(String path, int version) throws InterruptedException, KeeperException {
195    final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete");
196    try (Scope ignored = span.makeCurrent()) {
197      RetryCounter retryCounter = retryCounterFactory.create();
198      boolean isRetry = false; // False for first attempt, true for all retries.
199      while (true) {
200        try {
201          checkZk().delete(path, version);
202          span.setStatus(StatusCode.OK);
203          return;
204        } catch (KeeperException e) {
205          switch (e.code()) {
206            case NONODE:
207              if (isRetry) {
208                LOG.debug(
209                  "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded.");
210                return;
211              }
212              LOG.debug("Node {} already deleted, retry={}", path, isRetry);
213              TraceUtil.setError(span, e);
214              throw e;
215
216            case CONNECTIONLOSS:
217            case OPERATIONTIMEOUT:
218            case REQUESTTIMEOUT:
219              TraceUtil.setError(span, e);
220              retryOrThrow(retryCounter, e, "delete");
221              break;
222
223            default:
224              TraceUtil.setError(span, e);
225              throw e;
226          }
227        }
228        retryCounter.sleepUntilNextRetry();
229        isRetry = true;
230      }
231    } finally {
232      span.end();
233    }
234  }
235
236  /**
237   * exists is an idempotent operation. Retry before throwing exception
238   * @return A Stat instance
239   */
240  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
241    return exists(path, watcher, null);
242  }
243
244  private Stat exists(String path, Watcher watcher, Boolean watch)
245    throws InterruptedException, KeeperException {
246    final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
247    try (Scope ignored = span.makeCurrent()) {
248      RetryCounter retryCounter = retryCounterFactory.create();
249      while (true) {
250        try {
251          Stat nodeStat;
252          if (watch == null) {
253            nodeStat = checkZk().exists(path, watcher);
254          } else {
255            nodeStat = checkZk().exists(path, watch);
256          }
257          span.setStatus(StatusCode.OK);
258          return nodeStat;
259        } catch (KeeperException e) {
260          switch (e.code()) {
261            case CONNECTIONLOSS:
262            case OPERATIONTIMEOUT:
263            case REQUESTTIMEOUT:
264              TraceUtil.setError(span, e);
265              retryOrThrow(retryCounter, e, "exists");
266              break;
267
268            default:
269              TraceUtil.setError(span, e);
270              throw e;
271          }
272        }
273        retryCounter.sleepUntilNextRetry();
274      }
275    } finally {
276      span.end();
277    }
278  }
279
280  /**
281   * exists is an idempotent operation. Retry before throwing exception
282   * @return A Stat instance
283   */
284  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
285    return exists(path, null, watch);
286  }
287
288  private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName)
289    throws KeeperException {
290    if (!retryCounter.shouldRetry()) {
291      LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
292      throw e;
293    }
294    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e);
295  }
296
297  /**
298   * getChildren is an idempotent operation. Retry before throwing exception
299   * @return List of children znodes
300   */
301  public List<String> getChildren(String path, Watcher watcher)
302    throws KeeperException, InterruptedException {
303    return getChildren(path, watcher, null);
304  }
305
306  private List<String> getChildren(String path, Watcher watcher, Boolean watch)
307    throws InterruptedException, KeeperException {
308    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
309    try (Scope ignored = span.makeCurrent()) {
310      RetryCounter retryCounter = retryCounterFactory.create();
311      while (true) {
312        try {
313          List<String> children;
314          if (watch == null) {
315            children = checkZk().getChildren(path, watcher);
316          } else {
317            children = checkZk().getChildren(path, watch);
318          }
319          span.setStatus(StatusCode.OK);
320          return children;
321        } catch (KeeperException e) {
322          switch (e.code()) {
323            case CONNECTIONLOSS:
324            case OPERATIONTIMEOUT:
325            case REQUESTTIMEOUT:
326              TraceUtil.setError(span, e);
327              retryOrThrow(retryCounter, e, "getChildren");
328              break;
329
330            default:
331              TraceUtil.setError(span, e);
332              throw e;
333          }
334        }
335        retryCounter.sleepUntilNextRetry();
336      }
337    } finally {
338      span.end();
339    }
340  }
341
342  /**
343   * getChildren is an idempotent operation. Retry before throwing exception
344   * @return List of children znodes
345   */
346  public List<String> getChildren(String path, boolean watch)
347    throws KeeperException, InterruptedException {
348    return getChildren(path, null, watch);
349  }
350
351  /**
352   * getData is an idempotent operation. Retry before throwing exception
353   */
354  public byte[] getData(String path, Watcher watcher, Stat stat)
355    throws KeeperException, InterruptedException {
356    return getData(path, watcher, null, stat);
357  }
358
359  private byte[] getData(String path, Watcher watcher, Boolean watch, Stat stat)
360    throws InterruptedException, KeeperException {
361    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData");
362    try (Scope ignored = span.makeCurrent()) {
363      RetryCounter retryCounter = retryCounterFactory.create();
364      while (true) {
365        try {
366          byte[] revData;
367          if (watch == null) {
368            revData = checkZk().getData(path, watcher, stat);
369          } else {
370            revData = checkZk().getData(path, watch, stat);
371          }
372          span.setStatus(StatusCode.OK);
373          return ZKMetadata.removeMetaData(revData);
374        } catch (KeeperException e) {
375          switch (e.code()) {
376            case CONNECTIONLOSS:
377            case OPERATIONTIMEOUT:
378            case REQUESTTIMEOUT:
379              TraceUtil.setError(span, e);
380              retryOrThrow(retryCounter, e, "getData");
381              break;
382
383            default:
384              TraceUtil.setError(span, e);
385              throw e;
386          }
387        }
388        retryCounter.sleepUntilNextRetry();
389      }
390    } finally {
391      span.end();
392    }
393  }
394
395  /**
396   * getData is an idempotent operation. Retry before throwing exception
397   */
398  public byte[] getData(String path, boolean watch, Stat stat)
399    throws KeeperException, InterruptedException {
400    return getData(path, null, watch, stat);
401  }
402
403  /**
404   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception Adding an
405   * identifier field into the data to check whether badversion is caused by the result of previous
406   * correctly setData
407   * @return Stat instance
408   */
409  public Stat setData(String path, byte[] data, int version)
410    throws KeeperException, InterruptedException {
411    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData");
412    try (Scope ignored = span.makeCurrent()) {
413      RetryCounter retryCounter = retryCounterFactory.create();
414      byte[] newData = ZKMetadata.appendMetaData(id, data);
415      boolean isRetry = false;
416      while (true) {
417        try {
418          span.setStatus(StatusCode.OK);
419          return checkZk().setData(path, newData, version);
420        } catch (KeeperException e) {
421          switch (e.code()) {
422            case CONNECTIONLOSS:
423            case OPERATIONTIMEOUT:
424            case REQUESTTIMEOUT:
425              TraceUtil.setError(span, e);
426              retryOrThrow(retryCounter, e, "setData");
427              break;
428            case BADVERSION:
429              if (isRetry) {
430                // try to verify whether the previous setData success or not
431                try {
432                  Stat stat = new Stat();
433                  byte[] revData = checkZk().getData(path, false, stat);
434                  if (Bytes.compareTo(revData, newData) == 0) {
435                    // the bad version is caused by previous successful setData
436                    return stat;
437                  }
438                } catch (KeeperException keeperException) {
439                  // the ZK is not reliable at this moment. just throwing exception
440                  TraceUtil.setError(span, e);
441                  throw keeperException;
442                }
443              }
444              // throw other exceptions and verified bad version exceptions
445            default:
446              TraceUtil.setError(span, e);
447              throw e;
448          }
449        }
450        retryCounter.sleepUntilNextRetry();
451        isRetry = true;
452      }
453    } finally {
454      span.end();
455    }
456  }
457
458  /**
459   * getAcl is an idempotent operation. Retry before throwing exception
460   * @return list of ACLs
461   */
462  public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException {
463    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl");
464    try (Scope ignored = span.makeCurrent()) {
465      RetryCounter retryCounter = retryCounterFactory.create();
466      while (true) {
467        try {
468          span.setStatus(StatusCode.OK);
469          return checkZk().getACL(path, stat);
470        } catch (KeeperException e) {
471          switch (e.code()) {
472            case CONNECTIONLOSS:
473            case OPERATIONTIMEOUT:
474            case REQUESTTIMEOUT:
475              TraceUtil.setError(span, e);
476              retryOrThrow(retryCounter, e, "getAcl");
477              break;
478
479            default:
480              TraceUtil.setError(span, e);
481              throw e;
482          }
483        }
484        retryCounter.sleepUntilNextRetry();
485      }
486    } finally {
487      span.end();
488    }
489  }
490
491  /**
492   * setAcl is an idempotent operation. Retry before throwing exception
493   * @return list of ACLs
494   */
495  public Stat setAcl(String path, List<ACL> acls, int version)
496    throws KeeperException, InterruptedException {
497    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl");
498    try (Scope ignored = span.makeCurrent()) {
499      RetryCounter retryCounter = retryCounterFactory.create();
500      while (true) {
501        try {
502          span.setStatus(StatusCode.OK);
503          return checkZk().setACL(path, acls, version);
504        } catch (KeeperException e) {
505          switch (e.code()) {
506            case CONNECTIONLOSS:
507            case OPERATIONTIMEOUT:
508              TraceUtil.setError(span, e);
509              retryOrThrow(retryCounter, e, "setAcl");
510              break;
511
512            default:
513              TraceUtil.setError(span, e);
514              throw e;
515          }
516        }
517        retryCounter.sleepUntilNextRetry();
518      }
519    } finally {
520      span.end();
521    }
522  }
523
524  /**
525   * <p>
526   * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. But this
527   * function will not throw the NodeExist exception back to the application.
528   * </p>
529   * <p>
530   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to
531   * verify, whether the previous one is successful or not.
532   * </p>
533   */
534  public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
535    throws KeeperException, InterruptedException {
536    final Span span = TraceUtil.createSpan("RecoverableZookeeper.create");
537    try (Scope ignored = span.makeCurrent()) {
538      byte[] newData = ZKMetadata.appendMetaData(id, data);
539      switch (createMode) {
540        case EPHEMERAL:
541        case PERSISTENT:
542          span.setStatus(StatusCode.OK);
543          return createNonSequential(path, newData, acl, createMode);
544
545        case EPHEMERAL_SEQUENTIAL:
546        case PERSISTENT_SEQUENTIAL:
547          span.setStatus(StatusCode.OK);
548          return createSequential(path, newData, acl, createMode);
549
550        default:
551          final IllegalArgumentException e =
552            new IllegalArgumentException("Unrecognized CreateMode: " + createMode);
553          TraceUtil.setError(span, e);
554          throw e;
555      }
556    } finally {
557      span.end();
558    }
559  }
560
561  private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
562    throws KeeperException, InterruptedException {
563    RetryCounter retryCounter = retryCounterFactory.create();
564    boolean isRetry = false; // False for first attempt, true for all retries.
565    while (true) {
566      try {
567        return checkZk().create(path, data, acl, createMode);
568      } catch (KeeperException e) {
569        switch (e.code()) {
570          case NODEEXISTS:
571            if (isRetry) {
572              // If the connection was lost, there is still a possibility that
573              // we have successfully created the node at our previous attempt,
574              // so we read the node and compare.
575              byte[] currentData = checkZk().getData(path, false, null);
576              if (currentData != null && Bytes.compareTo(currentData, data) == 0) {
577                // We successfully created a non-sequential node
578                return path;
579              }
580              LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData)
581                + ", could not write " + Bytes.toStringBinary(data));
582              throw e;
583            }
584            LOG.trace("Node {} already exists", path);
585            throw e;
586
587          case CONNECTIONLOSS:
588          case OPERATIONTIMEOUT:
589          case REQUESTTIMEOUT:
590            retryOrThrow(retryCounter, e, "create");
591            break;
592
593          default:
594            throw e;
595        }
596      }
597      retryCounter.sleepUntilNextRetry();
598      isRetry = true;
599    }
600  }
601
602  private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
603    throws KeeperException, InterruptedException {
604    RetryCounter retryCounter = retryCounterFactory.create();
605    boolean first = true;
606    String newPath = path + this.identifier;
607    while (true) {
608      try {
609        if (!first) {
610          // Check if we succeeded on a previous attempt
611          String previousResult = findPreviousSequentialNode(newPath);
612          if (previousResult != null) {
613            return previousResult;
614          }
615        }
616        first = false;
617        return checkZk().create(newPath, data, acl, createMode);
618      } catch (KeeperException e) {
619        switch (e.code()) {
620          case CONNECTIONLOSS:
621          case OPERATIONTIMEOUT:
622          case REQUESTTIMEOUT:
623            retryOrThrow(retryCounter, e, "create");
624            break;
625
626          default:
627            throw e;
628        }
629      }
630      retryCounter.sleepUntilNextRetry();
631    }
632  }
633
634  /**
635   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to
636   * actually pass to multi (need to do this in order to appendMetaData).
637   */
638  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
639    if (ops == null) {
640      return null;
641    }
642
643    List<Op> preparedOps = new LinkedList<>();
644    for (Op op : ops) {
645      if (op.getType() == ZooDefs.OpCode.create) {
646        CreateRequest create = (CreateRequest) op.toRequestRecord();
647        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
648          create.getAcl(), create.getFlags()));
649      } else if (op.getType() == ZooDefs.OpCode.delete) {
650        // no need to appendMetaData for delete
651        preparedOps.add(op);
652      } else if (op.getType() == ZooDefs.OpCode.setData) {
653        SetDataRequest setData = (SetDataRequest) op.toRequestRecord();
654        preparedOps.add(Op.setData(setData.getPath(),
655          ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
656      } else {
657        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
658      }
659    }
660    return preparedOps;
661  }
662
663  /**
664   * Run multiple operations in a transactional manner. Retry before throwing exception
665   */
666  public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
667    final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi");
668    try (Scope ignored = span.makeCurrent()) {
669      RetryCounter retryCounter = retryCounterFactory.create();
670      Iterable<Op> multiOps = prepareZKMulti(ops);
671      while (true) {
672        try {
673          span.setStatus(StatusCode.OK);
674          return checkZk().multi(multiOps);
675        } catch (KeeperException e) {
676          switch (e.code()) {
677            case CONNECTIONLOSS:
678            case OPERATIONTIMEOUT:
679            case REQUESTTIMEOUT:
680              TraceUtil.setError(span, e);
681              retryOrThrow(retryCounter, e, "multi");
682              break;
683
684            default:
685              TraceUtil.setError(span, e);
686              throw e;
687          }
688        }
689        retryCounter.sleepUntilNextRetry();
690      }
691    } finally {
692      span.end();
693    }
694  }
695
696  private String findPreviousSequentialNode(String path)
697    throws KeeperException, InterruptedException {
698    int lastSlashIdx = path.lastIndexOf('/');
699    assert (lastSlashIdx != -1);
700    String parent = path.substring(0, lastSlashIdx);
701    String nodePrefix = path.substring(lastSlashIdx + 1);
702    List<String> nodes = checkZk().getChildren(parent, false);
703    List<String> matching = filterByPrefix(nodes, nodePrefix);
704    for (String node : matching) {
705      String nodePath = parent + "/" + node;
706      Stat stat = checkZk().exists(nodePath, false);
707      if (stat != null) {
708        return nodePath;
709      }
710    }
711    return null;
712  }
713
714  public synchronized long getSessionId() {
715    return zk == null ? -1 : zk.getSessionId();
716  }
717
718  public synchronized void close() throws InterruptedException {
719    if (zk != null) {
720      zk.close();
721    }
722  }
723
724  public synchronized States getState() {
725    return zk == null ? null : zk.getState();
726  }
727
728  public synchronized ZooKeeper getZooKeeper() {
729    return zk;
730  }
731
732  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
733    checkZk().sync(path, cb, ctx);
734  }
735
736  /**
737   * Filters the given node list by the given prefixes. This method is all-inclusive--if any element
738   * in the node list starts with any of the given prefixes, then it is included in the result.
739   * @param nodes    the nodes to filter
740   * @param prefixes the prefixes to include in the result
741   * @return list of every element that starts with one of the prefixes
742   */
743  private static List<String> filterByPrefix(List<String> nodes, String... prefixes) {
744    List<String> lockChildren = new ArrayList<>();
745    for (String child : nodes) {
746      for (String prefix : prefixes) {
747        if (child.startsWith(prefix)) {
748          lockChildren.add(child);
749          break;
750        }
751      }
752    }
753    return lockChildren;
754  }
755
756  public String getIdentifier() {
757    return identifier;
758  }
759}