001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.util.ArrayList;
024import java.util.LinkedList;
025import java.util.List;
026
027import org.apache.hadoop.hbase.trace.TraceUtil;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.apache.hadoop.hbase.util.RetryCounter;
031import org.apache.hadoop.hbase.util.RetryCounterFactory;
032import org.apache.htrace.core.TraceScope;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.zookeeper.AsyncCallback;
035import org.apache.zookeeper.CreateMode;
036import org.apache.zookeeper.KeeperException;
037import org.apache.zookeeper.Op;
038import org.apache.zookeeper.OpResult;
039import org.apache.zookeeper.Watcher;
040import org.apache.zookeeper.ZooDefs;
041import org.apache.zookeeper.ZooKeeper;
042import org.apache.zookeeper.ZooKeeper.States;
043import org.apache.zookeeper.data.ACL;
044import org.apache.zookeeper.data.Stat;
045import org.apache.zookeeper.proto.CreateRequest;
046import org.apache.zookeeper.proto.SetDataRequest;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * A zookeeper that can handle 'recoverable' errors.
052 * To handle recoverable errors, developers need to realize that there are two
053 * classes of requests: idempotent and non-idempotent requests. Read requests
054 * and unconditional sets and deletes are examples of idempotent requests, they
055 * can be reissued with the same results.
056 * (Although, the delete may throw a NoNodeException on reissue its effect on
057 * the ZooKeeper state is the same.) Non-idempotent requests need special
058 * handling, application and library writers need to keep in mind that they may
059 * need to encode information in the data or name of znodes to detect
060 * retries. A simple example is a create that uses a sequence flag.
061 * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
062 * loss exception, that process will reissue another
063 * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
064 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
065 * that x-109 was the result of the previous create, so the process actually
066 * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
067 * when doing the create. If the process is using an id of 352, before reissuing
068 * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
 * "x-352-109", "x-333-110". The process will know that the original create
 * succeeded and the znode it created is "x-352-109".
