001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.util.ArrayList;
024import java.util.LinkedList;
025import java.util.List;
026
027import org.apache.hadoop.hbase.trace.TraceUtil;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.apache.hadoop.hbase.util.RetryCounter;
031import org.apache.hadoop.hbase.util.RetryCounterFactory;
032import org.apache.htrace.core.TraceScope;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.zookeeper.AsyncCallback;
035import org.apache.zookeeper.CreateMode;
036import org.apache.zookeeper.KeeperException;
037import org.apache.zookeeper.Op;
038import org.apache.zookeeper.OpResult;
039import org.apache.zookeeper.Watcher;
040import org.apache.zookeeper.ZooDefs;
041import org.apache.zookeeper.ZooKeeper;
042import org.apache.zookeeper.ZooKeeper.States;
043import org.apache.zookeeper.data.ACL;
044import org.apache.zookeeper.data.Stat;
045import org.apache.zookeeper.proto.CreateRequest;
046import org.apache.zookeeper.proto.SetDataRequest;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * A zookeeper that can handle 'recoverable' errors.
052 * To handle recoverable errors, developers need to realize that there are two
053 * classes of requests: idempotent and non-idempotent requests. Read requests
054 * and unconditional sets and deletes are examples of idempotent requests, they
055 * can be reissued with the same results.
056 * (Although, the delete may throw a NoNodeException on reissue its effect on
057 * the ZooKeeper state is the same.) Non-idempotent requests need special
058 * handling, application and library writers need to keep in mind that they may
059 * need to encode information in the data or name of znodes to detect
060 * retries. A simple example is a create that uses a sequence flag.
061 * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
062 * loss exception, that process will reissue another
063 * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
064 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
065 * that x-109 was the result of the previous create, so the process actually
066 * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
067 * when doing the create. If the process is using an id of 352, before reissuing
068 * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
 * "x-352-109", "x-333-110". The process will know that the original create
 * succeeded and the znode it created is "x-352-109".
