001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.EnumSet;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.DelayQueue;
030import java.util.concurrent.Delayed;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.util.FutureUtils;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.KeeperException.Code;
040import org.apache.zookeeper.ZooKeeper;
041import org.apache.zookeeper.data.Stat;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * A very simple read only zookeeper implementation without watcher support.
047 */
048@InterfaceAudience.Private
049public final class ReadOnlyZKClient implements Closeable {
050
051  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);
052
053  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";
054
055  private static final int DEFAULT_RECOVERY_RETRY = 30;
056
057  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
058      "zookeeper.recovery.retry.intervalmill";
059
060  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;
061
062  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";
063
064  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;
065
066  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);
067
068  private final String connectString;
069
070  private final int sessionTimeoutMs;
071
072  private final int maxRetries;
073
074  private final int retryIntervalMs;
075
076  private final int keepAliveTimeMs;
077
078  private static abstract class Task implements Delayed {
079
080    protected long time = System.nanoTime();
081
082    public boolean needZk() {
083      return false;
084    }
085
086    public void exec(ZooKeeper zk) {
087    }
088
089    public void connectFailed(IOException e) {
090    }
091
092    public void closed(IOException e) {
093    }
094
095    @Override
096    public int compareTo(Delayed o) {
097      Task that = (Task) o;
098      int c = Long.compare(time, that.time);
099      if (c != 0) {
100        return c;
101      }
102      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
103    }
104
105    @Override
106    public long getDelay(TimeUnit unit) {
107      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
108    }
109  }
110
111  private static final Task CLOSE = new Task() {
112  };
113
114  private final DelayQueue<Task> tasks = new DelayQueue<>();
115
116  private final AtomicBoolean closed = new AtomicBoolean(false);
117
118  ZooKeeper zookeeper;
119
120  private int pendingRequests = 0;
121
122  private String getId() {
123    return String.format("0x%08x", System.identityHashCode(this));
124  }
125
126  public ReadOnlyZKClient(Configuration conf) {
127    // We might use a different ZK for client access
128    String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
129    if (clientZkQuorumServers != null) {
130      this.connectString = clientZkQuorumServers;
131    } else {
132      this.connectString = ZKConfig.getZKQuorumServersString(conf);
133    }
134    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
135    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
136    this.retryIntervalMs =
137        conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
138    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
139    LOG.debug(
140      "Connect {} to {} with session timeout={}ms, retries {}, " +
141        "retry interval {}ms, keepAlive={}ms",
142      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
143    Threads.setDaemonThreadRunning(new Thread(this::run),
144      "ReadOnlyZKClient-" + connectString + "@" + getId());
145  }
146
147  private abstract class ZKTask<T> extends Task {
148
149    protected final String path;
150
151    private final CompletableFuture<T> future;
152
153    private final String operationType;
154
155    private int retries;
156
157    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
158      this.path = path;
159      this.future = future;
160      this.operationType = operationType;
161    }
162
163    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
164      tasks.add(new Task() {
165
166        @Override
167        public void exec(ZooKeeper alwaysNull) {
168          pendingRequests--;
169          Code code = Code.get(rc);
170          if (code == Code.OK) {
171            future.complete(ret);
172          } else if (code == Code.NONODE) {
173            if (errorIfNoNode) {
174              future.completeExceptionally(KeeperException.create(code, path));
175            } else {
176              future.complete(ret);
177            }
178          } else if (FAIL_FAST_CODES.contains(code)) {
179            future.completeExceptionally(KeeperException.create(code, path));
180          } else {
181            if (code == Code.SESSIONEXPIRED) {
182              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
183              try {
184                zk.close();
185              } catch (InterruptedException e) {
186              }
187            }
188            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
189              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
190                connectString, operationType, path, code, ZKTask.this.retries);
191              tasks.add(ZKTask.this);
192            } else {
193              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
194                connectString, operationType, path, code, ZKTask.this.retries);
195              future.completeExceptionally(KeeperException.create(code, path));
196            }
197          }
198        }
199
200        @Override
201        public void closed(IOException e) {
202          // It may happen that a request is succeeded and the onComplete has been called and pushed
203          // us into the task queue, but before we get called a close is called and here we will
204          // fail the request, although it is succeeded actually.
205          // This is not a perfect solution but anyway, it is better than hang the requests for
206          // ever, and also acceptable as if you close the zk client before actually getting the
207          // response then a failure is always possible.
208          future.completeExceptionally(e);
209        }
210      });
211    }
212
213    @Override
214    public boolean needZk() {
215      return true;
216    }
217
218    protected abstract void doExec(ZooKeeper zk);
219
220    @Override
221    public final void exec(ZooKeeper zk) {
222      pendingRequests++;
223      doExec(zk);
224    }
225
226    public boolean delay(long intervalMs, int maxRetries) {
227      if (retries >= maxRetries) {
228        return false;
229      }
230      retries++;
231      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
232      return true;
233    }
234
235    @Override
236    public void connectFailed(IOException e) {
237      if (delay(retryIntervalMs, maxRetries)) {
238        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
239          connectString, operationType, path, retries, e);
240        tasks.add(this);
241      } else {
242        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
243          connectString, operationType, path, retries, e);
244        future.completeExceptionally(e);
245      }
246    }
247
248    @Override
249    public void closed(IOException e) {
250      future.completeExceptionally(e);
251    }
252  }
253
254  public CompletableFuture<byte[]> get(String path) {
255    if (closed.get()) {
256      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
257    }
258    CompletableFuture<byte[]> future = new CompletableFuture<>();
259    tasks.add(new ZKTask<byte[]>(path, future, "get") {
260
261      @Override
262      protected void doExec(ZooKeeper zk) {
263        zk.getData(path, false,
264            (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
265      }
266    });
267    return future;
268  }
269
270  public CompletableFuture<Stat> exists(String path) {
271    if (closed.get()) {
272      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
273    }
274    CompletableFuture<Stat> future = new CompletableFuture<>();
275    tasks.add(new ZKTask<Stat>(path, future, "exists") {
276
277      @Override
278      protected void doExec(ZooKeeper zk) {
279        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
280      }
281    });
282    return future;
283  }
284
285  public CompletableFuture<List<String>> list(String path) {
286    if (closed.get()) {
287      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
288    }
289    CompletableFuture<List<String>> future = new CompletableFuture<>();
290    tasks.add(new ZKTask<List<String>>(path, future, "list") {
291
292      @Override
293      protected void doExec(ZooKeeper zk) {
294        zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
295          null);
296      }
297    });
298    return future;
299  }
300
301  private void closeZk() {
302    if (zookeeper != null) {
303      try {
304        zookeeper.close();
305      } catch (InterruptedException e) {
306      }
307      zookeeper = null;
308    }
309  }
310
311  private ZooKeeper getZk() throws IOException {
312    // may be closed when session expired
313    if (zookeeper == null || !zookeeper.getState().isAlive()) {
314      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {});
315    }
316    return zookeeper;
317  }
318
319  private void run() {
320    for (;;) {
321      Task task;
322      try {
323        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
324      } catch (InterruptedException e) {
325        continue;
326      }
327      if (task == CLOSE) {
328        break;
329      }
330      if (task == null) {
331        if (pendingRequests == 0) {
332          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
333            getId(), connectString, keepAliveTimeMs);
334          closeZk();
335        }
336        continue;
337      }
338      if (!task.needZk()) {
339        task.exec(null);
340      } else {
341        ZooKeeper zk;
342        try {
343          zk = getZk();
344        } catch (IOException e) {
345          task.connectFailed(e);
346          continue;
347        }
348        task.exec(zk);
349      }
350    }
351    closeZk();
352    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
353    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
354    tasks.clear();
355  }
356
357  @Override
358  public void close() {
359    if (closed.compareAndSet(false, true)) {
360      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
361      tasks.add(CLOSE);
362    }
363  }
364
365  public String getConnectString() {
366    return connectString;
367  }
368}