001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 024import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 025 026import java.util.List; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.TimeUnit; 029import org.apache.hadoop.hbase.HRegionLocation; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 033import org.apache.hadoop.hbase.ipc.HBaseRpcController; 034import org.apache.yetus.audience.InterfaceAudience; 035 036import org.apache.hbase.thirdparty.io.netty.util.Timer; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 040 041/** 042 * Factory to create an AsyncRpcRetryCaller. 043 * @since 2.0.0 044 */ 045@InterfaceAudience.Private 046class AsyncRpcRetryingCallerFactory { 047 048 private final AsyncConnectionImpl conn; 049 050 private final Timer retryTimer; 051 052 public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) { 053 this.conn = conn; 054 this.retryTimer = retryTimer; 055 } 056 057 private abstract class BuilderBase { 058 059 protected long pauseNs = conn.connConf.getPauseNs(); 060 061 protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs(); 062 063 protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); 064 065 protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt(); 066 } 067 068 public class SingleRequestCallerBuilder<T> extends BuilderBase { 069 070 private TableName tableName; 071 072 private byte[] row; 073 074 private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable; 075 076 private long operationTimeoutNs = -1L; 077 078 private long rpcTimeoutNs = -1L; 079 080 private RegionLocateType locateType = RegionLocateType.CURRENT; 081 082 private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; 083 084 private int priority = PRIORITY_UNSET; 085 086 public SingleRequestCallerBuilder<T> table(TableName tableName) { 087 this.tableName = tableName; 088 return this; 089 } 090 091 public SingleRequestCallerBuilder<T> row(byte[] row) { 092 this.row = row; 093 return this; 094 } 095 096 public SingleRequestCallerBuilder<T> action( 097 AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) { 098 this.callable = callable; 099 return this; 100 } 101 102 public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 103 this.operationTimeoutNs = unit.toNanos(operationTimeout); 104 return this; 105 } 106 107 public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 108 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 109 return this; 110 } 111 112 public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) { 113 this.locateType = locateType; 114 return this; 115 } 116 117 public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 118 this.pauseNs = unit.toNanos(pause); 119 return this; 120 } 121 122 public SingleRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) { 123 this.pauseForCQTBENs = unit.toNanos(pause); 124 return this; 125 } 126 127 public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 128 this.maxAttempts = maxAttempts; 129 return this; 130 } 131 132 public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 133 this.startLogErrorsCnt = startLogErrorsCnt; 134 return this; 135 } 136 137 public SingleRequestCallerBuilder<T> replicaId(int replicaId) { 138 this.replicaId = replicaId; 139 return this; 140 } 141 142 public SingleRequestCallerBuilder<T> priority(int priority) { 143 this.priority = priority; 144 return this; 145 } 146 147 private void preCheck() { 148 checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); 149 checkNotNull(tableName, "tableName is null"); 150 checkNotNull(row, "row is null"); 151 checkNotNull(locateType, "locateType is null"); 152 checkNotNull(callable, "action is null"); 153 this.priority = calcPriority(priority, tableName); 154 } 155 156 public AsyncSingleRequestRpcRetryingCaller<T> build() { 157 preCheck(); 158 return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, 159 locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, 160 rpcTimeoutNs, startLogErrorsCnt); 161 } 162 163 /** 164 * Shortcut for {@code build().call()} 165 */ 166 public CompletableFuture<T> call() { 167 return build().call(); 168 } 169 } 170 171 /** 172 * Create retry caller for single action, such as get, put, delete, etc. 173 */ 174 public <T> SingleRequestCallerBuilder<T> single() { 175 return new SingleRequestCallerBuilder<>(); 176 } 177 178 public class ScanSingleRegionCallerBuilder extends BuilderBase { 179 180 private Long scannerId = null; 181 182 private Scan scan; 183 184 private ScanMetrics scanMetrics; 185 186 private ScanResultCache resultCache; 187 188 private AdvancedScanResultConsumer consumer; 189 190 private ClientService.Interface stub; 191 192 private HRegionLocation loc; 193 194 private boolean isRegionServerRemote; 195 196 private long scannerLeaseTimeoutPeriodNs; 197 198 private long scanTimeoutNs; 199 200 private long rpcTimeoutNs; 201 202 private int priority = PRIORITY_UNSET; 203 204 public ScanSingleRegionCallerBuilder id(long scannerId) { 205 this.scannerId = scannerId; 206 return this; 207 } 208 209 public ScanSingleRegionCallerBuilder setScan(Scan scan) { 210 this.scan = scan; 211 this.priority = scan.getPriority(); 212 return this; 213 } 214 215 public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) { 216 this.scanMetrics = scanMetrics; 217 return this; 218 } 219 220 public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) { 221 this.isRegionServerRemote = isRegionServerRemote; 222 return this; 223 } 224 225 public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) { 226 this.resultCache = resultCache; 227 return this; 228 } 229 230 public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) { 231 this.consumer = consumer; 232 return this; 233 } 234 235 public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) { 236 this.stub = stub; 237 return this; 238 } 239 240 public ScanSingleRegionCallerBuilder location(HRegionLocation loc) { 241 this.loc = loc; 242 return this; 243 } 244 245 public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod, 246 TimeUnit unit) { 247 this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod); 248 return this; 249 } 250 251 public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { 252 this.scanTimeoutNs = unit.toNanos(scanTimeout); 253 return this; 254 } 255 256 public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 257 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 258 return this; 259 } 260 261 public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { 262 this.pauseNs = unit.toNanos(pause); 263 return this; 264 } 265 266 public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { 267 this.pauseForCQTBENs = unit.toNanos(pause); 268 return this; 269 } 270 271 public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) { 272 this.maxAttempts = maxAttempts; 273 return this; 274 } 275 276 public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 277 this.startLogErrorsCnt = startLogErrorsCnt; 278 return this; 279 } 280 281 private void preCheck() { 282 checkArgument(scannerId != null, "invalid scannerId %d", scannerId); 283 checkNotNull(scan, "scan is null"); 284 checkNotNull(resultCache, "resultCache is null"); 285 checkNotNull(consumer, "consumer is null"); 286 checkNotNull(stub, "stub is null"); 287 checkNotNull(loc, "location is null"); 288 this.priority = calcPriority(priority, loc.getRegion().getTable()); 289 } 290 291 public AsyncScanSingleRegionRpcRetryingCaller build() { 292 preCheck(); 293 return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, 294 scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, 295 scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, 296 rpcTimeoutNs, startLogErrorsCnt); 297 } 298 299 /** 300 * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}. 301 */ 302 public CompletableFuture<Boolean> start(HBaseRpcController controller, 303 ScanResponse respWhenOpen) { 304 return build().start(controller, respWhenOpen); 305 } 306 } 307 308 /** 309 * Create retry caller for scanning a region. 310 */ 311 public ScanSingleRegionCallerBuilder scanSingleRegion() { 312 return new ScanSingleRegionCallerBuilder(); 313 } 314 315 public class BatchCallerBuilder extends BuilderBase { 316 317 private TableName tableName; 318 319 private List<? extends Row> actions; 320 321 private long operationTimeoutNs = -1L; 322 323 private long rpcTimeoutNs = -1L; 324 325 public BatchCallerBuilder table(TableName tableName) { 326 this.tableName = tableName; 327 return this; 328 } 329 330 public BatchCallerBuilder actions(List<? extends Row> actions) { 331 this.actions = actions; 332 return this; 333 } 334 335 public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { 336 this.operationTimeoutNs = unit.toNanos(operationTimeout); 337 return this; 338 } 339 340 public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 341 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 342 return this; 343 } 344 345 public BatchCallerBuilder pause(long pause, TimeUnit unit) { 346 this.pauseNs = unit.toNanos(pause); 347 return this; 348 } 349 350 public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { 351 this.pauseForCQTBENs = unit.toNanos(pause); 352 return this; 353 } 354 355 public BatchCallerBuilder maxAttempts(int maxAttempts) { 356 this.maxAttempts = maxAttempts; 357 return this; 358 } 359 360 public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 361 this.startLogErrorsCnt = startLogErrorsCnt; 362 return this; 363 } 364 365 public <T> AsyncBatchRpcRetryingCaller<T> build() { 366 return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, 367 pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); 368 } 369 370 public <T> List<CompletableFuture<T>> call() { 371 return this.<T> build().call(); 372 } 373 } 374 375 public BatchCallerBuilder batch() { 376 return new BatchCallerBuilder(); 377 } 378 379 public class MasterRequestCallerBuilder<T> extends BuilderBase { 380 private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable; 381 382 private long operationTimeoutNs = -1L; 383 384 private long rpcTimeoutNs = -1L; 385 386 private int priority = PRIORITY_UNSET; 387 388 public MasterRequestCallerBuilder<T> action( 389 AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) { 390 this.callable = callable; 391 return this; 392 } 393 394 public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 395 this.operationTimeoutNs = unit.toNanos(operationTimeout); 396 return this; 397 } 398 399 public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 400 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 401 return this; 402 } 403 404 public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 405 this.pauseNs = unit.toNanos(pause); 406 return this; 407 } 408 409 public MasterRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) { 410 this.pauseForCQTBENs = unit.toNanos(pause); 411 return this; 412 } 413 414 public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 415 this.maxAttempts = maxAttempts; 416 return this; 417 } 418 419 public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 420 this.startLogErrorsCnt = startLogErrorsCnt; 421 return this; 422 } 423 424 public MasterRequestCallerBuilder<T> priority(TableName tableName) { 425 this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName)); 426 return this; 427 } 428 429 public MasterRequestCallerBuilder<T> priority(int priority) { 430 this.priority = Math.max(this.priority, priority); 431 return this; 432 } 433 434 private void preCheck() { 435 checkNotNull(callable, "action is null"); 436 } 437 438 public AsyncMasterRequestRpcRetryingCaller<T> build() { 439 preCheck(); 440 return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority, 441 pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); 442 } 443 444 /** 445 * Shortcut for {@code build().call()} 446 */ 447 public CompletableFuture<T> call() { 448 return build().call(); 449 } 450 } 451 452 public <T> MasterRequestCallerBuilder<T> masterRequest() { 453 return new MasterRequestCallerBuilder<>(); 454 } 455 456 public class AdminRequestCallerBuilder<T> extends BuilderBase { 457 // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc. 458 459 private AsyncAdminRequestRetryingCaller.Callable<T> callable; 460 461 private long operationTimeoutNs = -1L; 462 463 private long rpcTimeoutNs = -1L; 464 465 private ServerName serverName; 466 467 private int priority; 468 469 public AdminRequestCallerBuilder<T> action( 470 AsyncAdminRequestRetryingCaller.Callable<T> callable) { 471 this.callable = callable; 472 return this; 473 } 474 475 public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 476 this.operationTimeoutNs = unit.toNanos(operationTimeout); 477 return this; 478 } 479 480 public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 481 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 482 return this; 483 } 484 485 public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 486 this.pauseNs = unit.toNanos(pause); 487 return this; 488 } 489 490 public AdminRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) { 491 this.pauseForCQTBENs = unit.toNanos(pause); 492 return this; 493 } 494 495 public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 496 this.maxAttempts = maxAttempts; 497 return this; 498 } 499 500 public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 501 this.startLogErrorsCnt = startLogErrorsCnt; 502 return this; 503 } 504 505 public AdminRequestCallerBuilder<T> serverName(ServerName serverName) { 506 this.serverName = serverName; 507 return this; 508 } 509 510 public AdminRequestCallerBuilder<T> priority(int priority) { 511 this.priority = priority; 512 return this; 513 } 514 515 public AsyncAdminRequestRetryingCaller<T> build() { 516 return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs, 517 pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 518 checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); 519 } 520 521 public CompletableFuture<T> call() { 522 return build().call(); 523 } 524 } 525 526 public <T> AdminRequestCallerBuilder<T> adminRequest() { 527 return new AdminRequestCallerBuilder<>(); 528 } 529 530 public class ServerRequestCallerBuilder<T> extends BuilderBase { 531 532 private AsyncServerRequestRpcRetryingCaller.Callable<T> callable; 533 534 private long operationTimeoutNs = -1L; 535 536 private long rpcTimeoutNs = -1L; 537 538 private ServerName serverName; 539 540 public ServerRequestCallerBuilder<T> action( 541 AsyncServerRequestRpcRetryingCaller.Callable<T> callable) { 542 this.callable = callable; 543 return this; 544 } 545 546 public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 547 this.operationTimeoutNs = unit.toNanos(operationTimeout); 548 return this; 549 } 550 551 public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 552 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 553 return this; 554 } 555 556 public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 557 this.pauseNs = unit.toNanos(pause); 558 return this; 559 } 560 561 public ServerRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) { 562 this.pauseForCQTBENs = unit.toNanos(pause); 563 return this; 564 } 565 566 public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 567 this.maxAttempts = maxAttempts; 568 return this; 569 } 570 571 public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 572 this.startLogErrorsCnt = startLogErrorsCnt; 573 return this; 574 } 575 576 public ServerRequestCallerBuilder<T> serverName(ServerName serverName) { 577 this.serverName = serverName; 578 return this; 579 } 580 581 public AsyncServerRequestRpcRetryingCaller<T> build() { 582 return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, pauseForCQTBENs, 583 maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 584 checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); 585 } 586 587 public CompletableFuture<T> call() { 588 return build().call(); 589 } 590 } 591 592 public <T> ServerRequestCallerBuilder<T> serverRequest() { 593 return new ServerRequestCallerBuilder<>(); 594 } 595}