001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; 021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.EnumSet; 027import java.util.List; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.DelayQueue; 030import java.util.concurrent.Delayed; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.util.FutureUtils; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.apache.zookeeper.KeeperException; 039import org.apache.zookeeper.KeeperException.Code; 040import org.apache.zookeeper.ZooKeeper; 041import org.apache.zookeeper.data.Stat; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * A very simple read only zookeeper implementation without watcher support. 047 */ 048@InterfaceAudience.Private 049public final class ReadOnlyZKClient implements Closeable { 050 051 private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class); 052 053 public static final String RECOVERY_RETRY = "zookeeper.recovery.retry"; 054 055 private static final int DEFAULT_RECOVERY_RETRY = 30; 056 057 public static final String RECOVERY_RETRY_INTERVAL_MILLIS = 058 "zookeeper.recovery.retry.intervalmill"; 059 060 private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000; 061 062 public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time"; 063 064 private static final int DEFAULT_KEEPALIVE_MILLIS = 60000; 065 066 private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED); 067 068 private final String connectString; 069 070 private final int sessionTimeoutMs; 071 072 private final int maxRetries; 073 074 private final int retryIntervalMs; 075 076 private final int keepAliveTimeMs; 077 078 private static abstract class Task implements Delayed { 079 080 protected long time = System.nanoTime(); 081 082 public boolean needZk() { 083 return false; 084 } 085 086 public void exec(ZooKeeper zk) { 087 } 088 089 public void connectFailed(Exception e) { 090 } 091 092 public void closed(IOException e) { 093 } 094 095 @Override 096 public int compareTo(Delayed o) { 097 Task that = (Task) o; 098 int c = Long.compare(time, that.time); 099 if (c != 0) { 100 return c; 101 } 102 return Integer.compare(System.identityHashCode(this), System.identityHashCode(that)); 103 } 104 105 @Override 106 public long getDelay(TimeUnit unit) { 107 return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS); 108 } 109 } 110 111 private static final Task CLOSE = new Task() { 112 }; 113 114 private final DelayQueue<Task> tasks = new DelayQueue<>(); 115 116 private final AtomicBoolean closed = new AtomicBoolean(false); 117 118 ZooKeeper zookeeper; 119 120 private int pendingRequests = 0; 121 122 private String getId() { 123 return String.format("0x%08x", System.identityHashCode(this)); 124 } 125 126 public ReadOnlyZKClient(Configuration conf) { 127 // We might use a different ZK for client access 128 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 129 if (clientZkQuorumServers != null) { 130 this.connectString = clientZkQuorumServers; 131 } else { 132 this.connectString = ZKConfig.getZKQuorumServersString(conf); 133 } 134 this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); 135 this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY); 136 this.retryIntervalMs = 137 conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); 138 this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); 139 LOG.debug( 140 "Connect {} to {} with session timeout={}ms, retries {}, " 141 + "retry interval {}ms, keepAlive={}ms", 142 getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs); 143 Threads.setDaemonThreadRunning(new Thread(this::run), 144 "ReadOnlyZKClient-" + connectString + "@" + getId()); 145 } 146 147 private abstract class ZKTask<T> extends Task { 148 149 protected final String path; 150 151 private final CompletableFuture<T> future; 152 153 private final String operationType; 154 155 private int retries; 156 157 protected ZKTask(String path, CompletableFuture<T> future, String operationType) { 158 this.path = path; 159 this.future = future; 160 this.operationType = operationType; 161 } 162 163 protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) { 164 tasks.add(new Task() { 165 166 @Override 167 public void exec(ZooKeeper alwaysNull) { 168 pendingRequests--; 169 Code code = Code.get(rc); 170 if (code == Code.OK) { 171 future.complete(ret); 172 } else if (code == Code.NONODE) { 173 if (errorIfNoNode) { 174 future.completeExceptionally(KeeperException.create(code, path)); 175 } else { 176 future.complete(ret); 177 } 178 } else if (FAIL_FAST_CODES.contains(code)) { 179 future.completeExceptionally(KeeperException.create(code, path)); 180 } else { 181 if (code == Code.SESSIONEXPIRED) { 182 LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); 183 try { 184 zk.close(); 185 } catch (InterruptedException e) { 186 // Restore interrupt status 187 Thread.currentThread().interrupt(); 188 } 189 } 190 if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { 191 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), 192 connectString, operationType, path, code, ZKTask.this.retries); 193 tasks.add(ZKTask.this); 194 } else { 195 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), 196 connectString, operationType, path, code, ZKTask.this.retries); 197 future.completeExceptionally(KeeperException.create(code, path)); 198 } 199 } 200 } 201 202 @Override 203 public void closed(IOException e) { 204 // It may happen that a request is succeeded and the onComplete has been called and pushed 205 // us into the task queue, but before we get called a close is called and here we will 206 // fail the request, although it is succeeded actually. 207 // This is not a perfect solution but anyway, it is better than hang the requests for 208 // ever, and also acceptable as if you close the zk client before actually getting the 209 // response then a failure is always possible. 210 future.completeExceptionally(e); 211 } 212 }); 213 } 214 215 @Override 216 public boolean needZk() { 217 return true; 218 } 219 220 protected abstract void doExec(ZooKeeper zk); 221 222 @Override 223 public final void exec(ZooKeeper zk) { 224 pendingRequests++; 225 doExec(zk); 226 } 227 228 public boolean delay(long intervalMs, int maxRetries) { 229 if (retries >= maxRetries) { 230 return false; 231 } 232 retries++; 233 time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs); 234 return true; 235 } 236 237 @Override 238 public void connectFailed(Exception e) { 239 if (delay(retryIntervalMs, maxRetries)) { 240 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), 241 connectString, operationType, path, retries, e); 242 tasks.add(this); 243 } else { 244 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), 245 connectString, operationType, path, retries, e); 246 future.completeExceptionally(e); 247 } 248 } 249 250 @Override 251 public void closed(IOException e) { 252 future.completeExceptionally(e); 253 } 254 } 255 256 public CompletableFuture<byte[]> get(String path) { 257 if (closed.get()) { 258 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 259 } 260 CompletableFuture<byte[]> future = new CompletableFuture<>(); 261 tasks.add(new ZKTask<byte[]>(path, future, "get") { 262 263 @Override 264 protected void doExec(ZooKeeper zk) { 265 zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), 266 null); 267 } 268 }); 269 return future; 270 } 271 272 public CompletableFuture<Stat> exists(String path) { 273 if (closed.get()) { 274 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 275 } 276 CompletableFuture<Stat> future = new CompletableFuture<>(); 277 tasks.add(new ZKTask<Stat>(path, future, "exists") { 278 279 @Override 280 protected void doExec(ZooKeeper zk) { 281 zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); 282 } 283 }); 284 return future; 285 } 286 287 public CompletableFuture<List<String>> list(String path) { 288 if (closed.get()) { 289 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 290 } 291 CompletableFuture<List<String>> future = new CompletableFuture<>(); 292 tasks.add(new ZKTask<List<String>>(path, future, "list") { 293 294 @Override 295 protected void doExec(ZooKeeper zk) { 296 zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true), 297 null); 298 } 299 }); 300 return future; 301 } 302 303 private void closeZk() { 304 if (zookeeper != null) { 305 try { 306 zookeeper.close(); 307 } catch (InterruptedException e) { 308 // Restore interrupt status 309 Thread.currentThread().interrupt(); 310 } 311 zookeeper = null; 312 } 313 } 314 315 private ZooKeeper getZk() throws IOException { 316 // may be closed when session expired 317 if (zookeeper == null || !zookeeper.getState().isAlive()) { 318 zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> { 319 }); 320 } 321 return zookeeper; 322 } 323 324 private void run() { 325 for (;;) { 326 Task task; 327 try { 328 task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS); 329 } catch (InterruptedException e) { 330 continue; 331 } 332 if (task == CLOSE) { 333 break; 334 } 335 if (task == null) { 336 if (pendingRequests == 0) { 337 LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)", 338 getId(), connectString, keepAliveTimeMs); 339 closeZk(); 340 } 341 continue; 342 } 343 if (!task.needZk()) { 344 task.exec(null); 345 } else { 346 ZooKeeper zk; 347 try { 348 zk = getZk(); 349 } catch (Exception e) { 350 task.connectFailed(e); 351 continue; 352 } 353 task.exec(zk); 354 } 355 } 356 closeZk(); 357 DoNotRetryIOException error = new DoNotRetryIOException("Client already closed"); 358 Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error)); 359 tasks.clear(); 360 } 361 362 @Override 363 public void close() { 364 if (closed.compareAndSet(false, true)) { 365 LOG.debug("Close zookeeper connection {} to {}", getId(), connectString); 366 tasks.add(CLOSE); 367 } 368 } 369 370 public String getConnectString() { 371 return connectString; 372 } 373}