001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.EnumSet;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.DelayQueue;
030import java.util.concurrent.Delayed;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.util.FutureUtils;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.KeeperException.Code;
040import org.apache.zookeeper.ZooKeeper;
041import org.apache.zookeeper.data.Stat;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046
047/**
048 * A very simple read-only ZooKeeper client without watcher support.
049 */
050@InterfaceAudience.Private
051public final class ReadOnlyZKClient implements Closeable {
052
  /** Logger shared by this class and its nested task types. */
053  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);
054
  /** Configuration key for the maximum number of retries of a failed operation. */
055  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";
056
  /** Value used for {@link #RECOVERY_RETRY} when the key is not configured. */
057  private static final int DEFAULT_RECOVERY_RETRY = 30;
058
  /** Configuration key for the pause, in milliseconds, before a failed operation is retried. */
059  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
060      "zookeeper.recovery.retry.intervalmill";
061
  /** Value used for {@link #RECOVERY_RETRY_INTERVAL_MILLIS} when the key is not configured. */
062  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;
063
  /**
   * Configuration key for how long, in milliseconds, the background loop waits on an empty task
   * queue before it closes the ZooKeeper connection (provided no requests are pending).
   */
064  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";
065
  /** Value used for {@link #KEEPALIVE_MILLIS} when the key is not configured. */
066  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;
067
  /** Result codes that fail the request immediately instead of scheduling a retry. */
068  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);
069
  /** Quorum string passed to the {@link ZooKeeper} constructor; chosen in the constructor. */
070  private final String connectString;
071
  /** Session timeout, in milliseconds, passed to the {@link ZooKeeper} constructor. */
072  private final int sessionTimeoutMs;
073
  /** Maximum number of retries per operation, read from {@link #RECOVERY_RETRY}. */
074  private final int maxRetries;
075
  /** Delay between retries in milliseconds, read from {@link #RECOVERY_RETRY_INTERVAL_MILLIS}. */
076  private final int retryIntervalMs;
077
  /** Idle time in milliseconds before the connection is closed, read from KEEPALIVE_MILLIS. */
078  private final int keepAliveTimeMs;
079
080  private static abstract class Task implements Delayed {
081
082    protected long time = System.nanoTime();
083
084    public boolean needZk() {
085      return false;
086    }
087
088    public void exec(ZooKeeper zk) {
089    }
090
091    public void connectFailed(IOException e) {
092    }
093
094    public void closed(IOException e) {
095    }
096
097    @Override
098    public int compareTo(Delayed o) {
099      Task that = (Task) o;
100      int c = Long.compare(time, that.time);
101      if (c != 0) {
102        return c;
103      }
104      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
105    }
106
107    @Override
108    public long getDelay(TimeUnit unit) {
109      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
110    }
111  }
112
  /**
   * Sentinel task: the background loop stops as soon as it polls this exact instance. It is
   * enqueued by {@link #close()}.
   */
113  private static final Task CLOSE = new Task() {
114  };
115
  /** Work queue polled by the background loop; requests and completion callbacks add to it. */
116  private final DelayQueue<Task> tasks = new DelayQueue<>();
117
  /** Flipped to {@code true} by the first call to {@link #close()}; checked by the public reads. */
118  private final AtomicBoolean closed = new AtomicBoolean(false);
119
  /**
   * Current ZooKeeper handle. Created on demand by {@code getZk()} and reset to {@code null} by
   * {@code closeZk()}.
   */
120  @VisibleForTesting
121  ZooKeeper zookeeper;
122
  /**
   * Number of requests submitted but not yet completed. Incremented in {@code ZKTask.exec} and
   * decremented by the completion task created in {@code ZKTask.onComplete}; the background loop
   * only closes an idle connection while this is zero.
   */
