001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; 021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.EnumSet; 027import java.util.List; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.DelayQueue; 030import java.util.concurrent.Delayed; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.util.FutureUtils; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.apache.zookeeper.KeeperException; 039import org.apache.zookeeper.KeeperException.Code; 040import org.apache.zookeeper.ZooKeeper; 041import org.apache.zookeeper.data.Stat; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 046 047/** 048 * A very simple read only zookeeper implementation without watcher support. 049 */ 050@InterfaceAudience.Private 051public final class ReadOnlyZKClient implements Closeable { 052 053 private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class); 054 055 public static final String RECOVERY_RETRY = "zookeeper.recovery.retry"; 056 057 private static final int DEFAULT_RECOVERY_RETRY = 30; 058 059 public static final String RECOVERY_RETRY_INTERVAL_MILLIS = 060 "zookeeper.recovery.retry.intervalmill"; 061 062 private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000; 063 064 public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time"; 065 066 private static final int DEFAULT_KEEPALIVE_MILLIS = 60000; 067 068 private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED); 069 070 private final String connectString; 071 072 private final int sessionTimeoutMs; 073 074 private final int maxRetries; 075 076 private final int retryIntervalMs; 077 078 private final int keepAliveTimeMs; 079 080 private static abstract class Task implements Delayed { 081 082 protected long time = System.nanoTime(); 083 084 public boolean needZk() { 085 return false; 086 } 087 088 public void exec(ZooKeeper zk) { 089 } 090 091 public void connectFailed(IOException e) { 092 } 093 094 public void closed(IOException e) { 095 } 096 097 @Override 098 public int compareTo(Delayed o) { 099 Task that = (Task) o; 100 int c = Long.compare(time, that.time); 101 if (c != 0) { 102 return c; 103 } 104 return Integer.compare(System.identityHashCode(this), System.identityHashCode(that)); 105 } 106 107 @Override 108 public long getDelay(TimeUnit unit) { 109 return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS); 110 } 111 } 112 113 private static final Task CLOSE = new Task() { 114 }; 115 116 private final DelayQueue<Task> tasks = new DelayQueue<>(); 117 118 private final AtomicBoolean closed = new AtomicBoolean(false); 119 120 @VisibleForTesting 121 ZooKeeper zookeeper; 122 123 private int pendingRequests = 0; 124 125 private String getId() { 126 return String.format("0x%08x", System.identityHashCode(this)); 127 } 128 129 public ReadOnlyZKClient(Configuration conf) { 130 // We might use a different ZK for client access 131 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 132 if (clientZkQuorumServers != null) { 133 this.connectString = clientZkQuorumServers; 134 } else { 135 this.connectString = ZKConfig.getZKQuorumServersString(conf); 136 } 137 this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); 138 this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY); 139 this.retryIntervalMs = 140 conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); 141 this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); 142 LOG.debug( 143 "Connect {} to {} with session timeout={}ms, retries {}, " + 144 "retry interval {}ms, keepAlive={}ms", 145 getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs); 146 Threads.setDaemonThreadRunning(new Thread(this::run), 147 "ReadOnlyZKClient-" + connectString + "@" + getId()); 148 } 149 150 private abstract class ZKTask<T> extends Task { 151 152 protected final String path; 153 154 private final CompletableFuture<T> future; 155 156 private final String operationType; 157 158 private int retries; 159 160 protected ZKTask(String path, CompletableFuture<T> future, String operationType) { 161 this.path = path; 162 this.future = future; 163 this.operationType = operationType; 164 } 165 166 protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) { 167 tasks.add(new Task() { 168 169 @Override 170 public void exec(ZooKeeper alwaysNull) { 171 pendingRequests--; 172 Code code = Code.get(rc); 173 if (code == Code.OK) { 174 future.complete(ret); 175 } else if (code == Code.NONODE) { 176 if (errorIfNoNode) { 177 future.completeExceptionally(KeeperException.create(code, path)); 178 } else { 179 future.complete(ret); 180 } 181 } else if (FAIL_FAST_CODES.contains(code)) { 182 future.completeExceptionally(KeeperException.create(code, path)); 183 } else { 184 if (code == Code.SESSIONEXPIRED) { 185 LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); 186 try { 187 zk.close(); 188 } catch (InterruptedException e) { 189 } 190 } 191 if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { 192 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), 193 connectString, operationType, path, code, ZKTask.this.retries); 194 tasks.add(ZKTask.this); 195 } else { 196 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), 197 connectString, operationType, path, code, ZKTask.this.retries); 198 future.completeExceptionally(KeeperException.create(code, path)); 199 } 200 } 201 } 202 203 @Override 204 public void closed(IOException e) { 205 // It may happen that a request is succeeded and the onComplete has been called and pushed 206 // us into the task queue, but before we get called a close is called and here we will 207 // fail the request, although it is succeeded actually. 208 // This is not a perfect solution but anyway, it is better than hang the requests for 209 // ever, and also acceptable as if you close the zk client before actually getting the 210 // response then a failure is always possible. 211 future.completeExceptionally(e); 212 } 213 }); 214 } 215 216 @Override 217 public boolean needZk() { 218 return true; 219 } 220 221 protected abstract void doExec(ZooKeeper zk); 222 223 @Override 224 public final void exec(ZooKeeper zk) { 225 pendingRequests++; 226 doExec(zk); 227 } 228 229 public boolean delay(long intervalMs, int maxRetries) { 230 if (retries >= maxRetries) { 231 return false; 232 } 233 retries++; 234 time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs); 235 return true; 236 } 237 238 @Override 239 public void connectFailed(IOException e) { 240 if (delay(retryIntervalMs, maxRetries)) { 241 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), 242 connectString, operationType, path, retries, e); 243 tasks.add(this); 244 } else { 245 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), 246 connectString, operationType, path, retries, e); 247 future.completeExceptionally(e); 248 } 249 } 250 251 @Override 252 public void closed(IOException e) { 253 future.completeExceptionally(e); 254 } 255 } 256 257 public CompletableFuture<byte[]> get(String path) { 258 if (closed.get()) { 259 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 260 } 261 CompletableFuture<byte[]> future = new CompletableFuture<>(); 262 tasks.add(new ZKTask<byte[]>(path, future, "get") { 263 264 @Override 265 protected void doExec(ZooKeeper zk) { 266 zk.getData(path, false, 267 (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null); 268 } 269 }); 270 return future; 271 } 272 273 public CompletableFuture<Stat> exists(String path) { 274 if (closed.get()) { 275 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 276 } 277 CompletableFuture<Stat> future = new CompletableFuture<>(); 278 tasks.add(new ZKTask<Stat>(path, future, "exists") { 279 280 @Override 281 protected void doExec(ZooKeeper zk) { 282 zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); 283 } 284 }); 285 return future; 286 } 287 288 public CompletableFuture<List<String>> list(String path) { 289 if (closed.get()) { 290 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 291 } 292 CompletableFuture<List<String>> future = new CompletableFuture<>(); 293 tasks.add(new ZKTask<List<String>>(path, future, "list") { 294 295 @Override 296 protected void doExec(ZooKeeper zk) { 297 zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true), 298 null); 299 } 300 }); 301 return future; 302 } 303 304 private void closeZk() { 305 if (zookeeper != null) { 306 try { 307 zookeeper.close(); 308 } catch (InterruptedException e) { 309 } 310 zookeeper = null; 311 } 312 } 313 314 private ZooKeeper getZk() throws IOException { 315 // may be closed when session expired 316 if (zookeeper == null || !zookeeper.getState().isAlive()) { 317 zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {}); 318 } 319 return zookeeper; 320 } 321 322 private void run() { 323 for (;;) { 324 Task task; 325 try { 326 task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS); 327 } catch (InterruptedException e) { 328 continue; 329 } 330 if (task == CLOSE) { 331 break; 332 } 333 if (task == null) { 334 if (pendingRequests == 0) { 335 LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)", 336 getId(), connectString, keepAliveTimeMs); 337 closeZk(); 338 } 339 continue; 340 } 341 if (!task.needZk()) { 342 task.exec(null); 343 } else { 344 ZooKeeper zk; 345 try { 346 zk = getZk(); 347 } catch (IOException e) { 348 task.connectFailed(e); 349 continue; 350 } 351 task.exec(zk); 352 } 353 } 354 closeZk(); 355 DoNotRetryIOException error = new DoNotRetryIOException("Client already closed"); 356 Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error)); 357 tasks.clear(); 358 } 359 360 @Override 361 public void close() { 362 if (closed.compareAndSet(false, true)) { 363 LOG.debug("Close zookeeper connection {} to {}", getId(), connectString); 364 tasks.add(CLOSE); 365 } 366 } 367 368 @VisibleForTesting 369 public String getConnectString() { 370 return connectString; 371 } 372}