071 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
072 */
073@InterfaceAudience.Private
074public class RecoverableZooKeeper {
  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
  // The actual ZooKeeper client instance. Created lazily by checkZk() and reset to null by
  // reconnectAfterExpiration() before a replacement client is created.
  private ZooKeeper zk;
  // Produces a fresh RetryCounter for each retried operation.
  private final RetryCounterFactory retryCounterFactory;
  // An identifier of this process in the cluster; also appended to the path of sequential nodes
  // in createSequential() so that an earlier create attempt can be recognised.
  private final String identifier;
  // Byte form of the identifier, handed to ZKMetadata.appendMetaData() when writing znode data.
  private final byte[] id;
  // Arguments used by checkZk() whenever the underlying client has to be (re)created.
  private final Watcher watcher;
  private final int sessionTimeout;
  private final String quorumServers;
  // Value reported by getMaxMultiSizeLimit().
  private final int maxMultiSize;
086
087  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
088      justification="None. Its always been this way.")
089  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
090      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
091      int maxMultiSize)
092    throws IOException {
093    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
094    this.retryCounterFactory =
095      new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
096
097    if (identifier == null || identifier.length() == 0) {
098      // the identifier = processID@hostName
099      identifier = ManagementFactory.getRuntimeMXBean().getName();
100    }
101    LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
102      quorumServers);
103    this.identifier = identifier;
104    this.id = Bytes.toBytes(identifier);
105
106    this.watcher = watcher;
107    this.sessionTimeout = sessionTimeout;
108    this.quorumServers = quorumServers;
109    this.maxMultiSize = maxMultiSize;
110
111    try {
112      checkZk();
113    } catch (Exception x) {
114      /* ignore */
115    }
116  }
117
118  /**
119   * Returns the maximum size (in bytes) that should be included in any single multi() call.
120   *
121   * NB: This is an approximation, so there may be variance in the msg actually sent over the
122   * wire. Please be sure to set this approximately, with respect to your ZK server configuration
123   * for jute.maxbuffer.
124   */
125  public int getMaxMultiSizeLimit() {
126    return maxMultiSize;
127  }
128
129  /**
130   * Try to create a ZooKeeper connection. Turns any exception encountered into a
131   * KeeperException.OperationTimeoutException so it can retried.
132   * @return The created ZooKeeper connection object
133   * @throws KeeperException if a ZooKeeper operation fails
134   */
135  protected synchronized ZooKeeper checkZk() throws KeeperException {
136    if (this.zk == null) {
137      try {
138        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
139      } catch (IOException ex) {
140        LOG.warn("Unable to create ZooKeeper Connection", ex);
141        throw new KeeperException.OperationTimeoutException();
142      }
143    }
144    return zk;
145  }
146
147  public synchronized void reconnectAfterExpiration()
148        throws IOException, KeeperException, InterruptedException {
149    if (zk != null) {
150      LOG.info("Closing dead ZooKeeper connection, session" +
151        " was: 0x"+Long.toHexString(zk.getSessionId()));
152      zk.close();
153      // reset the ZooKeeper connection
154      zk = null;
155    }
156    checkZk();
157    LOG.info("Recreated a ZooKeeper, session" +
158      " is: 0x"+Long.toHexString(zk.getSessionId()));
159  }
160
161  /**
162   * delete is an idempotent operation. Retry before throwing exception.
163   * This function will not throw NoNodeException if the path does not
164   * exist.
165   */
166  public void delete(String path, int version) throws InterruptedException, KeeperException {
167    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
168      RetryCounter retryCounter = retryCounterFactory.create();
169      boolean isRetry = false; // False for first attempt, true for all retries.
170      while (true) {
171        try {
172          long startTime = EnvironmentEdgeManager.currentTime();
173          checkZk().delete(path, version);
174          return;
175        } catch (KeeperException e) {
176          switch (e.code()) {
177            case NONODE:
178              if (isRetry) {
179                LOG.debug("Node " + path + " already deleted. Assuming a " +
180                    "previous attempt succeeded.");
181                return;
182              }
183              LOG.debug("Node {} already deleted, retry={}", path, isRetry);
184              throw e;
185
186            case CONNECTIONLOSS:
187              retryOrThrow(retryCounter, e, "delete");
188              break;
189            case OPERATIONTIMEOUT:
190              retryOrThrow(retryCounter, e, "delete");
191              break;
192
193            default:
194              throw e;
195          }
196        }
197        retryCounter.sleepUntilNextRetry();
198        isRetry = true;
199      }
200    }
201  }
202
203  /**
204   * exists is an idempotent operation. Retry before throwing exception
205   * @return A Stat instance
206   */
207  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
208    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
209      RetryCounter retryCounter = retryCounterFactory.create();
210      while (true) {
211        try {
212          long startTime = EnvironmentEdgeManager.currentTime();
213          Stat nodeStat = checkZk().exists(path, watcher);
214          return nodeStat;
215        } catch (KeeperException e) {
216          switch (e.code()) {
217            case CONNECTIONLOSS:
218              retryOrThrow(retryCounter, e, "exists");
219              break;
220            case OPERATIONTIMEOUT:
221              retryOrThrow(retryCounter, e, "exists");
222              break;
223
224            default:
225              throw e;
226          }
227        }
228        retryCounter.sleepUntilNextRetry();
229      }
230    }
231  }
232
233  /**
234   * exists is an idempotent operation. Retry before throwing exception
235   * @return A Stat instance
236   */
237  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
238    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
239      RetryCounter retryCounter = retryCounterFactory.create();
240      while (true) {
241        try {
242          long startTime = EnvironmentEdgeManager.currentTime();
243          Stat nodeStat = checkZk().exists(path, watch);
244          return nodeStat;
245        } catch (KeeperException e) {
246          switch (e.code()) {
247            case CONNECTIONLOSS:
248              retryOrThrow(retryCounter, e, "exists");
249              break;
250            case OPERATIONTIMEOUT:
251              retryOrThrow(retryCounter, e, "exists");
252              break;
253
254            default:
255              throw e;
256          }
257        }
258        retryCounter.sleepUntilNextRetry();
259      }
260    }
261  }
262
263  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
264      String opName) throws KeeperException {
265    if (!retryCounter.shouldRetry()) {
266      LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
267      throw e;
268    }
269    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e);
270  }
271
272  /**
273   * getChildren is an idempotent operation. Retry before throwing exception
274   * @return List of children znodes
275   */
276  public List<String> getChildren(String path, Watcher watcher)
277    throws KeeperException, InterruptedException {
278    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
279      RetryCounter retryCounter = retryCounterFactory.create();
280      while (true) {
281        try {
282          long startTime = EnvironmentEdgeManager.currentTime();
283          List<String> children = checkZk().getChildren(path, watcher);
284          return children;
285        } catch (KeeperException e) {
286          switch (e.code()) {
287            case CONNECTIONLOSS:
288              retryOrThrow(retryCounter, e, "getChildren");
289              break;
290            case OPERATIONTIMEOUT:
291              retryOrThrow(retryCounter, e, "getChildren");
292              break;
293
294            default:
295              throw e;
296          }
297        }
298        retryCounter.sleepUntilNextRetry();
299      }
300    }
301  }
302
303  /**
304   * getChildren is an idempotent operation. Retry before throwing exception
305   * @return List of children znodes
306   */
307  public List<String> getChildren(String path, boolean watch)
308    throws KeeperException, InterruptedException {
309    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
310      RetryCounter retryCounter = retryCounterFactory.create();
311      while (true) {
312        try {
313          long startTime = EnvironmentEdgeManager.currentTime();
314          List<String> children = checkZk().getChildren(path, watch);
315          return children;
316        } catch (KeeperException e) {
317          switch (e.code()) {
318            case CONNECTIONLOSS:
319              retryOrThrow(retryCounter, e, "getChildren");
320              break;
321            case OPERATIONTIMEOUT:
322              retryOrThrow(retryCounter, e, "getChildren");
323              break;
324
325            default:
326              throw e;
327          }
328        }
329        retryCounter.sleepUntilNextRetry();
330      }
331    }
332  }
333
334  /**
335   * getData is an idempotent operation. Retry before throwing exception
336   * @return Data
337   */
338  public byte[] getData(String path, Watcher watcher, Stat stat)
339    throws KeeperException, InterruptedException {
340    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
341      RetryCounter retryCounter = retryCounterFactory.create();
342      while (true) {
343        try {
344          long startTime = EnvironmentEdgeManager.currentTime();
345          byte[] revData = checkZk().getData(path, watcher, stat);
346          return ZKMetadata.removeMetaData(revData);
347        } catch (KeeperException e) {
348          switch (e.code()) {
349            case CONNECTIONLOSS:
350              retryOrThrow(retryCounter, e, "getData");
351              break;
352            case OPERATIONTIMEOUT:
353              retryOrThrow(retryCounter, e, "getData");
354              break;
355
356            default:
357              throw e;
358          }
359        }
360        retryCounter.sleepUntilNextRetry();
361      }
362    }
363  }
364
365  /**
366   * getData is an idempotent operation. Retry before throwing exception
367   * @return Data
368   */
369  public byte[] getData(String path, boolean watch, Stat stat)
370    throws KeeperException, InterruptedException {
371    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
372      RetryCounter retryCounter = retryCounterFactory.create();
373      while (true) {
374        try {
375          long startTime = EnvironmentEdgeManager.currentTime();
376          byte[] revData = checkZk().getData(path, watch, stat);
377          return ZKMetadata.removeMetaData(revData);
378        } catch (KeeperException e) {
379          switch (e.code()) {
380            case CONNECTIONLOSS:
381              retryOrThrow(retryCounter, e, "getData");
382              break;
383            case OPERATIONTIMEOUT:
384              retryOrThrow(retryCounter, e, "getData");
385              break;
386
387            default:
388              throw e;
389          }
390        }
391        retryCounter.sleepUntilNextRetry();
392      }
393    }
394  }
395
396  /**
397   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
398   * Adding an identifier field into the data to check whether
399   * badversion is caused by the result of previous correctly setData
400   * @return Stat instance
401   */
402  public Stat setData(String path, byte[] data, int version)
403    throws KeeperException, InterruptedException {
404    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
405      RetryCounter retryCounter = retryCounterFactory.create();
406      byte[] newData = ZKMetadata.appendMetaData(id, data);
407      boolean isRetry = false;
408      long startTime;
409      while (true) {
410        try {
411          startTime = EnvironmentEdgeManager.currentTime();
412          Stat nodeStat = checkZk().setData(path, newData, version);
413          return nodeStat;
414        } catch (KeeperException e) {
415          switch (e.code()) {
416            case CONNECTIONLOSS:
417              retryOrThrow(retryCounter, e, "setData");
418              break;
419            case OPERATIONTIMEOUT:
420              retryOrThrow(retryCounter, e, "setData");
421              break;
422            case BADVERSION:
423              if (isRetry) {
424                // try to verify whether the previous setData success or not
425                try{
426                  Stat stat = new Stat();
427                  byte[] revData = checkZk().getData(path, false, stat);
428                  if(Bytes.compareTo(revData, newData) == 0) {
429                    // the bad version is caused by previous successful setData
430                    return stat;
431                  }
432                } catch(KeeperException keeperException){
433                  // the ZK is not reliable at this moment. just throwing exception
434                  throw keeperException;
435                }
436              }
437            // throw other exceptions and verified bad version exceptions
438            default:
439              throw e;
440          }
441        }
442        retryCounter.sleepUntilNextRetry();
443        isRetry = true;
444      }
445    }
446  }
447
448  /**
449   * getAcl is an idempotent operation. Retry before throwing exception
450   * @return list of ACLs
451   */
452  public List<ACL> getAcl(String path, Stat stat)
453    throws KeeperException, InterruptedException {
454    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
455      RetryCounter retryCounter = retryCounterFactory.create();
456      while (true) {
457        try {
458          long startTime = EnvironmentEdgeManager.currentTime();
459          List<ACL> nodeACL = checkZk().getACL(path, stat);
460          return nodeACL;
461        } catch (KeeperException e) {
462          switch (e.code()) {
463            case CONNECTIONLOSS:
464              retryOrThrow(retryCounter, e, "getAcl");
465              break;
466            case OPERATIONTIMEOUT:
467              retryOrThrow(retryCounter, e, "getAcl");
468              break;
469
470            default:
471              throw e;
472          }
473        }
474        retryCounter.sleepUntilNextRetry();
475      }
476    }
477  }
478
479  /**
480   * setAcl is an idempotent operation. Retry before throwing exception
481   * @return list of ACLs
482   */
483  public Stat setAcl(String path, List<ACL> acls, int version)
484    throws KeeperException, InterruptedException {
485    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
486      RetryCounter retryCounter = retryCounterFactory.create();
487      while (true) {
488        try {
489          long startTime = EnvironmentEdgeManager.currentTime();
490          Stat nodeStat = checkZk().setACL(path, acls, version);
491          return nodeStat;
492        } catch (KeeperException e) {
493          switch (e.code()) {
494            case CONNECTIONLOSS:
495              retryOrThrow(retryCounter, e, "setAcl");
496              break;
497            case OPERATIONTIMEOUT:
498              retryOrThrow(retryCounter, e, "setAcl");
499              break;
500
501            default:
502              throw e;
503          }
504        }
505        retryCounter.sleepUntilNextRetry();
506      }
507    }
508  }
509
510  /**
511   * <p>
512   * NONSEQUENTIAL create is idempotent operation.
513   * Retry before throwing exceptions.
514   * But this function will not throw the NodeExist exception back to the
515   * application.
516   * </p>
517   * <p>
518   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
519   * identifier to the path to verify, whether the previous one is successful
520   * or not.
521   * </p>
522   *
523   * @return Path
524   */
525  public String create(String path, byte[] data, List<ACL> acl,
526      CreateMode createMode)
527    throws KeeperException, InterruptedException {
528    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
529      byte[] newData = ZKMetadata.appendMetaData(id, data);
530      switch (createMode) {
531        case EPHEMERAL:
532        case PERSISTENT:
533          return createNonSequential(path, newData, acl, createMode);
534
535        case EPHEMERAL_SEQUENTIAL:
536        case PERSISTENT_SEQUENTIAL:
537          return createSequential(path, newData, acl, createMode);
538
539        default:
540          throw new IllegalArgumentException("Unrecognized CreateMode: " +
541              createMode);
542      }
543    }
544  }
545
546  private String createNonSequential(String path, byte[] data, List<ACL> acl,
547      CreateMode createMode) throws KeeperException, InterruptedException {
548    RetryCounter retryCounter = retryCounterFactory.create();
549    boolean isRetry = false; // False for first attempt, true for all retries.
550    long startTime;
551    while (true) {
552      try {
553        startTime = EnvironmentEdgeManager.currentTime();
554        String nodePath = checkZk().create(path, data, acl, createMode);
555        return nodePath;
556      } catch (KeeperException e) {
557        switch (e.code()) {
558          case NODEEXISTS:
559            if (isRetry) {
560              // If the connection was lost, there is still a possibility that
561              // we have successfully created the node at our previous attempt,
562              // so we read the node and compare.
563              byte[] currentData = checkZk().getData(path, false, null);
564              if (currentData != null &&
565                  Bytes.compareTo(currentData, data) == 0) {
566                // We successfully created a non-sequential node
567                return path;
568              }
569              LOG.error("Node " + path + " already exists with " +
570                  Bytes.toStringBinary(currentData) + ", could not write " +
571                  Bytes.toStringBinary(data));
572              throw e;
573            }
574            LOG.trace("Node {} already exists", path);
575            throw e;
576
577          case CONNECTIONLOSS:
578            retryOrThrow(retryCounter, e, "create");
579            break;
580          case OPERATIONTIMEOUT:
581            retryOrThrow(retryCounter, e, "create");
582            break;
583
584          default:
585            throw e;
586        }
587      }
588      retryCounter.sleepUntilNextRetry();
589      isRetry = true;
590    }
591  }
592
593  private String createSequential(String path, byte[] data,
594      List<ACL> acl, CreateMode createMode)
595    throws KeeperException, InterruptedException {
596    RetryCounter retryCounter = retryCounterFactory.create();
597    boolean first = true;
598    String newPath = path+this.identifier;
599    while (true) {
600      try {
601        if (!first) {
602          // Check if we succeeded on a previous attempt
603          String previousResult = findPreviousSequentialNode(newPath);
604          if (previousResult != null) {
605            return previousResult;
606          }
607        }
608        first = false;
609        long startTime = EnvironmentEdgeManager.currentTime();
610        String nodePath = checkZk().create(newPath, data, acl, createMode);
611        return nodePath;
612      } catch (KeeperException e) {
613        switch (e.code()) {
614          case CONNECTIONLOSS:
615            retryOrThrow(retryCounter, e, "create");
616            break;
617          case OPERATIONTIMEOUT:
618            retryOrThrow(retryCounter, e, "create");
619            break;
620
621          default:
622            throw e;
623        }
624      }
625      retryCounter.sleepUntilNextRetry();
626    }
627  }
628  /**
629   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op
630   * instances to actually pass to multi (need to do this in order to appendMetaData).
631   */
632  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
633    if(ops == null) {
634      return null;
635    }
636
637    List<Op> preparedOps = new LinkedList<>();
638    for (Op op : ops) {
639      if (op.getType() == ZooDefs.OpCode.create) {
640        CreateRequest create = (CreateRequest)op.toRequestRecord();
641        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
642          create.getAcl(), create.getFlags()));
643      } else if (op.getType() == ZooDefs.OpCode.delete) {
644        // no need to appendMetaData for delete
645        preparedOps.add(op);
646      } else if (op.getType() == ZooDefs.OpCode.setData) {
647        SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
648        preparedOps.add(Op.setData(setData.getPath(),
649                ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
650      } else {
651        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
652      }
653    }
654    return preparedOps;
655  }
656
657  /**
658   * Run multiple operations in a transactional manner. Retry before throwing exception
659   */
660  public List<OpResult> multi(Iterable<Op> ops)
661    throws KeeperException, InterruptedException {
662    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
663      RetryCounter retryCounter = retryCounterFactory.create();
664      Iterable<Op> multiOps = prepareZKMulti(ops);
665      while (true) {
666        try {
667          long startTime = EnvironmentEdgeManager.currentTime();
668          List<OpResult> opResults = checkZk().multi(multiOps);
669          return opResults;
670        } catch (KeeperException e) {
671          switch (e.code()) {
672            case CONNECTIONLOSS:
673              retryOrThrow(retryCounter, e, "multi");
674              break;
675            case OPERATIONTIMEOUT:
676              retryOrThrow(retryCounter, e, "multi");
677              break;
678
679            default:
680              throw e;
681          }
682        }
683        retryCounter.sleepUntilNextRetry();
684      }
685    }
686  }
687
688  private String findPreviousSequentialNode(String path)
689    throws KeeperException, InterruptedException {
690    int lastSlashIdx = path.lastIndexOf('/');
691    assert(lastSlashIdx != -1);
692    String parent = path.substring(0, lastSlashIdx);
693    String nodePrefix = path.substring(lastSlashIdx+1);
694    long startTime = EnvironmentEdgeManager.currentTime();
695    List<String> nodes = checkZk().getChildren(parent, false);
696    List<String> matching = filterByPrefix(nodes, nodePrefix);
697    for (String node : matching) {
698      String nodePath = parent + "/" + node;
699      startTime = EnvironmentEdgeManager.currentTime();
700      Stat stat = checkZk().exists(nodePath, false);
701      if (stat != null) {
702        return nodePath;
703      }
704    }
705    return null;
706  }
707
708  public synchronized long getSessionId() {
709    return zk == null ? -1 : zk.getSessionId();
710  }
711
712  public synchronized void close() throws InterruptedException {
713    if (zk != null) {
714      zk.close();
715    }
716  }
717
718  public synchronized States getState() {
719    return zk == null ? null : zk.getState();
720  }
721
722  public synchronized ZooKeeper getZooKeeper() {
723    return zk;
724  }
725
726  public synchronized byte[] getSessionPasswd() {
727    return zk == null ? null : zk.getSessionPasswd();
728  }
729
730  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
731    checkZk().sync(path, cb, ctx);
732  }
733
734  /**
735   * Filters the given node list by the given prefixes.
736   * This method is all-inclusive--if any element in the node list starts
737   * with any of the given prefixes, then it is included in the result.
738   *
739   * @param nodes the nodes to filter
740   * @param prefixes the prefixes to include in the result
741   * @return list of every element that starts with one of the prefixes
742   */
743  private static List<String> filterByPrefix(List<String> nodes,
744      String... prefixes) {
745    List<String> lockChildren = new ArrayList<>();
746    for (String child : nodes){
747      for (String prefix : prefixes){
748        if (child.startsWith(prefix)){
749          lockChildren.add(child);
750          break;
751        }
752      }
753    }
754    return lockChildren;
755  }
756
757  public String getIdentifier() {
758    return identifier;
759  }
760}