/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.zookeeper;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;

/**
 * A very simple read only zookeeper implementation without watcher support.
 * <p>
 * All ZooKeeper interaction is funnelled through a single worker thread (see {@link #run()})
 * which consumes {@link Task}s from a {@link DelayQueue}. Public methods only enqueue a task and
 * hand back a {@link CompletableFuture}; ZooKeeper callbacks only enqueue a completion task. As a
 * consequence {@link #zookeeper} and {@link #pendingRequests} are only ever touched by the worker
 * thread and need no synchronization.
 */
@InterfaceAudience.Private
public final class ReadOnlyZKClient implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);

  /** Config key: maximum number of retries for a single operation. */
  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";

  private static final int DEFAULT_RECOVERY_RETRY = 30;

  /** Config key: pause between two retries of the same operation, in milliseconds. */
  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
    "zookeeper.recovery.retry.intervalmill";

  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;

  /** Config key: idle time after which the underlying ZooKeeper connection is closed. */
  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";

  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;

  /** Result codes for which retrying is pointless; the request is failed immediately. */
  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);

  private final String connectString;

  private final int sessionTimeoutMs;

  private final int maxRetries;

  private final int retryIntervalMs;

  private final int keepAliveTimeMs;

  /** Timer used to enforce the per-call timeouts of the {@code xxx(path, timeoutMs)} methods. */
  private final HashedWheelTimer retryTimer;

  private final ZKClientConfig zkClientConfig;

  /**
   * Unit of work for the worker thread. {@link #time} is the earliest {@link System#nanoTime()}
   * at which the task may be taken from the queue; it defaults to the creation time, i.e. the
   * task is runnable immediately.
   */
  private static abstract class Task implements Delayed {

    protected long time = System.nanoTime();

    /** Returns whether {@link #exec(ZooKeeper)} must be given a live ZooKeeper handle. */
    public boolean needZk() {
      return false;
    }

    /** Runs the task on the worker thread; {@code zk} is null unless {@link #needZk()}. */
    public void exec(ZooKeeper zk) {
    }

    /** Called on the worker thread when a ZooKeeper handle could not be created. */
    public void connectFailed(Exception e) {
    }

    /** Called when the client is closed while this task is still queued. */
    public void closed(IOException e) {
    }

    @Override
    public int compareTo(Delayed o) {
      Task that = (Task) o;
      int c = Long.compare(time, that.time);
      if (c != 0) {
        return c;
      }
      // Arbitrary but stable tie-breaker for tasks scheduled at the same nano time.
      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
  }

  /** Sentinel task which makes the worker thread leave its loop. */
  private static final Task CLOSE = new Task() {
  };

  private final DelayQueue<Task> tasks = new DelayQueue<>();

  private final AtomicBoolean closed = new AtomicBoolean(false);

  // Only accessed by the worker thread.
  ZooKeeper zookeeper;

  // Number of requests handed to ZooKeeper whose completion has not been processed yet. Only
  // accessed by the worker thread.
  private int pendingRequests = 0;

  private String getId() {
    return String.format("0x%08x", System.identityHashCode(this));
  }

  /**
   * Creates the client and starts its worker thread.
   * @param conf       source of the quorum address and of the timeout/retry settings
   * @param retryTimer timer used to enforce the timeouts of the {@code xxx(path, timeoutMs)}
   *                   methods
   */
  public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) {
    // We might use a different ZK for client access
    String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
    if (clientZkQuorumServers != null) {
      this.connectString = clientZkQuorumServers;
    } else {
      this.connectString = ZKConfig.getZKQuorumServersString(conf);
    }
    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
    this.retryIntervalMs =
      conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
    this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
    this.retryTimer = retryTimer;
    LOG.debug(
      "Connect {} to {} with session timeout={}ms, retries={}, "
        + "retry interval={}ms, keepAlive={}ms, zk client config={}",
      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs,
      zkClientConfig);
    Threads.setDaemonThreadRunning(new Thread(this::run),
      "ReadOnlyZKClient-" + connectString + "@" + getId());
  }

  /**
   * A task which issues one asynchronous ZooKeeper read and completes {@code future} with its
   * outcome, retrying up to {@code maxRetries} times on recoverable errors.
   */
  private abstract class ZKTask<T> extends Task {

    protected final String path;

    private final CompletableFuture<T> future;

    private final String operationType;

    private int retries;

    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
      this.path = path;
      this.future = future;
      this.operationType = operationType;
    }

    /**
     * Invoked from the ZooKeeper callback. The result is not processed here but wrapped in a
     * task so that it is handled on the worker thread.
     * @param zk            the handle the request was issued on
     * @param rc            the ZooKeeper result code
     * @param ret           the value to complete the future with on success
     * @param errorIfNoNode whether {@link Code#NONODE} is an error or completes with {@code ret}
     */
    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
      Task completion = new Task() {

        @Override
        public void exec(ZooKeeper alwaysNull) {
          pendingRequests--;
          Code code = Code.get(rc);
          if (code == Code.OK) {
            future.complete(ret);
          } else if (code == Code.NONODE) {
            if (errorIfNoNode) {
              future.completeExceptionally(KeeperException.create(code, path));
            } else {
              future.complete(ret);
            }
          } else if (FAIL_FAST_CODES.contains(code)) {
            future.completeExceptionally(KeeperException.create(code, path));
          } else {
            if (code == Code.SESSIONEXPIRED) {
              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
              try {
                // getZk() creates a fresh handle once this one is no longer alive.
                zk.close();
              } catch (InterruptedException e) {
                // Restore interrupt status
                Thread.currentThread().interrupt();
              }
            }
            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
                connectString, operationType, path, code, ZKTask.this.retries);
              tasks.add(ZKTask.this);
            } else {
              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
                connectString, operationType, path, code, ZKTask.this.retries);
              future.completeExceptionally(KeeperException.create(code, path));
            }
          }
        }

        @Override
        public void closed(IOException e) {
          // It may happen that a request is succeeded and the onComplete has been called and pushed
          // us into the task queue, but before we get called a close is called and here we will
          // fail the request, although it is succeeded actually.
          // This is not a perfect solution but anyway, it is better than hang the requests for
          // ever, and also acceptable as if you close the zk client before actually getting the
          // response then a failure is always possible.
          future.completeExceptionally(e);
        }
      };
      tasks.add(completion);
      // The callback may arrive after the worker thread has already drained the queue.
      failIfClosed(completion, future);
    }

    @Override
    public boolean needZk() {
      return true;
    }

    /** Issues the actual asynchronous ZooKeeper call; its callback must call onComplete. */
    protected abstract void doExec(ZooKeeper zk);

    @Override
    public final void exec(ZooKeeper zk) {
      pendingRequests++;
      doExec(zk);
    }

    /**
     * Reschedules this task for a retry.
     * @return false if the retry budget is exhausted, in which case nothing is changed
     */
    public boolean delay(long intervalMs, int maxRetries) {
      if (retries >= maxRetries) {
        return false;
      }
      retries++;
      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
      return true;
    }

    @Override
    public void connectFailed(Exception e) {
      if (delay(retryIntervalMs, maxRetries)) {
        LOG.warn("{} to {} failed to connect to zk for {} of {}, retries = {}", getId(),
          connectString, operationType, path, retries, e);
        tasks.add(this);
      } else {
        LOG.warn("{} to {} failed to connect to zk for {} of {}, retries = {}, give up", getId(),
          connectString, operationType, path, retries, e);
        future.completeExceptionally(e);
      }
    }

    @Override
    public void closed(IOException e) {
      future.completeExceptionally(e);
    }
  }

  /**
   * Fails {@code future} if the client has been closed. Must be called right after {@code task}
   * was added to the queue from a thread other than the worker thread: the worker may already
   * have drained the queue and exited, in which case nobody else would ever complete the future.
   * Completing a future that is already completed is a no-op, so this is safe even when the
   * worker did see the task.
   */
  private void failIfClosed(Task task, CompletableFuture<?> future) {
    if (closed.get()) {
      tasks.remove(task);
      future.completeExceptionally(new DoNotRetryIOException("Client already closed"));
    }
  }

  /** Enqueues {@code task} for the worker thread and returns the future it will complete. */
  private <T> CompletableFuture<T> submit(ZKTask<T> task) {
    tasks.add(task);
    failIfClosed(task, task.future);
    return task.future;
  }

  private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture<?> future,
    final String api) {
    return timeout -> {
      if (!future.isDone()) {
        future.completeExceptionally(new DoNotRetryIOException(
          "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms"));
      }
    };
  }

  /**
   * Arranges for {@code future} to fail with a {@link DoNotRetryIOException} if it is not
   * completed within {@code timeoutMs}. The timer entry is cancelled as soon as the future
   * completes so that finished requests are not retained until the timeout elapses.
   */
  private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutMs,
    String api) {
    if (future.isDone()) {
      // Nothing to guard, e.g. the client is already closed.
      return future;
    }
    Timeout timeout =
      retryTimer.newTimeout(getTimerTask(timeoutMs, future, api), timeoutMs + 1,
        TimeUnit.MILLISECONDS);
    future.whenComplete((ret, err) -> timeout.cancel());
    return future;
  }

  /**
   * Same as {@link #get(String)} but the returned future fails with a
   * {@link DoNotRetryIOException} if no result is available after {@code timeoutMs}.
   */
  public CompletableFuture<byte[]> get(final String path, final long timeoutMs) {
    return withTimeout(get(path), timeoutMs, "GET");
  }

  /**
   * Reads the data of the znode at {@code path}. The future fails with a {@link KeeperException}
   * if the node does not exist, and with a {@link DoNotRetryIOException} if the client is closed.
   */
  public CompletableFuture<byte[]> get(String path) {
    if (closed.get()) {
      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
    }
    CompletableFuture<byte[]> future = new CompletableFuture<>();
    return submit(new ZKTask<byte[]>(path, future, "get") {

      @Override
      protected void doExec(ZooKeeper zk) {
        zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true),
          null);
      }
    });
  }

  /**
   * Same as {@link #exists(String)} but the returned future fails with a
   * {@link DoNotRetryIOException} if no result is available after {@code timeoutMs}.
   */
  public CompletableFuture<Stat> exists(String path, long timeoutMs) {
    return withTimeout(exists(path), timeoutMs, "EXISTS");
  }

  /**
   * Looks up the znode at {@code path}. A missing node is not an error: the future is completed
   * with whatever {@link Stat} the ZooKeeper callback delivered for it.
   */
  public CompletableFuture<Stat> exists(String path) {
    if (closed.get()) {
      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
    }
    CompletableFuture<Stat> future = new CompletableFuture<>();
    return submit(new ZKTask<Stat>(path, future, "exists") {

      @Override
      protected void doExec(ZooKeeper zk) {
        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
      }
    });
  }

  /**
   * Same as {@link #list(String)} but the returned future fails with a
   * {@link DoNotRetryIOException} if no result is available after {@code timeoutMs}.
   */
  public CompletableFuture<List<String>> list(String path, long timeoutMs) {
    return withTimeout(list(path), timeoutMs, "LIST");
  }

  /**
   * Lists the children of the znode at {@code path}. The future fails with a
   * {@link KeeperException} if the node does not exist.
   */
  public CompletableFuture<List<String>> list(String path) {
    if (closed.get()) {
      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
    }
    CompletableFuture<List<String>> future = new CompletableFuture<>();
    return submit(new ZKTask<List<String>>(path, future, "list") {

      @Override
      protected void doExec(ZooKeeper zk) {
        zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
          null);
      }
    });
  }

  private void closeZk() {
    if (zookeeper != null) {
      try {
        zookeeper.close();
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
      zookeeper = null;
    }
  }

  private ZooKeeper getZk() throws IOException {
    // may be closed when session expired
    if (zookeeper == null || !zookeeper.getState().isAlive()) {
      // No watcher support: connection events are deliberately ignored.
      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
      }, zkClientConfig);
    }
    return zookeeper;
  }

  /**
   * Worker loop: executes queued tasks until {@link #CLOSE} is seen, closing the ZooKeeper
   * connection whenever the client has been idle for {@code keepAliveTimeMs}.
   */
  private void run() {
    for (;;) {
      Task task;
      try {
        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // Interrupts are ignored on purpose; the only way out of the loop is the CLOSE task.
        continue;
      }
      if (task == CLOSE) {
        break;
      }
      if (task == null) {
        // Idle timeout. Keep the connection while requests are still in flight.
        if (pendingRequests == 0) {
          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
            getId(), connectString, keepAliveTimeMs);
          closeZk();
        }
        continue;
      }
      if (!task.needZk()) {
        task.exec(null);
      } else {
        ZooKeeper zk;
        try {
          zk = getZk();
        } catch (Exception e) {
          task.connectFailed(e);
          continue;
        }
        task.exec(zk);
      }
    }
    closeZk();
    // Fail everything still queued. Tasks enqueued by other threads after this point are failed
    // by the enqueuing thread itself, see failIfClosed.
    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
    tasks.clear();
  }

  /**
   * Closes the client. Idempotent; outstanding requests are failed with a
   * {@link DoNotRetryIOException}.
   */
  @Override
  public void close() {
    if (closed.compareAndSet(false, true)) {
      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
      tasks.add(CLOSE);
    }
  }

  public String getConnectString() {
    return connectString;
  }
}