001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.util.ArrayList;
024import java.util.LinkedList;
025import java.util.List;
026
027import org.apache.hadoop.hbase.trace.TraceUtil;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.apache.hadoop.hbase.util.RetryCounter;
031import org.apache.hadoop.hbase.util.RetryCounterFactory;
032import org.apache.htrace.core.TraceScope;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.zookeeper.AsyncCallback;
035import org.apache.zookeeper.CreateMode;
036import org.apache.zookeeper.KeeperException;
037import org.apache.zookeeper.Op;
038import org.apache.zookeeper.OpResult;
039import org.apache.zookeeper.Watcher;
040import org.apache.zookeeper.ZooDefs;
041import org.apache.zookeeper.ZooKeeper;
042import org.apache.zookeeper.ZooKeeper.States;
043import org.apache.zookeeper.data.ACL;
044import org.apache.zookeeper.data.Stat;
045import org.apache.zookeeper.proto.CreateRequest;
046import org.apache.zookeeper.proto.SetDataRequest;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * A zookeeper that can handle 'recoverable' errors.
052 * To handle recoverable errors, developers need to realize that there are two
053 * classes of requests: idempotent and non-idempotent requests. Read requests
054 * and unconditional sets and deletes are examples of idempotent requests, they
055 * can be reissued with the same results.
056 * (Although, the delete may throw a NoNodeException on reissue its effect on
057 * the ZooKeeper state is the same.) Non-idempotent requests need special
058 * handling, application and library writers need to keep in mind that they may
059 * need to encode information in the data or name of znodes to detect
060 * retries. A simple example is a create that uses a sequence flag.
061 * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
062 * loss exception, that process will reissue another
063 * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
064 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
065 * that x-109 was the result of the previous create, so the process actually
066 * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
067 * when doing the create. If the process is using an id of 352, before reissuing
 * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
 * "x-352-109", "x-333-110". The process will know that the original create
 * succeeded and the znode it created is "x-352-109".