123  private int pendingRequests = 0;
124
125  private String getId() {
126    return String.format("0x%08x", System.identityHashCode(this));
127  }
128
129  public ReadOnlyZKClient(Configuration conf) {
130    // We might use a different ZK for client access
131    String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
132    if (clientZkQuorumServers != null) {
133      this.connectString = clientZkQuorumServers;
134    } else {
135      this.connectString = ZKConfig.getZKQuorumServersString(conf);
136    }
137    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
138    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
139    this.retryIntervalMs =
140        conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
141    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
142    LOG.debug(
143      "Connect {} to {} with session timeout={}ms, retries {}, " +
144        "retry interval {}ms, keepAlive={}ms",
145      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
146    Threads.setDaemonThreadRunning(new Thread(this::run),
147      "ReadOnlyZKClient-" + connectString + "@" + getId());
148  }
149
150  private abstract class ZKTask<T> extends Task {
151
152    protected final String path;
153
154    private final CompletableFuture<T> future;
155
156    private final String operationType;
157
158    private int retries;
159
160    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
161      this.path = path;
162      this.future = future;
163      this.operationType = operationType;
164    }
165
166    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
167      tasks.add(new Task() {
168
169        @Override
170        public void exec(ZooKeeper alwaysNull) {
171          pendingRequests--;
172          Code code = Code.get(rc);
173          if (code == Code.OK) {
174            future.complete(ret);
175          } else if (code == Code.NONODE) {
176            if (errorIfNoNode) {
177              future.completeExceptionally(KeeperException.create(code, path));
178            } else {
179              future.complete(ret);
180            }
181          } else if (FAIL_FAST_CODES.contains(code)) {
182            future.completeExceptionally(KeeperException.create(code, path));
183          } else {
184            if (code == Code.SESSIONEXPIRED) {
185              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
186              try {
187                zk.close();
188              } catch (InterruptedException e) {
189              }
190            }
191            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
192              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
193                connectString, operationType, path, code, ZKTask.this.retries);
194              tasks.add(ZKTask.this);
195            } else {
196              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
197                connectString, operationType, path, code, ZKTask.this.retries);
198              future.completeExceptionally(KeeperException.create(code, path));
199            }
200          }
201        }
202
203        @Override
204        public void closed(IOException e) {
205          // It may happen that a request is succeeded and the onComplete has been called and pushed
206          // us into the task queue, but before we get called a close is called and here we will
207          // fail the request, although it is succeeded actually.
208          // This is not a perfect solution but anyway, it is better than hang the requests for
209          // ever, and also acceptable as if you close the zk client before actually getting the
210          // response then a failure is always possible.
211          future.completeExceptionally(e);
212        }
213      });
214    }
215
216    @Override
217    public boolean needZk() {
218      return true;
219    }
220
221    protected abstract void doExec(ZooKeeper zk);
222
223    @Override
224    public final void exec(ZooKeeper zk) {
225      pendingRequests++;
226      doExec(zk);
227    }
228
229    public boolean delay(long intervalMs, int maxRetries) {
230      if (retries >= maxRetries) {
231        return false;
232      }
233      retries++;
234      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
235      return true;
236    }
237
238    @Override
239    public void connectFailed(IOException e) {
240      if (delay(retryIntervalMs, maxRetries)) {
241        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
242          connectString, operationType, path, retries, e);
243        tasks.add(this);
244      } else {
245        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
246          connectString, operationType, path, retries, e);
247        future.completeExceptionally(e);
248      }
249    }
250
251    @Override
252    public void closed(IOException e) {
253      future.completeExceptionally(e);
254    }
255  }
256
257  public CompletableFuture<byte[]> get(String path) {
258    if (closed.get()) {
259      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
260    }
261    CompletableFuture<byte[]> future = new CompletableFuture<>();
262    tasks.add(new ZKTask<byte[]>(path, future, "get") {
263
264      @Override
265      protected void doExec(ZooKeeper zk) {
266        zk.getData(path, false,
267            (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
268      }
269    });
270    return future;
271  }
272
273  public CompletableFuture<Stat> exists(String path) {
274    if (closed.get()) {
275      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
276    }
277    CompletableFuture<Stat> future = new CompletableFuture<>();
278    tasks.add(new ZKTask<Stat>(path, future, "exists") {
279
280      @Override
281      protected void doExec(ZooKeeper zk) {
282        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
283      }
284    });
285    return future;
286  }
287
288  public CompletableFuture<List<String>> list(String path) {
289    if (closed.get()) {
290      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
291    }
292    CompletableFuture<List<String>> future = new CompletableFuture<>();
293    tasks.add(new ZKTask<List<String>>(path, future, "list") {
294
295      @Override
296      protected void doExec(ZooKeeper zk) {
297        zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
298          null);
299      }
300    });
301    return future;
302  }
303
304  private void closeZk() {
305    if (zookeeper != null) {
306      try {
307        zookeeper.close();
308      } catch (InterruptedException e) {
309      }
310      zookeeper = null;
311    }
312  }
313
314  private ZooKeeper getZk() throws IOException {
315    // may be closed when session expired
316    if (zookeeper == null || !zookeeper.getState().isAlive()) {
317      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {});
318    }
319    return zookeeper;
320  }
321
322  private void run() {
323    for (;;) {
324      Task task;
325      try {
326        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
327      } catch (InterruptedException e) {
328        continue;
329      }
330      if (task == CLOSE) {
331        break;
332      }
333      if (task == null) {
334        if (pendingRequests == 0) {
335          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
336            getId(), connectString, keepAliveTimeMs);
337          closeZk();
338        }
339        continue;
340      }
341      if (!task.needZk()) {
342        task.exec(null);
343      } else {
344        ZooKeeper zk;
345        try {
346          zk = getZk();
347        } catch (IOException e) {
348          task.connectFailed(e);
349          continue;
350        }
351        task.exec(zk);
352      }
353    }
354    closeZk();
355    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
356    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
357    tasks.clear();
358  }
359
360  @Override
361  public void close() {
362    if (closed.compareAndSet(false, true)) {
363      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
364      tasks.add(CLOSE);
365    }
366  }
367
368  @VisibleForTesting
369  public String getConnectString() {
370    return connectString;
371  }
372}