001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.EnumSet;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.DelayQueue;
029import java.util.concurrent.Delayed;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.util.FutureUtils;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.KeeperException.Code;
039import org.apache.zookeeper.ZooKeeper;
040import org.apache.zookeeper.data.Stat;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045
/**
 * A very simple read-only ZooKeeper client implementation without watcher support.
 */
049@InterfaceAudience.Private
050public final class ReadOnlyZKClient implements Closeable {
051
  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);

  /** Configuration key for the maximum number of retries of a failed operation. */
  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";

  /** Value used for {@link #RECOVERY_RETRY} when the configuration does not set it. */
  private static final int DEFAULT_RECOVERY_RETRY = 30;

  /** Configuration key for the delay, in milliseconds, before a failed operation is retried. */
  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
      "zookeeper.recovery.retry.intervalmill";

  /** Value used for {@link #RECOVERY_RETRY_INTERVAL_MILLIS} when the configuration is silent. */
  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;

  /**
   * Configuration key for the keep-alive time in milliseconds. The value is used as the timeout
   * when polling the task queue; when a poll times out and no request is pending, the ZooKeeper
   * connection is closed.
   */
  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";

  /** Value used for {@link #KEEPALIVE_MILLIS} when the configuration does not set it. */
  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;

  /** Result codes for which a request is failed immediately instead of being retried. */
  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);

  /** Quorum string obtained from {@code ZKConfig.getZKQuorumServersString(conf)}. */
  private final String connectString;

  /** Session timeout, in milliseconds, passed to the {@link ZooKeeper} constructor. */
  private final int sessionTimeoutMs;

  /** Upper bound on how many times a single task is re-queued after a failure. */
  private final int maxRetries;

  /** Delay, in milliseconds, applied to a task each time it is re-queued for a retry. */
  private final int retryIntervalMs;

  /** Timeout, in milliseconds, of each poll on the task queue. */
  private final int keepAliveTimeMs;
078
079  private static abstract class Task implements Delayed {
080
081    protected long time = System.nanoTime();
082
083    public boolean needZk() {
084      return false;
085    }
086
087    public void exec(ZooKeeper zk) {
088    }
089
090    public void connectFailed(IOException e) {
091    }
092
093    public void closed(IOException e) {
094    }
095
096    @Override
097    public int compareTo(Delayed o) {
098      Task that = (Task) o;
099      int c = Long.compare(time, that.time);
100      if (c != 0) {
101        return c;
102      }
103      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
104    }
105
106    @Override
107    public long getDelay(TimeUnit unit) {
108      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
109    }
110  }
111
  /** Sentinel task enqueued by {@link #close()}; the processing loop exits when it polls it. */
  private static final Task CLOSE = new Task() {
  };

  /** Queue of tasks waiting to be processed by {@link #run()}, ordered by their ready time. */
  private final DelayQueue<Task> tasks = new DelayQueue<>();

  /** Set once by {@link #close()}; new requests are rejected while it is {@code true}. */
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /**
   * Current ZooKeeper handle, or {@code null} when none is open. Created on demand by
   * {@link #getZk()} and reset to {@code null} by {@link #closeZk()}.
   */
  @VisibleForTesting
  ZooKeeper zookeeper;

  /**
   * Number of requests that have been issued but whose completion task has not run yet. Within
   * this class it is only read and written by code executed from {@link #run()}.
   */
  private int pendingRequests = 0;