071 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
072 */
073@InterfaceAudience.Private
074public class RecoverableZooKeeper {
075  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
076  // the actual ZooKeeper client instance
077  private ZooKeeper zk;
078  private final RetryCounterFactory retryCounterFactory;
079  // An identifier of this process in the cluster
080  private final String identifier;
081  private final byte[] id;
082  private final Watcher watcher;
083  private final int sessionTimeout;
084  private final String quorumServers;
085
086  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
087      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
088    throws IOException {
089    this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
090        null);
091  }
092
093  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
094      justification="None. Its always been this way.")
095  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
096      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier)
097    throws IOException {
098    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
099    this.retryCounterFactory =
100      new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
101
102    if (identifier == null || identifier.length() == 0) {
103      // the identifier = processID@hostName
104      identifier = ManagementFactory.getRuntimeMXBean().getName();
105    }
106    LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
107      quorumServers);
108    this.identifier = identifier;
109    this.id = Bytes.toBytes(identifier);
110
111    this.watcher = watcher;
112    this.sessionTimeout = sessionTimeout;
113    this.quorumServers = quorumServers;
114
115    try {
116      checkZk();
117    } catch (Exception x) {
118      /* ignore */
119    }
120  }
121
122  /**
123   * Try to create a ZooKeeper connection. Turns any exception encountered into a
124   * KeeperException.OperationTimeoutException so it can retried.
125   * @return The created ZooKeeper connection object
126   * @throws KeeperException if a ZooKeeper operation fails
127   */
128  protected synchronized ZooKeeper checkZk() throws KeeperException {
129    if (this.zk == null) {
130      try {
131        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
132      } catch (IOException ex) {
133        LOG.warn("Unable to create ZooKeeper Connection", ex);
134        throw new KeeperException.OperationTimeoutException();
135      }
136    }
137    return zk;
138  }
139
140  public synchronized void reconnectAfterExpiration()
141        throws IOException, KeeperException, InterruptedException {
142    if (zk != null) {
143      LOG.info("Closing dead ZooKeeper connection, session" +
144        " was: 0x"+Long.toHexString(zk.getSessionId()));
145      zk.close();
146      // reset the ZooKeeper connection
147      zk = null;
148    }
149    checkZk();
150    LOG.info("Recreated a ZooKeeper, session" +
151      " is: 0x"+Long.toHexString(zk.getSessionId()));
152  }
153
154  /**
155   * delete is an idempotent operation. Retry before throwing exception.
156   * This function will not throw NoNodeException if the path does not
157   * exist.
158   */
159  public void delete(String path, int version) throws InterruptedException, KeeperException {
160    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
161      RetryCounter retryCounter = retryCounterFactory.create();
162      boolean isRetry = false; // False for first attempt, true for all retries.
163      while (true) {
164        try {
165          long startTime = EnvironmentEdgeManager.currentTime();
166          checkZk().delete(path, version);
167          return;
168        } catch (KeeperException e) {
169          switch (e.code()) {
170            case NONODE:
171              if (isRetry) {
172                LOG.debug("Node " + path + " already deleted. Assuming a " +
173                    "previous attempt succeeded.");
174                return;
175              }
176              LOG.debug("Node {} already deleted, retry={}", path, isRetry);
177              throw e;
178
179            case CONNECTIONLOSS:
180              retryOrThrow(retryCounter, e, "delete");
181              break;
182            case OPERATIONTIMEOUT:
183              retryOrThrow(retryCounter, e, "delete");
184              break;
185
186            default:
187              throw e;
188          }
189        }
190        retryCounter.sleepUntilNextRetry();
191        isRetry = true;
192      }
193    }
194  }
195
196  /**
197   * exists is an idempotent operation. Retry before throwing exception
198   * @return A Stat instance
199   */
200  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
201    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
202      RetryCounter retryCounter = retryCounterFactory.create();
203      while (true) {
204        try {
205          long startTime = EnvironmentEdgeManager.currentTime();
206          Stat nodeStat = checkZk().exists(path, watcher);
207          return nodeStat;
208        } catch (KeeperException e) {
209          switch (e.code()) {
210            case CONNECTIONLOSS:
211              retryOrThrow(retryCounter, e, "exists");
212              break;
213            case OPERATIONTIMEOUT:
214              retryOrThrow(retryCounter, e, "exists");
215              break;
216
217            default:
218              throw e;
219          }
220        }
221        retryCounter.sleepUntilNextRetry();
222      }
223    }
224  }
225
226  /**
227   * exists is an idempotent operation. Retry before throwing exception
228   * @return A Stat instance
229   */
230  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
231    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
232      RetryCounter retryCounter = retryCounterFactory.create();
233      while (true) {
234        try {
235          long startTime = EnvironmentEdgeManager.currentTime();
236          Stat nodeStat = checkZk().exists(path, watch);
237          return nodeStat;
238        } catch (KeeperException e) {
239          switch (e.code()) {
240            case CONNECTIONLOSS:
241              retryOrThrow(retryCounter, e, "exists");
242              break;
243            case OPERATIONTIMEOUT:
244              retryOrThrow(retryCounter, e, "exists");
245              break;
246
247            default:
248              throw e;
249          }
250        }
251        retryCounter.sleepUntilNextRetry();
252      }
253    }
254  }
255
256  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
257      String opName) throws KeeperException {
258    if (!retryCounter.shouldRetry()) {
259      LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
260      throw e;
261    }
262    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e);
263  }
264
265  /**
266   * getChildren is an idempotent operation. Retry before throwing exception
267   * @return List of children znodes
268   */
269  public List<String> getChildren(String path, Watcher watcher)
270    throws KeeperException, InterruptedException {
271    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
272      RetryCounter retryCounter = retryCounterFactory.create();
273      while (true) {
274        try {
275          long startTime = EnvironmentEdgeManager.currentTime();
276          List<String> children = checkZk().getChildren(path, watcher);
277          return children;
278        } catch (KeeperException e) {
279          switch (e.code()) {
280            case CONNECTIONLOSS:
281              retryOrThrow(retryCounter, e, "getChildren");
282              break;
283            case OPERATIONTIMEOUT:
284              retryOrThrow(retryCounter, e, "getChildren");
285              break;
286
287            default:
288              throw e;
289          }
290        }
291        retryCounter.sleepUntilNextRetry();
292      }
293    }
294  }
295
296  /**
297   * getChildren is an idempotent operation. Retry before throwing exception
298   * @return List of children znodes
299   */
300  public List<String> getChildren(String path, boolean watch)
301    throws KeeperException, InterruptedException {
302    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
303      RetryCounter retryCounter = retryCounterFactory.create();
304      while (true) {
305        try {
306          long startTime = EnvironmentEdgeManager.currentTime();
307          List<String> children = checkZk().getChildren(path, watch);
308          return children;
309        } catch (KeeperException e) {
310          switch (e.code()) {
311            case CONNECTIONLOSS:
312              retryOrThrow(retryCounter, e, "getChildren");
313              break;
314            case OPERATIONTIMEOUT:
315              retryOrThrow(retryCounter, e, "getChildren");
316              break;
317
318            default:
319              throw e;
320          }
321        }
322        retryCounter.sleepUntilNextRetry();
323      }
324    }
325  }
326
327  /**
328   * getData is an idempotent operation. Retry before throwing exception
329   * @return Data
330   */
331  public byte[] getData(String path, Watcher watcher, Stat stat)
332    throws KeeperException, InterruptedException {
333    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
334      RetryCounter retryCounter = retryCounterFactory.create();
335      while (true) {
336        try {
337          long startTime = EnvironmentEdgeManager.currentTime();
338          byte[] revData = checkZk().getData(path, watcher, stat);
339          return ZKMetadata.removeMetaData(revData);
340        } catch (KeeperException e) {
341          switch (e.code()) {
342            case CONNECTIONLOSS:
343              retryOrThrow(retryCounter, e, "getData");
344              break;
345            case OPERATIONTIMEOUT:
346              retryOrThrow(retryCounter, e, "getData");
347              break;
348
349            default:
350              throw e;
351          }
352        }
353        retryCounter.sleepUntilNextRetry();
354      }
355    }
356  }
357
358  /**
359   * getData is an idempotent operation. Retry before throwing exception
360   * @return Data
361   */
362  public byte[] getData(String path, boolean watch, Stat stat)
363    throws KeeperException, InterruptedException {
364    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
365      RetryCounter retryCounter = retryCounterFactory.create();
366      while (true) {
367        try {
368          long startTime = EnvironmentEdgeManager.currentTime();
369          byte[] revData = checkZk().getData(path, watch, stat);
370          return ZKMetadata.removeMetaData(revData);
371        } catch (KeeperException e) {
372          switch (e.code()) {
373            case CONNECTIONLOSS:
374              retryOrThrow(retryCounter, e, "getData");
375              break;
376            case OPERATIONTIMEOUT:
377              retryOrThrow(retryCounter, e, "getData");
378              break;
379
380            default:
381              throw e;
382          }
383        }
384        retryCounter.sleepUntilNextRetry();
385      }
386    }
387  }
388
389  /**
390   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
391   * Adding an identifier field into the data to check whether
392   * badversion is caused by the result of previous correctly setData
393   * @return Stat instance
394   */
395  public Stat setData(String path, byte[] data, int version)
396    throws KeeperException, InterruptedException {
397    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
398      RetryCounter retryCounter = retryCounterFactory.create();
399      byte[] newData = ZKMetadata.appendMetaData(id, data);
400      boolean isRetry = false;
401      long startTime;
402      while (true) {
403        try {
404          startTime = EnvironmentEdgeManager.currentTime();
405          Stat nodeStat = checkZk().setData(path, newData, version);
406          return nodeStat;
407        } catch (KeeperException e) {
408          switch (e.code()) {
409            case CONNECTIONLOSS:
410              retryOrThrow(retryCounter, e, "setData");
411              break;
412            case OPERATIONTIMEOUT:
413              retryOrThrow(retryCounter, e, "setData");
414              break;
415            case BADVERSION:
416              if (isRetry) {
417                // try to verify whether the previous setData success or not
418                try{
419                  Stat stat = new Stat();
420                  byte[] revData = checkZk().getData(path, false, stat);
421                  if(Bytes.compareTo(revData, newData) == 0) {
422                    // the bad version is caused by previous successful setData
423                    return stat;
424                  }
425                } catch(KeeperException keeperException){
426                  // the ZK is not reliable at this moment. just throwing exception
427                  throw keeperException;
428                }
429              }
430            // throw other exceptions and verified bad version exceptions
431            default:
432              throw e;
433          }
434        }
435        retryCounter.sleepUntilNextRetry();
436        isRetry = true;
437      }
438    }
439  }
440
441  /**
442   * getAcl is an idempotent operation. Retry before throwing exception
443   * @return list of ACLs
444   */
445  public List<ACL> getAcl(String path, Stat stat)
446    throws KeeperException, InterruptedException {
447    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
448      RetryCounter retryCounter = retryCounterFactory.create();
449      while (true) {
450        try {
451          long startTime = EnvironmentEdgeManager.currentTime();
452          List<ACL> nodeACL = checkZk().getACL(path, stat);
453          return nodeACL;
454        } catch (KeeperException e) {
455          switch (e.code()) {
456            case CONNECTIONLOSS:
457              retryOrThrow(retryCounter, e, "getAcl");
458              break;
459            case OPERATIONTIMEOUT:
460              retryOrThrow(retryCounter, e, "getAcl");
461              break;
462
463            default:
464              throw e;
465          }
466        }
467        retryCounter.sleepUntilNextRetry();
468      }
469    }
470  }
471
472  /**
473   * setAcl is an idempotent operation. Retry before throwing exception
474   * @return list of ACLs
475   */
476  public Stat setAcl(String path, List<ACL> acls, int version)
477    throws KeeperException, InterruptedException {
478    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
479      RetryCounter retryCounter = retryCounterFactory.create();
480      while (true) {
481        try {
482          long startTime = EnvironmentEdgeManager.currentTime();
483          Stat nodeStat = checkZk().setACL(path, acls, version);
484          return nodeStat;
485        } catch (KeeperException e) {
486          switch (e.code()) {
487            case CONNECTIONLOSS:
488              retryOrThrow(retryCounter, e, "setAcl");
489              break;
490            case OPERATIONTIMEOUT:
491              retryOrThrow(retryCounter, e, "setAcl");
492              break;
493
494            default:
495              throw e;
496          }
497        }
498        retryCounter.sleepUntilNextRetry();
499      }
500    }
501  }
502
503  /**
504   * <p>
505   * NONSEQUENTIAL create is idempotent operation.
506   * Retry before throwing exceptions.
507   * But this function will not throw the NodeExist exception back to the
508   * application.
509   * </p>
510   * <p>
511   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
512   * identifier to the path to verify, whether the previous one is successful
513   * or not.
514   * </p>
515   *
516   * @return Path
517   */
518  public String create(String path, byte[] data, List<ACL> acl,
519      CreateMode createMode)
520    throws KeeperException, InterruptedException {
521    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
522      byte[] newData = ZKMetadata.appendMetaData(id, data);
523      switch (createMode) {
524        case EPHEMERAL:
525        case PERSISTENT:
526          return createNonSequential(path, newData, acl, createMode);
527
528        case EPHEMERAL_SEQUENTIAL:
529        case PERSISTENT_SEQUENTIAL:
530          return createSequential(path, newData, acl, createMode);
531
532        default:
533          throw new IllegalArgumentException("Unrecognized CreateMode: " +
534              createMode);
535      }
536    }
537  }
538
539  private String createNonSequential(String path, byte[] data, List<ACL> acl,
540      CreateMode createMode) throws KeeperException, InterruptedException {
541    RetryCounter retryCounter = retryCounterFactory.create();
542    boolean isRetry = false; // False for first attempt, true for all retries.
543    long startTime;
544    while (true) {
545      try {
546        startTime = EnvironmentEdgeManager.currentTime();
547        String nodePath = checkZk().create(path, data, acl, createMode);
548        return nodePath;
549      } catch (KeeperException e) {
550        switch (e.code()) {
551          case NODEEXISTS:
552            if (isRetry) {
553              // If the connection was lost, there is still a possibility that
554              // we have successfully created the node at our previous attempt,
555              // so we read the node and compare.
556              byte[] currentData = checkZk().getData(path, false, null);
557              if (currentData != null &&
558                  Bytes.compareTo(currentData, data) == 0) {
559                // We successfully created a non-sequential node
560                return path;
561              }
562              LOG.error("Node " + path + " already exists with " +
563                  Bytes.toStringBinary(currentData) + ", could not write " +
564                  Bytes.toStringBinary(data));
565              throw e;
566            }
567            LOG.trace("Node {} already exists", path);
568            throw e;
569
570          case CONNECTIONLOSS:
571            retryOrThrow(retryCounter, e, "create");
572            break;
573          case OPERATIONTIMEOUT:
574            retryOrThrow(retryCounter, e, "create");
575            break;
576
577          default:
578            throw e;
579        }
580      }
581      retryCounter.sleepUntilNextRetry();
582      isRetry = true;
583    }
584  }
585
586  private String createSequential(String path, byte[] data,
587      List<ACL> acl, CreateMode createMode)
588    throws KeeperException, InterruptedException {
589    RetryCounter retryCounter = retryCounterFactory.create();
590    boolean first = true;
591    String newPath = path+this.identifier;
592    while (true) {
593      try {
594        if (!first) {
595          // Check if we succeeded on a previous attempt
596          String previousResult = findPreviousSequentialNode(newPath);
597          if (previousResult != null) {
598            return previousResult;
599          }
600        }
601        first = false;
602        long startTime = EnvironmentEdgeManager.currentTime();
603        String nodePath = checkZk().create(newPath, data, acl, createMode);
604        return nodePath;
605      } catch (KeeperException e) {
606        switch (e.code()) {
607          case CONNECTIONLOSS:
608            retryOrThrow(retryCounter, e, "create");
609            break;
610          case OPERATIONTIMEOUT:
611            retryOrThrow(retryCounter, e, "create");
612            break;
613
614          default:
615            throw e;
616        }
617      }
618      retryCounter.sleepUntilNextRetry();
619    }
620  }
621  /**
622   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op
623   * instances to actually pass to multi (need to do this in order to appendMetaData).
624   */
625  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
626    if(ops == null) {
627      return null;
628    }
629
630    List<Op> preparedOps = new LinkedList<>();
631    for (Op op : ops) {
632      if (op.getType() == ZooDefs.OpCode.create) {
633        CreateRequest create = (CreateRequest)op.toRequestRecord();
634        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
635          create.getAcl(), create.getFlags()));
636      } else if (op.getType() == ZooDefs.OpCode.delete) {
637        // no need to appendMetaData for delete
638        preparedOps.add(op);
639      } else if (op.getType() == ZooDefs.OpCode.setData) {
640        SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
641        preparedOps.add(Op.setData(setData.getPath(),
642                ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
643      } else {
644        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
645      }
646    }
647    return preparedOps;
648  }
649
650  /**
651   * Run multiple operations in a transactional manner. Retry before throwing exception
652   */
653  public List<OpResult> multi(Iterable<Op> ops)
654    throws KeeperException, InterruptedException {
655    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
656      RetryCounter retryCounter = retryCounterFactory.create();
657      Iterable<Op> multiOps = prepareZKMulti(ops);
658      while (true) {
659        try {
660          long startTime = EnvironmentEdgeManager.currentTime();
661          List<OpResult> opResults = checkZk().multi(multiOps);
662          return opResults;
663        } catch (KeeperException e) {
664          switch (e.code()) {
665            case CONNECTIONLOSS:
666              retryOrThrow(retryCounter, e, "multi");
667              break;
668            case OPERATIONTIMEOUT:
669              retryOrThrow(retryCounter, e, "multi");
670              break;
671
672            default:
673              throw e;
674          }
675        }
676        retryCounter.sleepUntilNextRetry();
677      }
678    }
679  }
680
681  private String findPreviousSequentialNode(String path)
682    throws KeeperException, InterruptedException {
683    int lastSlashIdx = path.lastIndexOf('/');
684    assert(lastSlashIdx != -1);
685    String parent = path.substring(0, lastSlashIdx);
686    String nodePrefix = path.substring(lastSlashIdx+1);
687    long startTime = EnvironmentEdgeManager.currentTime();
688    List<String> nodes = checkZk().getChildren(parent, false);
689    List<String> matching = filterByPrefix(nodes, nodePrefix);
690    for (String node : matching) {
691      String nodePath = parent + "/" + node;
692      startTime = EnvironmentEdgeManager.currentTime();
693      Stat stat = checkZk().exists(nodePath, false);
694      if (stat != null) {
695        return nodePath;
696      }
697    }
698    return null;
699  }
700
701  public synchronized long getSessionId() {
702    return zk == null ? -1 : zk.getSessionId();
703  }
704
705  public synchronized void close() throws InterruptedException {
706    if (zk != null) {
707      zk.close();
708    }
709  }
710
711  public synchronized States getState() {
712    return zk == null ? null : zk.getState();
713  }
714
715  public synchronized ZooKeeper getZooKeeper() {
716    return zk;
717  }
718
719  public synchronized byte[] getSessionPasswd() {
720    return zk == null ? null : zk.getSessionPasswd();
721  }
722
723  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
724    checkZk().sync(path, cb, null);
725  }
726
727  /**
728   * Filters the given node list by the given prefixes.
729   * This method is all-inclusive--if any element in the node list starts
730   * with any of the given prefixes, then it is included in the result.
731   *
732   * @param nodes the nodes to filter
733   * @param prefixes the prefixes to include in the result
734   * @return list of every element that starts with one of the prefixes
735   */
736  private static List<String> filterByPrefix(List<String> nodes,
737      String... prefixes) {
738    List<String> lockChildren = new ArrayList<>();
739    for (String child : nodes){
740      for (String prefix : prefixes){
741        if (child.startsWith(prefix)){
742          lockChildren.add(child);
743          break;
744        }
745      }
746    }
747    return lockChildren;
748  }
749
750  public String getIdentifier() {
751    return identifier;
752  }
753}