001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 021import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 023 024import java.util.List; 025import java.util.concurrent.CompletableFuture; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.hbase.HRegionLocation; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 031import org.apache.hadoop.hbase.ipc.HBaseRpcController; 032import org.apache.yetus.audience.InterfaceAudience; 033 034import org.apache.hbase.thirdparty.io.netty.util.Timer; 035 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 038 039/** 040 * Factory to create an AsyncRpcRetryCaller. 041 * @since 2.0.0 042 */ 043@InterfaceAudience.Private 044class AsyncRpcRetryingCallerFactory { 045 046 private final AsyncConnectionImpl conn; 047 048 private final Timer retryTimer; 049 050 public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) { 051 this.conn = conn; 052 this.retryTimer = retryTimer; 053 } 054 055 private abstract class BuilderBase { 056 057 protected long pauseNs = conn.connConf.getPauseNs(); 058 059 protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); 060 061 protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt(); 062 } 063 064 public class SingleRequestCallerBuilder<T> extends BuilderBase { 065 066 private TableName tableName; 067 068 private byte[] row; 069 070 private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable; 071 072 private long operationTimeoutNs = -1L; 073 074 private long rpcTimeoutNs = -1L; 075 076 private RegionLocateType locateType = RegionLocateType.CURRENT; 077 078 private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; 079 080 public SingleRequestCallerBuilder<T> table(TableName tableName) { 081 this.tableName = tableName; 082 return this; 083 } 084 085 public SingleRequestCallerBuilder<T> row(byte[] row) { 086 this.row = row; 087 return this; 088 } 089 090 public SingleRequestCallerBuilder<T> action( 091 AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) { 092 this.callable = callable; 093 return this; 094 } 095 096 public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 097 this.operationTimeoutNs = unit.toNanos(operationTimeout); 098 return this; 099 } 100 101 public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 102 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 103 return this; 104 } 105 106 public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) { 107 this.locateType = locateType; 108 return this; 109 } 110 111 public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 112 this.pauseNs = unit.toNanos(pause); 113 return this; 114 } 115 116 public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 117 this.maxAttempts = maxAttempts; 118 return this; 119 } 120 121 public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 122 this.startLogErrorsCnt = startLogErrorsCnt; 123 return this; 124 } 125 126 public SingleRequestCallerBuilder<T> replicaId(int replicaId) { 127 this.replicaId = replicaId; 128 return this; 129 } 130 131 public AsyncSingleRequestRpcRetryingCaller<T> build() { 132 checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); 133 return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, 134 checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId, 135 checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), 136 pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); 137 } 138 139 /** 140 * Shortcut for {@code build().call()} 141 */ 142 public CompletableFuture<T> call() { 143 return build().call(); 144 } 145 } 146 147 /** 148 * Create retry caller for single action, such as get, put, delete, etc. 149 */ 150 public <T> SingleRequestCallerBuilder<T> single() { 151 return new SingleRequestCallerBuilder<>(); 152 } 153 154 public class ScanSingleRegionCallerBuilder extends BuilderBase { 155 156 private Long scannerId = null; 157 158 private Scan scan; 159 160 private ScanMetrics scanMetrics; 161 162 private ScanResultCache resultCache; 163 164 private AdvancedScanResultConsumer consumer; 165 166 private ClientService.Interface stub; 167 168 private HRegionLocation loc; 169 170 private boolean isRegionServerRemote; 171 172 private long scannerLeaseTimeoutPeriodNs; 173 174 private long scanTimeoutNs; 175 176 private long rpcTimeoutNs; 177 178 public ScanSingleRegionCallerBuilder id(long scannerId) { 179 this.scannerId = scannerId; 180 return this; 181 } 182 183 public ScanSingleRegionCallerBuilder setScan(Scan scan) { 184 this.scan = scan; 185 return this; 186 } 187 188 public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) { 189 this.scanMetrics = scanMetrics; 190 return this; 191 } 192 193 public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) { 194 this.isRegionServerRemote = isRegionServerRemote; 195 return this; 196 } 197 198 public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) { 199 this.resultCache = resultCache; 200 return this; 201 } 202 203 public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) { 204 this.consumer = consumer; 205 return this; 206 } 207 208 public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) { 209 this.stub = stub; 210 return this; 211 } 212 213 public ScanSingleRegionCallerBuilder location(HRegionLocation loc) { 214 this.loc = loc; 215 return this; 216 } 217 218 public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod, 219 TimeUnit unit) { 220 this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod); 221 return this; 222 } 223 224 public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { 225 this.scanTimeoutNs = unit.toNanos(scanTimeout); 226 return this; 227 } 228 229 public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 230 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 231 return this; 232 } 233 234 public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { 235 this.pauseNs = unit.toNanos(pause); 236 return this; 237 } 238 239 public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) { 240 this.maxAttempts = maxAttempts; 241 return this; 242 } 243 244 public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 245 this.startLogErrorsCnt = startLogErrorsCnt; 246 return this; 247 } 248 249 public AsyncScanSingleRegionRpcRetryingCaller build() { 250 checkArgument(scannerId != null, "invalid scannerId %d", scannerId); 251 return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, 252 checkNotNull(scan, "scan is null"), scanMetrics, scannerId, 253 checkNotNull(resultCache, "resultCache is null"), 254 checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), 255 checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, 256 pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); 257 } 258 259 /** 260 * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}. 261 */ 262 public CompletableFuture<Boolean> start(HBaseRpcController controller, 263 ScanResponse respWhenOpen) { 264 return build().start(controller, respWhenOpen); 265 } 266 } 267 268 /** 269 * Create retry caller for scanning a region. 270 */ 271 public ScanSingleRegionCallerBuilder scanSingleRegion() { 272 return new ScanSingleRegionCallerBuilder(); 273 } 274 275 public class BatchCallerBuilder extends BuilderBase { 276 277 private TableName tableName; 278 279 private List<? extends Row> actions; 280 281 private long operationTimeoutNs = -1L; 282 283 private long rpcTimeoutNs = -1L; 284 285 public BatchCallerBuilder table(TableName tableName) { 286 this.tableName = tableName; 287 return this; 288 } 289 290 public BatchCallerBuilder actions(List<? extends Row> actions) { 291 this.actions = actions; 292 return this; 293 } 294 295 public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { 296 this.operationTimeoutNs = unit.toNanos(operationTimeout); 297 return this; 298 } 299 300 public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 301 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 302 return this; 303 } 304 305 public BatchCallerBuilder pause(long pause, TimeUnit unit) { 306 this.pauseNs = unit.toNanos(pause); 307 return this; 308 } 309 310 public BatchCallerBuilder maxAttempts(int maxAttempts) { 311 this.maxAttempts = maxAttempts; 312 return this; 313 } 314 315 public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 316 this.startLogErrorsCnt = startLogErrorsCnt; 317 return this; 318 } 319 320 public <T> AsyncBatchRpcRetryingCaller<T> build() { 321 return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, 322 maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); 323 } 324 325 public <T> List<CompletableFuture<T>> call() { 326 return this.<T> build().call(); 327 } 328 } 329 330 public BatchCallerBuilder batch() { 331 return new BatchCallerBuilder(); 332 } 333 334 public class MasterRequestCallerBuilder<T> extends BuilderBase { 335 private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable; 336 337 private long operationTimeoutNs = -1L; 338 339 private long rpcTimeoutNs = -1L; 340 341 public MasterRequestCallerBuilder<T> action( 342 AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) { 343 this.callable = callable; 344 return this; 345 } 346 347 public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 348 this.operationTimeoutNs = unit.toNanos(operationTimeout); 349 return this; 350 } 351 352 public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 353 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 354 return this; 355 } 356 357 public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 358 this.pauseNs = unit.toNanos(pause); 359 return this; 360 } 361 362 public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 363 this.maxAttempts = maxAttempts; 364 return this; 365 } 366 367 public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 368 this.startLogErrorsCnt = startLogErrorsCnt; 369 return this; 370 } 371 372 public AsyncMasterRequestRpcRetryingCaller<T> build() { 373 return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, 374 checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, 375 rpcTimeoutNs, startLogErrorsCnt); 376 } 377 378 /** 379 * Shortcut for {@code build().call()} 380 */ 381 public CompletableFuture<T> call() { 382 return build().call(); 383 } 384 } 385 386 public <T> MasterRequestCallerBuilder<T> masterRequest() { 387 return new MasterRequestCallerBuilder<>(); 388 } 389 390 public class AdminRequestCallerBuilder<T> extends BuilderBase { 391 // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc. 392 393 private AsyncAdminRequestRetryingCaller.Callable<T> callable; 394 395 private long operationTimeoutNs = -1L; 396 397 private long rpcTimeoutNs = -1L; 398 399 private ServerName serverName; 400 401 public AdminRequestCallerBuilder<T> action( 402 AsyncAdminRequestRetryingCaller.Callable<T> callable) { 403 this.callable = callable; 404 return this; 405 } 406 407 public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 408 this.operationTimeoutNs = unit.toNanos(operationTimeout); 409 return this; 410 } 411 412 public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 413 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 414 return this; 415 } 416 417 public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 418 this.pauseNs = unit.toNanos(pause); 419 return this; 420 } 421 422 public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 423 this.maxAttempts = maxAttempts; 424 return this; 425 } 426 427 public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 428 this.startLogErrorsCnt = startLogErrorsCnt; 429 return this; 430 } 431 432 public AdminRequestCallerBuilder<T> serverName(ServerName serverName) { 433 this.serverName = serverName; 434 return this; 435 } 436 437 public AsyncAdminRequestRetryingCaller<T> build() { 438 return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts, 439 operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 440 checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); 441 } 442 443 public CompletableFuture<T> call() { 444 return build().call(); 445 } 446 } 447 448 public <T> AdminRequestCallerBuilder<T> adminRequest() { 449 return new AdminRequestCallerBuilder<>(); 450 } 451 452 public class ServerRequestCallerBuilder<T> extends BuilderBase { 453 454 private AsyncServerRequestRpcRetryingCaller.Callable<T> callable; 455 456 private long operationTimeoutNs = -1L; 457 458 private long rpcTimeoutNs = -1L; 459 460 private ServerName serverName; 461 462 public ServerRequestCallerBuilder<T> action( 463 AsyncServerRequestRpcRetryingCaller.Callable<T> callable) { 464 this.callable = callable; 465 return this; 466 } 467 468 public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 469 this.operationTimeoutNs = unit.toNanos(operationTimeout); 470 return this; 471 } 472 473 public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 474 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 475 return this; 476 } 477 478 public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 479 this.pauseNs = unit.toNanos(pause); 480 return this; 481 } 482 483 public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 484 this.maxAttempts = maxAttempts; 485 return this; 486 } 487 488 public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 489 this.startLogErrorsCnt = startLogErrorsCnt; 490 return this; 491 } 492 493 public ServerRequestCallerBuilder<T> serverName(ServerName serverName) { 494 this.serverName = serverName; 495 return this; 496 } 497 498 public AsyncServerRequestRpcRetryingCaller<T> build() { 499 return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts, 500 operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 501 checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); 502 } 503 504 public CompletableFuture<T> call() { 505 return build().call(); 506 } 507 } 508 509 public <T> ServerRequestCallerBuilder<T> serverRequest() { 510 return new ServerRequestCallerBuilder<>(); 511 } 512}