001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.EnumSet;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.DelayQueue;
030import java.util.concurrent.Delayed;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.util.FutureUtils;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.KeeperException.Code;
040import org.apache.zookeeper.ZooKeeper;
041import org.apache.zookeeper.data.Stat;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * A very simple read only zookeeper implementation without watcher support.
047 */
048@InterfaceAudience.Private
049public final class ReadOnlyZKClient implements Closeable {
050
051  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);
052
053  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";
054
055  private static final int DEFAULT_RECOVERY_RETRY = 30;
056
057  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
058    "zookeeper.recovery.retry.intervalmill";
059
060  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;
061
062  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";
063
064  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;
065
066  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);
067
068  private final String connectString;
069
070  private final int sessionTimeoutMs;
071
072  private final int maxRetries;
073
074  private final int retryIntervalMs;
075
076  private final int keepAliveTimeMs;
077
078  private static abstract class Task implements Delayed {
079
080    protected long time = System.nanoTime();
081
082    public boolean needZk() {
083      return false;
084    }
085
086    public void exec(ZooKeeper zk) {
087    }
088
089    public void connectFailed(Exception e) {
090    }
091
092    public void closed(IOException e) {
093    }
094
095    @Override
096    public int compareTo(Delayed o) {
097      Task that = (Task) o;
098      int c = Long.compare(time, that.time);
099      if (c != 0) {
100        return c;
101      }
102      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
103    }
104
105    @Override
106    public long getDelay(TimeUnit unit) {
107      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
108    }
109  }
110
111  private static final Task CLOSE = new Task() {
112  };
113
114  private final DelayQueue<Task> tasks = new DelayQueue<>();
115
116  private final AtomicBoolean closed = new AtomicBoolean(false);
117
118  ZooKeeper zookeeper;
119
120  private int pendingRequests = 0;
121
122  private String getId() {
123    return String.format("0x%08x", System.identityHashCode(this));
124  }
125
126  public ReadOnlyZKClient(Configuration conf) {
127    // We might use a different ZK for client access
128    String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
129    if (clientZkQuorumServers != null) {
130      this.connectString = clientZkQuorumServers;
131    } else {
132      this.connectString = ZKConfig.getZKQuorumServersString(conf);
133    }
134    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
135    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
136    this.retryIntervalMs =
137      conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
138    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
139    LOG.debug(
140      "Connect {} to {} with session timeout={}ms, retries {}, "
141        + "retry interval {}ms, keepAlive={}ms",
142      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
143    Threads.setDaemonThreadRunning(new Thread(this::run),
144      "ReadOnlyZKClient-" + connectString + "@" + getId());
145  }
146
147  private abstract class ZKTask<T> extends Task {
148
149    protected final String path;
150
151    private final CompletableFuture<T> future;
152
153    private final String operationType;
154
155    private int retries;
156
157    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
158      this.path = path;
159      this.future = future;
160      this.operationType = operationType;
161    }
162
163    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
164      tasks.add(new Task() {
165
166        @Override
167        public void exec(ZooKeeper alwaysNull) {
168          pendingRequests--;
169          Code code = Code.get(rc);
170          if (code == Code.OK) {
171            future.complete(ret);
172          } else if (code == Code.NONODE) {
173            if (errorIfNoNode) {
174              future.completeExceptionally(KeeperException.create(code, path));
175            } else {
176              future.complete(ret);
177            }
178          } else if (FAIL_FAST_CODES.contains(code)) {
179            future.completeExceptionally(KeeperException.create(code, path));
180          } else {
181            if (code == Code.SESSIONEXPIRED) {
182              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
183              try {
184                zk.close();
185              } catch (InterruptedException e) {
186                // Restore interrupt status
187                Thread.currentThread().interrupt();
188              }
189            }
190            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
191              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
192                connectString, operationType, path, code, ZKTask.this.retries);
193              tasks.add(ZKTask.this);
194            } else {
195              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
196                connectString, operationType, path, code, ZKTask.this.retries);
197              future.completeExceptionally(KeeperException.create(code, path));
198            }
199          }
200        }
201
202        @Override
203        public void closed(IOException e) {
204          // It may happen that a request is succeeded and the onComplete has been called and pushed
205          // us into the task queue, but before we get called a close is called and here we will
206          // fail the request, although it is succeeded actually.
207          // This is not a perfect solution but anyway, it is better than hang the requests for
208          // ever, and also acceptable as if you close the zk client before actually getting the
209          // response then a failure is always possible.
210          future.completeExceptionally(e);
211        }
212      });
213    }
214
215    @Override
216    public boolean needZk() {
217      return true;
218    }
219
220    protected abstract void doExec(ZooKeeper zk);
221
222    @Override
223    public final void exec(ZooKeeper zk) {
224      pendingRequests++;
225      doExec(zk);
226    }
227
228    public boolean delay(long intervalMs, int maxRetries) {
229      if (retries >= maxRetries) {
230        return false;
231      }
232      retries++;
233      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
234      return true;
235    }
236
237    @Override
238    public void connectFailed(Exception e) {
239      if (delay(retryIntervalMs, maxRetries)) {
240        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
241          connectString, operationType, path, retries, e);
242        tasks.add(this);
243      } else {
244        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
245          connectString, operationType, path, retries, e);
246        future.completeExceptionally(e);
247      }
248    }
249
250    @Override
251    public void closed(IOException e) {
252      future.completeExceptionally(e);
253    }
254  }
255
256  public CompletableFuture<byte[]> get(String path) {
257    if (closed.get()) {
258      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
259    }
260    CompletableFuture<byte[]> future = new CompletableFuture<>();
261    tasks.add(new ZKTask<byte[]>(path, future, "get") {
262
263      @Override
264      protected void doExec(ZooKeeper zk) {
265        zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true),
266          null);
267      }
268    });
269    return future;
270  }
271
272  public CompletableFuture<Stat> exists(String path) {
273    if (closed.get()) {
274      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
275    }
276    CompletableFuture<Stat> future = new CompletableFuture<>();
277    tasks.add(new ZKTask<Stat>(path, future, "exists") {
278
279      @Override
280      protected void doExec(ZooKeeper zk) {
281        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
282      }
283    });
284    return future;
285  }
286
287  public CompletableFuture<List<String>> list(String path) {
288    if (closed.get()) {
289      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
290    }
291    CompletableFuture<List<String>> future = new CompletableFuture<>();
292    tasks.add(new ZKTask<List<String>>(path, future, "list") {
293
294      @Override
295      protected void doExec(ZooKeeper zk) {
296        zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
297          null);
298      }
299    });
300    return future;
301  }
302
303  private void closeZk() {
304    if (zookeeper != null) {
305      try {
306        zookeeper.close();
307      } catch (InterruptedException e) {
308        // Restore interrupt status
309        Thread.currentThread().interrupt();
310      }
311      zookeeper = null;
312    }
313  }
314
315  private ZooKeeper getZk() throws IOException {
316    // may be closed when session expired
317    if (zookeeper == null || !zookeeper.getState().isAlive()) {
318      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
319      });
320    }
321    return zookeeper;
322  }
323
324  private void run() {
325    for (;;) {
326      Task task;
327      try {
328        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
329      } catch (InterruptedException e) {
330        continue;
331      }
332      if (task == CLOSE) {
333        break;
334      }
335      if (task == null) {
336        if (pendingRequests == 0) {
337          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
338            getId(), connectString, keepAliveTimeMs);
339          closeZk();
340        }
341        continue;
342      }
343      if (!task.needZk()) {
344        task.exec(null);
345      } else {
346        ZooKeeper zk;
347        try {
348          zk = getZk();
349        } catch (Exception e) {
350          task.connectFailed(e);
351          continue;
352        }
353        task.exec(zk);
354      }
355    }
356    closeZk();
357    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
358    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
359    tasks.clear();
360  }
361
362  @Override
363  public void close() {
364    if (closed.compareAndSet(false, true)) {
365      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
366      tasks.add(CLOSE);
367    }
368  }
369
370  public String getConnectString() {
371    return connectString;
372  }
373}