001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.EnumSet;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.DelayQueue;
030import java.util.concurrent.Delayed;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.util.FutureUtils;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.KeeperException.Code;
040import org.apache.zookeeper.ZooKeeper;
041import org.apache.zookeeper.client.ZKClientConfig;
042import org.apache.zookeeper.data.Stat;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A very simple read only zookeeper implementation without watcher support.
048 */
049@InterfaceAudience.Private
050public final class ReadOnlyZKClient implements Closeable {
051
052  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);
053
054  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";
055
056  private static final int DEFAULT_RECOVERY_RETRY = 30;
057
058  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
059    "zookeeper.recovery.retry.intervalmill";
060
061  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;
062
063  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";
064
065  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;
066
067  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);
068
069  private final String connectString;
070
071  private final int sessionTimeoutMs;
072
073  private final int maxRetries;
074
075  private final int retryIntervalMs;
076
077  private final int keepAliveTimeMs;
078
079  private final ZKClientConfig zkClientConfig;
080
081  private static abstract class Task implements Delayed {
082
083    protected long time = System.nanoTime();
084
085    public boolean needZk() {
086      return false;
087    }
088
089    public void exec(ZooKeeper zk) {
090    }
091
092    public void connectFailed(Exception e) {
093    }
094
095    public void closed(IOException e) {
096    }
097
098    @Override
099    public int compareTo(Delayed o) {
100      Task that = (Task) o;
101      int c = Long.compare(time, that.time);
102      if (c != 0) {
103        return c;
104      }
105      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
106    }
107
108    @Override
109    public long getDelay(TimeUnit unit) {
110      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
111    }
112  }
113
114  private static final Task CLOSE = new Task() {
115  };
116
117  private final DelayQueue<Task> tasks = new DelayQueue<>();
118
119  private final AtomicBoolean closed = new AtomicBoolean(false);
120
121  ZooKeeper zookeeper;
122
123  private int pendingRequests = 0;
124
125  private String getId() {
126    return String.format("0x%08x", System.identityHashCode(this));
127  }
128
129  public ReadOnlyZKClient(Configuration conf) {
130    // We might use a different ZK for client access
131    String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
132    if (clientZkQuorumServers != null) {
133      this.connectString = clientZkQuorumServers;
134    } else {
135      this.connectString = ZKConfig.getZKQuorumServersString(conf);
136    }
137    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
138    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
139    this.retryIntervalMs =
140      conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
141    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
142    this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
143    LOG.debug(
144      "Connect {} to {} with session timeout={}ms, retries={}, "
145        + "retry interval={}ms, keepAlive={}ms, zk client config={}",
146      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs,
147      zkClientConfig);
148    Threads.setDaemonThreadRunning(new Thread(this::run),
149      "ReadOnlyZKClient-" + connectString + "@" + getId());
150  }
151
152  private abstract class ZKTask<T> extends Task {
153
154    protected final String path;
155
156    private final CompletableFuture<T> future;
157
158    private final String operationType;
159
160    private int retries;
161
162    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
163      this.path = path;
164      this.future = future;
165      this.operationType = operationType;
166    }
167
168    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
169      tasks.add(new Task() {
170
171        @Override
172        public void exec(ZooKeeper alwaysNull) {
173          pendingRequests--;
174          Code code = Code.get(rc);
175          if (code == Code.OK) {
176            future.complete(ret);
177          } else if (code == Code.NONODE) {
178            if (errorIfNoNode) {
179              future.completeExceptionally(KeeperException.create(code, path));
180            } else {
181              future.complete(ret);
182            }
183          } else if (FAIL_FAST_CODES.contains(code)) {
184            future.completeExceptionally(KeeperException.create(code, path));
185          } else {
186            if (code == Code.SESSIONEXPIRED) {
187              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
188              try {
189                zk.close();
190              } catch (InterruptedException e) {
191                // Restore interrupt status
192                Thread.currentThread().interrupt();
193              }
194            }
195            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
196              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
197                connectString, operationType, path, code, ZKTask.this.retries);
198              tasks.add(ZKTask.this);
199            } else {
200              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
201                connectString, operationType, path, code, ZKTask.this.retries);
202              future.completeExceptionally(KeeperException.create(code, path));
203            }
204          }
205        }
206
207        @Override
208        public void closed(IOException e) {
209          // It may happen that a request is succeeded and the onComplete has been called and pushed
210          // us into the task queue, but before we get called a close is called and here we will
211          // fail the request, although it is succeeded actually.
212          // This is not a perfect solution but anyway, it is better than hang the requests for
213          // ever, and also acceptable as if you close the zk client before actually getting the
214          // response then a failure is always possible.
215          future.completeExceptionally(e);
216        }
217      });
218    }
219
220    @Override
221    public boolean needZk() {
222      return true;
223    }
224
225    protected abstract void doExec(ZooKeeper zk);
226
227    @Override
228    public final void exec(ZooKeeper zk) {
229      pendingRequests++;
230      doExec(zk);
231    }
232
233    public boolean delay(long intervalMs, int maxRetries) {
234      if (retries >= maxRetries) {
235        return false;
236      }
237      retries++;
238      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
239      return true;
240    }
241
242    @Override
243    public void connectFailed(Exception e) {
244      if (delay(retryIntervalMs, maxRetries)) {
245        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
246          connectString, operationType, path, retries, e);
247        tasks.add(this);
248      } else {
249        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
250          connectString, operationType, path, retries, e);
251        future.completeExceptionally(e);
252      }
253    }
254
255    @Override
256    public void closed(IOException e) {
257      future.completeExceptionally(e);
258    }
259  }
260
261  public CompletableFuture<byte[]> get(String path) {
262    if (closed.get()) {
263      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
264    }
265    CompletableFuture<byte[]> future = new CompletableFuture<>();
266    tasks.add(new ZKTask<byte[]>(path, future, "get") {
267
268      @Override
269      protected void doExec(ZooKeeper zk) {
270        zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true),
271          null);
272      }
273    });
274    return future;
275  }
276
277  public CompletableFuture<Stat> exists(String path) {
278    if (closed.get()) {
279      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
280    }
281    CompletableFuture<Stat> future = new CompletableFuture<>();
282    tasks.add(new ZKTask<Stat>(path, future, "exists") {
283
284      @Override
285      protected void doExec(ZooKeeper zk) {
286        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
287      }
288    });
289    return future;
290  }
291
292  public CompletableFuture<List<String>> list(String path) {
293    if (closed.get()) {
294      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
295    }
296    CompletableFuture<List<String>> future = new CompletableFuture<>();
297    tasks.add(new ZKTask<List<String>>(path, future, "list") {
298
299      @Override
300      protected void doExec(ZooKeeper zk) {
301        zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
302          null);
303      }
304    });
305    return future;
306  }
307
308  private void closeZk() {
309    if (zookeeper != null) {
310      try {
311        zookeeper.close();
312      } catch (InterruptedException e) {
313        // Restore interrupt status
314        Thread.currentThread().interrupt();
315      }
316      zookeeper = null;
317    }
318  }
319
320  private ZooKeeper getZk() throws IOException {
321    // may be closed when session expired
322    if (zookeeper == null || !zookeeper.getState().isAlive()) {
323      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
324      }, zkClientConfig);
325    }
326    return zookeeper;
327  }
328
329  private void run() {
330    for (;;) {
331      Task task;
332      try {
333        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
334      } catch (InterruptedException e) {
335        continue;
336      }
337      if (task == CLOSE) {
338        break;
339      }
340      if (task == null) {
341        if (pendingRequests == 0) {
342          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
343            getId(), connectString, keepAliveTimeMs);
344          closeZk();
345        }
346        continue;
347      }
348      if (!task.needZk()) {
349        task.exec(null);
350      } else {
351        ZooKeeper zk;
352        try {
353          zk = getZk();
354        } catch (Exception e) {
355          task.connectFailed(e);
356          continue;
357        }
358        task.exec(zk);
359      }
360    }
361    closeZk();
362    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
363    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
364    tasks.clear();
365  }
366
367  @Override
368  public void close() {
369    if (closed.compareAndSet(false, true)) {
370      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
371      tasks.add(CLOSE);
372    }
373  }
374
375  public String getConnectString() {
376    return connectString;
377  }
378}