071 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
072 */
073@InterfaceAudience.Private
074public class RecoverableZooKeeper {
075  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
076  // the actual ZooKeeper client instance
077  private ZooKeeper zk;
078  private final RetryCounterFactory retryCounterFactory;
079  // An identifier of this process in the cluster
080  private final String identifier;
081  private final byte[] id;
082  private final Watcher watcher;
083  private final int sessionTimeout;
084  private final String quorumServers;
085
086  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
087      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
088  throws IOException {
089    this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
090        null);
091  }
092
093  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
094      justification="None. Its always been this way.")
095  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
096      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier)
097  throws IOException {
098    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
099    this.retryCounterFactory =
100      new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
101
102    if (identifier == null || identifier.length() == 0) {
103      // the identifier = processID@hostName
104      identifier = ManagementFactory.getRuntimeMXBean().getName();
105    }
106    LOG.info("Process identifier=" + identifier +
107      " connecting to ZooKeeper ensemble=" + quorumServers);
108    this.identifier = identifier;
109    this.id = Bytes.toBytes(identifier);
110
111    this.watcher = watcher;
112    this.sessionTimeout = sessionTimeout;
113    this.quorumServers = quorumServers;
114
115    try {
116      checkZk();
117    } catch (Exception x) {
118      /* ignore */
119    }
120  }
121
122  /**
123   * Try to create a ZooKeeper connection. Turns any exception encountered into a
124   * KeeperException.OperationTimeoutException so it can retried.
125   * @return The created ZooKeeper connection object
126   * @throws KeeperException if a ZooKeeper operation fails
127   */
128  protected synchronized ZooKeeper checkZk() throws KeeperException {
129    if (this.zk == null) {
130      try {
131        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
132      } catch (IOException ex) {
133        LOG.warn("Unable to create ZooKeeper Connection", ex);
134        throw new KeeperException.OperationTimeoutException();
135      }
136    }
137    return zk;
138  }
139
140  public synchronized void reconnectAfterExpiration()
141        throws IOException, KeeperException, InterruptedException {
142    if (zk != null) {
143      LOG.info("Closing dead ZooKeeper connection, session" +
144        " was: 0x"+Long.toHexString(zk.getSessionId()));
145      zk.close();
146      // reset the ZooKeeper connection
147      zk = null;
148    }
149    checkZk();
150    LOG.info("Recreated a ZooKeeper, session" +
151      " is: 0x"+Long.toHexString(zk.getSessionId()));
152  }
153
154  /**
155   * delete is an idempotent operation. Retry before throwing exception.
156   * This function will not throw NoNodeException if the path does not
157   * exist.
158   */
159  public void delete(String path, int version) throws InterruptedException, KeeperException {
160    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
161      RetryCounter retryCounter = retryCounterFactory.create();
162      boolean isRetry = false; // False for first attempt, true for all retries.
163      while (true) {
164        try {
165          long startTime = EnvironmentEdgeManager.currentTime();
166          checkZk().delete(path, version);
167          return;
168        } catch (KeeperException e) {
169          switch (e.code()) {
170            case NONODE:
171              if (isRetry) {
172                LOG.debug("Node " + path + " already deleted. Assuming a " +
173                    "previous attempt succeeded.");
174                return;
175              }
176              LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
177              throw e;
178
179            case CONNECTIONLOSS:
180              retryOrThrow(retryCounter, e, "delete");
181              break;
182            case OPERATIONTIMEOUT:
183              retryOrThrow(retryCounter, e, "delete");
184              break;
185
186            default:
187              throw e;
188          }
189        }
190        retryCounter.sleepUntilNextRetry();
191        isRetry = true;
192      }
193    }
194  }
195
196  /**
197   * exists is an idempotent operation. Retry before throwing exception
198   * @return A Stat instance
199   */
200  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
201    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
202      RetryCounter retryCounter = retryCounterFactory.create();
203      while (true) {
204        try {
205          long startTime = EnvironmentEdgeManager.currentTime();
206          Stat nodeStat = checkZk().exists(path, watcher);
207          return nodeStat;
208        } catch (KeeperException e) {
209          switch (e.code()) {
210            case CONNECTIONLOSS:
211              retryOrThrow(retryCounter, e, "exists");
212              break;
213            case OPERATIONTIMEOUT:
214              retryOrThrow(retryCounter, e, "exists");
215              break;
216
217            default:
218              throw e;
219          }
220        }
221        retryCounter.sleepUntilNextRetry();
222      }
223    }
224  }
225
226  /**
227   * exists is an idempotent operation. Retry before throwing exception
228   * @return A Stat instance
229   */
230  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
231    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
232      RetryCounter retryCounter = retryCounterFactory.create();
233      while (true) {
234        try {
235          long startTime = EnvironmentEdgeManager.currentTime();
236          Stat nodeStat = checkZk().exists(path, watch);
237          return nodeStat;
238        } catch (KeeperException e) {
239          switch (e.code()) {
240            case CONNECTIONLOSS:
241              retryOrThrow(retryCounter, e, "exists");
242              break;
243            case OPERATIONTIMEOUT:
244              retryOrThrow(retryCounter, e, "exists");
245              break;
246
247            default:
248              throw e;
249          }
250        }
251        retryCounter.sleepUntilNextRetry();
252      }
253    }
254  }
255
256  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
257      String opName) throws KeeperException {
258    if (!retryCounter.shouldRetry()) {
259      LOG.error("ZooKeeper " + opName + " failed after "
260        + retryCounter.getMaxAttempts() + " attempts");
261      throw e;
262    }
263    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum=" + quorumServers + "," +
264        "exception=" + e);
265  }
266
267  /**
268   * getChildren is an idempotent operation. Retry before throwing exception
269   * @return List of children znodes
270   */
271  public List<String> getChildren(String path, Watcher watcher)
272    throws KeeperException, InterruptedException {
273    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
274      RetryCounter retryCounter = retryCounterFactory.create();
275      while (true) {
276        try {
277          long startTime = EnvironmentEdgeManager.currentTime();
278          List<String> children = checkZk().getChildren(path, watcher);
279          return children;
280        } catch (KeeperException e) {
281          switch (e.code()) {
282            case CONNECTIONLOSS:
283              retryOrThrow(retryCounter, e, "getChildren");
284              break;
285            case OPERATIONTIMEOUT:
286              retryOrThrow(retryCounter, e, "getChildren");
287              break;
288
289            default:
290              throw e;
291          }
292        }
293        retryCounter.sleepUntilNextRetry();
294      }
295    }
296  }
297
298  /**
299   * getChildren is an idempotent operation. Retry before throwing exception
300   * @return List of children znodes
301   */
302  public List<String> getChildren(String path, boolean watch)
303  throws KeeperException, InterruptedException {
304    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
305      RetryCounter retryCounter = retryCounterFactory.create();
306      while (true) {
307        try {
308          long startTime = EnvironmentEdgeManager.currentTime();
309          List<String> children = checkZk().getChildren(path, watch);
310          return children;
311        } catch (KeeperException e) {
312          switch (e.code()) {
313            case CONNECTIONLOSS:
314              retryOrThrow(retryCounter, e, "getChildren");
315              break;
316            case OPERATIONTIMEOUT:
317              retryOrThrow(retryCounter, e, "getChildren");
318              break;
319
320            default:
321              throw e;
322          }
323        }
324        retryCounter.sleepUntilNextRetry();
325      }
326    }
327  }
328
329  /**
330   * getData is an idempotent operation. Retry before throwing exception
331   * @return Data
332   */
333  public byte[] getData(String path, Watcher watcher, Stat stat)
334  throws KeeperException, InterruptedException {
335    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
336      RetryCounter retryCounter = retryCounterFactory.create();
337      while (true) {
338        try {
339          long startTime = EnvironmentEdgeManager.currentTime();
340          byte[] revData = checkZk().getData(path, watcher, stat);
341          return ZKMetadata.removeMetaData(revData);
342        } catch (KeeperException e) {
343          switch (e.code()) {
344            case CONNECTIONLOSS:
345              retryOrThrow(retryCounter, e, "getData");
346              break;
347            case OPERATIONTIMEOUT:
348              retryOrThrow(retryCounter, e, "getData");
349              break;
350
351            default:
352              throw e;
353          }
354        }
355        retryCounter.sleepUntilNextRetry();
356      }
357    }
358  }
359
360  /**
361   * getData is an idempotent operation. Retry before throwing exception
362   * @return Data
363   */
364  public byte[] getData(String path, boolean watch, Stat stat)
365  throws KeeperException, InterruptedException {
366    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
367      RetryCounter retryCounter = retryCounterFactory.create();
368      while (true) {
369        try {
370          long startTime = EnvironmentEdgeManager.currentTime();
371          byte[] revData = checkZk().getData(path, watch, stat);
372          return ZKMetadata.removeMetaData(revData);
373        } catch (KeeperException e) {
374          switch (e.code()) {
375            case CONNECTIONLOSS:
376              retryOrThrow(retryCounter, e, "getData");
377              break;
378            case OPERATIONTIMEOUT:
379              retryOrThrow(retryCounter, e, "getData");
380              break;
381
382            default:
383              throw e;
384          }
385        }
386        retryCounter.sleepUntilNextRetry();
387      }
388    }
389  }
390
391  /**
392   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
393   * Adding an identifier field into the data to check whether
394   * badversion is caused by the result of previous correctly setData
395   * @return Stat instance
396   */
397  public Stat setData(String path, byte[] data, int version)
398  throws KeeperException, InterruptedException {
399    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
400      RetryCounter retryCounter = retryCounterFactory.create();
401      byte[] newData = ZKMetadata.appendMetaData(id, data);
402      boolean isRetry = false;
403      long startTime;
404      while (true) {
405        try {
406          startTime = EnvironmentEdgeManager.currentTime();
407          Stat nodeStat = checkZk().setData(path, newData, version);
408          return nodeStat;
409        } catch (KeeperException e) {
410          switch (e.code()) {
411            case CONNECTIONLOSS:
412              retryOrThrow(retryCounter, e, "setData");
413              break;
414            case OPERATIONTIMEOUT:
415              retryOrThrow(retryCounter, e, "setData");
416              break;
417            case BADVERSION:
418              if (isRetry) {
419                // try to verify whether the previous setData success or not
420                try{
421                  Stat stat = new Stat();
422                  byte[] revData = checkZk().getData(path, false, stat);
423                  if(Bytes.compareTo(revData, newData) == 0) {
424                    // the bad version is caused by previous successful setData
425                    return stat;
426                  }
427                } catch(KeeperException keeperException){
428                  // the ZK is not reliable at this moment. just throwing exception
429                  throw keeperException;
430                }
431              }
432            // throw other exceptions and verified bad version exceptions
433            default:
434              throw e;
435          }
436        }
437        retryCounter.sleepUntilNextRetry();
438        isRetry = true;
439      }
440    }
441  }
442
443  /**
444   * getAcl is an idempotent operation. Retry before throwing exception
445   * @return list of ACLs
446   */
447  public List<ACL> getAcl(String path, Stat stat)
448  throws KeeperException, InterruptedException {
449    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
450      RetryCounter retryCounter = retryCounterFactory.create();
451      while (true) {
452        try {
453          long startTime = EnvironmentEdgeManager.currentTime();
454          List<ACL> nodeACL = checkZk().getACL(path, stat);
455          return nodeACL;
456        } catch (KeeperException e) {
457          switch (e.code()) {
458            case CONNECTIONLOSS:
459              retryOrThrow(retryCounter, e, "getAcl");
460              break;
461            case OPERATIONTIMEOUT:
462              retryOrThrow(retryCounter, e, "getAcl");
463              break;
464
465            default:
466              throw e;
467          }
468        }
469        retryCounter.sleepUntilNextRetry();
470      }
471    }
472  }
473
474  /**
475   * setAcl is an idempotent operation. Retry before throwing exception
476   * @return list of ACLs
477   */
478  public Stat setAcl(String path, List<ACL> acls, int version)
479  throws KeeperException, InterruptedException {
480    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
481      RetryCounter retryCounter = retryCounterFactory.create();
482      while (true) {
483        try {
484          long startTime = EnvironmentEdgeManager.currentTime();
485          Stat nodeStat = checkZk().setACL(path, acls, version);
486          return nodeStat;
487        } catch (KeeperException e) {
488          switch (e.code()) {
489            case CONNECTIONLOSS:
490              retryOrThrow(retryCounter, e, "setAcl");
491              break;
492            case OPERATIONTIMEOUT:
493              retryOrThrow(retryCounter, e, "setAcl");
494              break;
495
496            default:
497              throw e;
498          }
499        }
500        retryCounter.sleepUntilNextRetry();
501      }
502    }
503  }
504
505  /**
506   * <p>
507   * NONSEQUENTIAL create is idempotent operation.
508   * Retry before throwing exceptions.
509   * But this function will not throw the NodeExist exception back to the
510   * application.
511   * </p>
512   * <p>
513   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
514   * identifier to the path to verify, whether the previous one is successful
515   * or not.
516   * </p>
517   *
518   * @return Path
519   */
520  public String create(String path, byte[] data, List<ACL> acl,
521      CreateMode createMode)
522  throws KeeperException, InterruptedException {
523    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
524      byte[] newData = ZKMetadata.appendMetaData(id, data);
525      switch (createMode) {
526        case EPHEMERAL:
527        case PERSISTENT:
528          return createNonSequential(path, newData, acl, createMode);
529
530        case EPHEMERAL_SEQUENTIAL:
531        case PERSISTENT_SEQUENTIAL:
532          return createSequential(path, newData, acl, createMode);
533
534        default:
535          throw new IllegalArgumentException("Unrecognized CreateMode: " +
536              createMode);
537      }
538    }
539  }
540
541  private String createNonSequential(String path, byte[] data, List<ACL> acl,
542      CreateMode createMode) throws KeeperException, InterruptedException {
543    RetryCounter retryCounter = retryCounterFactory.create();
544    boolean isRetry = false; // False for first attempt, true for all retries.
545    long startTime;
546    while (true) {
547      try {
548        startTime = EnvironmentEdgeManager.currentTime();
549        String nodePath = checkZk().create(path, data, acl, createMode);
550        return nodePath;
551      } catch (KeeperException e) {
552        switch (e.code()) {
553          case NODEEXISTS:
554            if (isRetry) {
555              // If the connection was lost, there is still a possibility that
556              // we have successfully created the node at our previous attempt,
557              // so we read the node and compare.
558              byte[] currentData = checkZk().getData(path, false, null);
559              if (currentData != null &&
560                  Bytes.compareTo(currentData, data) == 0) {
561                // We successfully created a non-sequential node
562                return path;
563              }
564              LOG.error("Node " + path + " already exists with " +
565                  Bytes.toStringBinary(currentData) + ", could not write " +
566                  Bytes.toStringBinary(data));
567              throw e;
568            }
569            LOG.trace("Node {} already exists", path);
570            throw e;
571
572          case CONNECTIONLOSS:
573            retryOrThrow(retryCounter, e, "create");
574            break;
575          case OPERATIONTIMEOUT:
576            retryOrThrow(retryCounter, e, "create");
577            break;
578
579          default:
580            throw e;
581        }
582      }
583      retryCounter.sleepUntilNextRetry();
584      isRetry = true;
585    }
586  }
587
588  private String createSequential(String path, byte[] data,
589      List<ACL> acl, CreateMode createMode)
590  throws KeeperException, InterruptedException {
591    RetryCounter retryCounter = retryCounterFactory.create();
592    boolean first = true;
593    String newPath = path+this.identifier;
594    while (true) {
595      try {
596        if (!first) {
597          // Check if we succeeded on a previous attempt
598          String previousResult = findPreviousSequentialNode(newPath);
599          if (previousResult != null) {
600            return previousResult;
601          }
602        }
603        first = false;
604        long startTime = EnvironmentEdgeManager.currentTime();
605        String nodePath = checkZk().create(newPath, data, acl, createMode);
606        return nodePath;
607      } catch (KeeperException e) {
608        switch (e.code()) {
609          case CONNECTIONLOSS:
610            retryOrThrow(retryCounter, e, "create");
611            break;
612          case OPERATIONTIMEOUT:
613            retryOrThrow(retryCounter, e, "create");
614            break;
615
616          default:
617            throw e;
618        }
619      }
620      retryCounter.sleepUntilNextRetry();
621    }
622  }
623  /**
624   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op
625   * instances to actually pass to multi (need to do this in order to appendMetaData).
626   */
627  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
628    if(ops == null) {
629      return null;
630    }
631
632    List<Op> preparedOps = new LinkedList<>();
633    for (Op op : ops) {
634      if (op.getType() == ZooDefs.OpCode.create) {
635        CreateRequest create = (CreateRequest)op.toRequestRecord();
636        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
637          create.getAcl(), create.getFlags()));
638      } else if (op.getType() == ZooDefs.OpCode.delete) {
639        // no need to appendMetaData for delete
640        preparedOps.add(op);
641      } else if (op.getType() == ZooDefs.OpCode.setData) {
642        SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
643        preparedOps.add(Op.setData(setData.getPath(),
644                ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
645      } else {
646        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
647      }
648    }
649    return preparedOps;
650  }
651
652  /**
653   * Run multiple operations in a transactional manner. Retry before throwing exception
654   */
655  public List<OpResult> multi(Iterable<Op> ops)
656  throws KeeperException, InterruptedException {
657    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
658      RetryCounter retryCounter = retryCounterFactory.create();
659      Iterable<Op> multiOps = prepareZKMulti(ops);
660      while (true) {
661        try {
662          long startTime = EnvironmentEdgeManager.currentTime();
663          List<OpResult> opResults = checkZk().multi(multiOps);
664          return opResults;
665        } catch (KeeperException e) {
666          switch (e.code()) {
667            case CONNECTIONLOSS:
668              retryOrThrow(retryCounter, e, "multi");
669              break;
670            case OPERATIONTIMEOUT:
671              retryOrThrow(retryCounter, e, "multi");
672              break;
673
674            default:
675              throw e;
676          }
677        }
678        retryCounter.sleepUntilNextRetry();
679      }
680    }
681  }
682
683  private String findPreviousSequentialNode(String path)
684    throws KeeperException, InterruptedException {
685    int lastSlashIdx = path.lastIndexOf('/');
686    assert(lastSlashIdx != -1);
687    String parent = path.substring(0, lastSlashIdx);
688    String nodePrefix = path.substring(lastSlashIdx+1);
689    long startTime = EnvironmentEdgeManager.currentTime();
690    List<String> nodes = checkZk().getChildren(parent, false);
691    List<String> matching = filterByPrefix(nodes, nodePrefix);
692    for (String node : matching) {
693      String nodePath = parent + "/" + node;
694      startTime = EnvironmentEdgeManager.currentTime();
695      Stat stat = checkZk().exists(nodePath, false);
696      if (stat != null) {
697        return nodePath;
698      }
699    }
700    return null;
701  }
702
703  public synchronized long getSessionId() {
704    return zk == null ? -1 : zk.getSessionId();
705  }
706
707  public synchronized void close() throws InterruptedException {
708    if (zk != null) {
709      zk.close();
710    }
711  }
712
713  public synchronized States getState() {
714    return zk == null ? null : zk.getState();
715  }
716
717  public synchronized ZooKeeper getZooKeeper() {
718    return zk;
719  }
720
721  public synchronized byte[] getSessionPasswd() {
722    return zk == null ? null : zk.getSessionPasswd();
723  }
724
725  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
726    checkZk().sync(path, cb, null);
727  }
728
729  /**
730   * Filters the given node list by the given prefixes.
731   * This method is all-inclusive--if any element in the node list starts
732   * with any of the given prefixes, then it is included in the result.
733   *
734   * @param nodes the nodes to filter
735   * @param prefixes the prefixes to include in the result
736   * @return list of every element that starts with one of the prefixes
737   */
738  private static List<String> filterByPrefix(List<String> nodes,
739      String... prefixes) {
740    List<String> lockChildren = new ArrayList<>();
741    for (String child : nodes){
742      for (String prefix : prefixes){
743        if (child.startsWith(prefix)){
744          lockChildren.add(child);
745          break;
746        }
747      }
748    }
749    return lockChildren;
750  }
751
752  public String getIdentifier() {
753    return identifier;
754  }
755}