123
124  private String getId() {
125    return String.format("0x%08x", System.identityHashCode(this));
126  }
127
128  public ReadOnlyZKClient(Configuration conf) {
129    this.connectString = ZKConfig.getZKQuorumServersString(conf);
130    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
131    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
132    this.retryIntervalMs =
133        conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
134    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
135    LOG.info(
136      "Connect {} to {} with session timeout={}ms, retries {}, " +
137        "retry interval {}ms, keepAlive={}ms",
138      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
139    Threads.setDaemonThreadRunning(new Thread(this::run),
140      "ReadOnlyZKClient-" + connectString + "@" + getId());
141  }
142
143  private abstract class ZKTask<T> extends Task {
144
145    protected final String path;
146
147    private final CompletableFuture<T> future;
148
149    private final String operationType;
150
151    private int retries;
152
153    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
154      this.path = path;
155      this.future = future;
156      this.operationType = operationType;
157    }
158
159    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
160      tasks.add(new Task() {
161
162        @Override
163        public void exec(ZooKeeper alwaysNull) {
164          pendingRequests--;
165          Code code = Code.get(rc);
166          if (code == Code.OK) {
167            future.complete(ret);
168          } else if (code == Code.NONODE) {
169            if (errorIfNoNode) {
170              future.completeExceptionally(KeeperException.create(code, path));
171            } else {
172              future.complete(ret);
173            }
174          } else if (FAIL_FAST_CODES.contains(code)) {
175            future.completeExceptionally(KeeperException.create(code, path));
176          } else {
177            if (code == Code.SESSIONEXPIRED) {
178              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
179              try {
180                zk.close();
181              } catch (InterruptedException e) {
182              }
183            }
184            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
185              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
186                connectString, operationType, path, code, ZKTask.this.retries);
187              tasks.add(ZKTask.this);
188            } else {
189              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
190                connectString, operationType, path, code, ZKTask.this.retries);
191              future.completeExceptionally(KeeperException.create(code, path));
192            }
193          }
194        }
195
196        @Override
197        public void closed(IOException e) {
198          // It may happen that a request is succeeded and the onComplete has been called and pushed
199          // us into the task queue, but before we get called a close is called and here we will
200          // fail the request, although it is succeeded actually.
201          // This is not a perfect solution but anyway, it is better than hang the requests for
202          // ever, and also acceptable as if you close the zk client before actually getting the
203          // response then a failure is always possible.
204          future.completeExceptionally(e);
205        }
206      });
207    }
208
209    @Override
210    public boolean needZk() {
211      return true;
212    }
213
214    protected abstract void doExec(ZooKeeper zk);
215
216    @Override
217    public final void exec(ZooKeeper zk) {
218      pendingRequests++;
219      doExec(zk);
220    }
221
222    public boolean delay(long intervalMs, int maxRetries) {
223      if (retries >= maxRetries) {
224        return false;
225      }
226      retries++;
227      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
228      return true;
229    }
230
231    @Override
232    public void connectFailed(IOException e) {
233      if (delay(retryIntervalMs, maxRetries)) {
234        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
235          connectString, operationType, path, retries, e);
236        tasks.add(this);
237      } else {
238        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
239          connectString, operationType, path, retries, e);
240        future.completeExceptionally(e);
241      }
242    }
243
244    @Override
245    public void closed(IOException e) {
246      future.completeExceptionally(e);
247    }
248  }
249
250  public CompletableFuture<byte[]> get(String path) {
251    if (closed.get()) {
252      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
253    }
254    CompletableFuture<byte[]> future = new CompletableFuture<>();
255    tasks.add(new ZKTask<byte[]>(path, future, "get") {
256
257      @Override
258      protected void doExec(ZooKeeper zk) {
259        zk.getData(path, false,
260            (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
261      }
262    });
263    return future;
264  }
265
266  public CompletableFuture<Stat> exists(String path) {
267    if (closed.get()) {
268      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
269    }
270    CompletableFuture<Stat> future = new CompletableFuture<>();
271    tasks.add(new ZKTask<Stat>(path, future, "exists") {
272
273      @Override
274      protected void doExec(ZooKeeper zk) {
275        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
276      }
277    });
278    return future;
279  }
280
281  private void closeZk() {
282    if (zookeeper != null) {
283      try {
284        zookeeper.close();
285      } catch (InterruptedException e) {
286      }
287      zookeeper = null;
288    }
289  }
290
291  private ZooKeeper getZk() throws IOException {
292    // may be closed when session expired
293    if (zookeeper == null || !zookeeper.getState().isAlive()) {
294      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {});
295    }
296    return zookeeper;
297  }
298
299  private void run() {
300    for (;;) {
301      Task task;
302      try {
303        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
304      } catch (InterruptedException e) {
305        continue;
306      }
307      if (task == CLOSE) {
308        break;
309      }
310      if (task == null) {
311        if (pendingRequests == 0) {
312          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
313            getId(), connectString, keepAliveTimeMs);
314          closeZk();
315        }
316        continue;
317      }
318      if (!task.needZk()) {
319        task.exec(null);
320      } else {
321        ZooKeeper zk;
322        try {
323          zk = getZk();
324        } catch (IOException e) {
325          task.connectFailed(e);
326          continue;
327        }
328        task.exec(zk);
329      }
330    }
331    closeZk();
332    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
333    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
334    tasks.clear();
335  }
336
337  @Override
338  public void close() {
339    if (closed.compareAndSet(false, true)) {
340      LOG.info("Close zookeeper connection {} to {}", getId(), connectString);
341      tasks.add(CLOSE);
342    }
343  }
344
345  @VisibleForTesting
346  public String getConnectString() {
347    return connectString;
348  }
349}