001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; 021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.EnumSet; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.DelayQueue; 029import java.util.concurrent.Delayed; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.util.FutureUtils; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.KeeperException.Code; 039import org.apache.zookeeper.ZooKeeper; 040import org.apache.zookeeper.data.Stat; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 045 046/** 047 * A very simple read only zookeeper implementation without watcher support. 048 */ 049@InterfaceAudience.Private 050public final class ReadOnlyZKClient implements Closeable { 051 052 private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class); 053 054 public static final String RECOVERY_RETRY = "zookeeper.recovery.retry"; 055 056 private static final int DEFAULT_RECOVERY_RETRY = 30; 057 058 public static final String RECOVERY_RETRY_INTERVAL_MILLIS = 059 "zookeeper.recovery.retry.intervalmill"; 060 061 private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000; 062 063 public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time"; 064 065 private static final int DEFAULT_KEEPALIVE_MILLIS = 60000; 066 067 private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED); 068 069 private final String connectString; 070 071 private final int sessionTimeoutMs; 072 073 private final int maxRetries; 074 075 private final int retryIntervalMs; 076 077 private final int keepAliveTimeMs; 078 079 private static abstract class Task implements Delayed { 080 081 protected long time = System.nanoTime(); 082 083 public boolean needZk() { 084 return false; 085 } 086 087 public void exec(ZooKeeper zk) { 088 } 089 090 public void connectFailed(IOException e) { 091 } 092 093 public void closed(IOException e) { 094 } 095 096 @Override 097 public int compareTo(Delayed o) { 098 Task that = (Task) o; 099 int c = Long.compare(time, that.time); 100 if (c != 0) { 101 return c; 102 } 103 return Integer.compare(System.identityHashCode(this), System.identityHashCode(that)); 104 } 105 106 @Override 107 public long getDelay(TimeUnit unit) { 108 return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS); 109 } 110 } 111 112 private static final Task CLOSE = new Task() { 113 }; 114 115 private final DelayQueue<Task> tasks = new DelayQueue<>(); 116 117 private final AtomicBoolean closed = new AtomicBoolean(false); 118 119 @VisibleForTesting 120 ZooKeeper zookeeper; 121 122 private int pendingRequests = 0; 123 124 private String getId() { 125 return String.format("0x%08x", System.identityHashCode(this)); 126 } 127 128 public ReadOnlyZKClient(Configuration conf) { 129 this.connectString = ZKConfig.getZKQuorumServersString(conf); 130 this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); 131 this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY); 132 this.retryIntervalMs = 133 conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); 134 this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); 135 LOG.info( 136 "Connect {} to {} with session timeout={}ms, retries {}, " + 137 "retry interval {}ms, keepAlive={}ms", 138 getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs); 139 Threads.setDaemonThreadRunning(new Thread(this::run), 140 "ReadOnlyZKClient-" + connectString + "@" + getId()); 141 } 142 143 private abstract class ZKTask<T> extends Task { 144 145 protected final String path; 146 147 private final CompletableFuture<T> future; 148 149 private final String operationType; 150 151 private int retries; 152 153 protected ZKTask(String path, CompletableFuture<T> future, String operationType) { 154 this.path = path; 155 this.future = future; 156 this.operationType = operationType; 157 } 158 159 protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) { 160 tasks.add(new Task() { 161 162 @Override 163 public void exec(ZooKeeper alwaysNull) { 164 pendingRequests--; 165 Code code = Code.get(rc); 166 if (code == Code.OK) { 167 future.complete(ret); 168 } else if (code == Code.NONODE) { 169 if (errorIfNoNode) { 170 future.completeExceptionally(KeeperException.create(code, path)); 171 } else { 172 future.complete(ret); 173 } 174 } else if (FAIL_FAST_CODES.contains(code)) { 175 future.completeExceptionally(KeeperException.create(code, path)); 176 } else { 177 if (code == Code.SESSIONEXPIRED) { 178 LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); 179 try { 180 zk.close(); 181 } catch (InterruptedException e) { 182 } 183 } 184 if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { 185 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), 186 connectString, operationType, path, code, ZKTask.this.retries); 187 tasks.add(ZKTask.this); 188 } else { 189 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), 190 connectString, operationType, path, code, ZKTask.this.retries); 191 future.completeExceptionally(KeeperException.create(code, path)); 192 } 193 } 194 } 195 196 @Override 197 public void closed(IOException e) { 198 // It may happen that a request is succeeded and the onComplete has been called and pushed 199 // us into the task queue, but before we get called a close is called and here we will 200 // fail the request, although it is succeeded actually. 201 // This is not a perfect solution but anyway, it is better than hang the requests for 202 // ever, and also acceptable as if you close the zk client before actually getting the 203 // response then a failure is always possible. 204 future.completeExceptionally(e); 205 } 206 }); 207 } 208 209 @Override 210 public boolean needZk() { 211 return true; 212 } 213 214 protected abstract void doExec(ZooKeeper zk); 215 216 @Override 217 public final void exec(ZooKeeper zk) { 218 pendingRequests++; 219 doExec(zk); 220 } 221 222 public boolean delay(long intervalMs, int maxRetries) { 223 if (retries >= maxRetries) { 224 return false; 225 } 226 retries++; 227 time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs); 228 return true; 229 } 230 231 @Override 232 public void connectFailed(IOException e) { 233 if (delay(retryIntervalMs, maxRetries)) { 234 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), 235 connectString, operationType, path, retries, e); 236 tasks.add(this); 237 } else { 238 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), 239 connectString, operationType, path, retries, e); 240 future.completeExceptionally(e); 241 } 242 } 243 244 @Override 245 public void closed(IOException e) { 246 future.completeExceptionally(e); 247 } 248 } 249 250 public CompletableFuture<byte[]> get(String path) { 251 if (closed.get()) { 252 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 253 } 254 CompletableFuture<byte[]> future = new CompletableFuture<>(); 255 tasks.add(new ZKTask<byte[]>(path, future, "get") { 256 257 @Override 258 protected void doExec(ZooKeeper zk) { 259 zk.getData(path, false, 260 (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null); 261 } 262 }); 263 return future; 264 } 265 266 public CompletableFuture<Stat> exists(String path) { 267 if (closed.get()) { 268 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 269 } 270 CompletableFuture<Stat> future = new CompletableFuture<>(); 271 tasks.add(new ZKTask<Stat>(path, future, "exists") { 272 273 @Override 274 protected void doExec(ZooKeeper zk) { 275 zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); 276 } 277 }); 278 return future; 279 } 280 281 private void closeZk() { 282 if (zookeeper != null) { 283 try { 284 zookeeper.close(); 285 } catch (InterruptedException e) { 286 } 287 zookeeper = null; 288 } 289 } 290 291 private ZooKeeper getZk() throws IOException { 292 // may be closed when session expired 293 if (zookeeper == null || !zookeeper.getState().isAlive()) { 294 zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {}); 295 } 296 return zookeeper; 297 } 298 299 private void run() { 300 for (;;) { 301 Task task; 302 try { 303 task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS); 304 } catch (InterruptedException e) { 305 continue; 306 } 307 if (task == CLOSE) { 308 break; 309 } 310 if (task == null) { 311 if (pendingRequests == 0) { 312 LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)", 313 getId(), connectString, keepAliveTimeMs); 314 closeZk(); 315 } 316 continue; 317 } 318 if (!task.needZk()) { 319 task.exec(null); 320 } else { 321 ZooKeeper zk; 322 try { 323 zk = getZk(); 324 } catch (IOException e) { 325 task.connectFailed(e); 326 continue; 327 } 328 task.exec(zk); 329 } 330 } 331 closeZk(); 332 DoNotRetryIOException error = new DoNotRetryIOException("Client already closed"); 333 Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error)); 334 tasks.clear(); 335 } 336 337 @Override 338 public void close() { 339 if (closed.compareAndSet(false, true)) { 340 LOG.info("Close zookeeper connection {} to {}", getId(), connectString); 341 tasks.add(CLOSE); 342 } 343 } 344 345 @VisibleForTesting 346 public String getConnectString() { 347 return connectString; 348 } 349}