001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 024import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 025 026import java.util.List; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.TimeUnit; 029import org.apache.hadoop.hbase.HRegionLocation; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 033import org.apache.hadoop.hbase.ipc.HBaseRpcController; 034import org.apache.yetus.audience.InterfaceAudience; 035 036import org.apache.hbase.thirdparty.io.netty.util.Timer; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 040 041/** 042 * Factory to create an AsyncRpcRetryCaller. 043 * @since 2.0.0 044 */ 045@InterfaceAudience.Private 046class AsyncRpcRetryingCallerFactory { 047 048 private final AsyncConnectionImpl conn; 049 050 private final Timer retryTimer; 051 052 public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) { 053 this.conn = conn; 054 this.retryTimer = retryTimer; 055 } 056 057 private abstract class BuilderBase { 058 059 protected long pauseNs = conn.connConf.getPauseNs(); 060 061 protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded(); 062 063 protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); 064 065 protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt(); 066 } 067 068 public class SingleRequestCallerBuilder<T> extends BuilderBase { 069 070 private TableName tableName; 071 072 private byte[] row; 073 074 private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable; 075 076 private long operationTimeoutNs = -1L; 077 078 private long rpcTimeoutNs = -1L; 079 080 private RegionLocateType locateType = RegionLocateType.CURRENT; 081 082 private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; 083 084 private int priority = PRIORITY_UNSET; 085 086 public SingleRequestCallerBuilder<T> table(TableName tableName) { 087 this.tableName = tableName; 088 return this; 089 } 090 091 public SingleRequestCallerBuilder<T> row(byte[] row) { 092 this.row = row; 093 return this; 094 } 095 096 public SingleRequestCallerBuilder<T> 097 action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) { 098 this.callable = callable; 099 return this; 100 } 101 102 public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 103 this.operationTimeoutNs = unit.toNanos(operationTimeout); 104 return this; 105 } 106 107 public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 108 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 109 return this; 110 } 111 112 public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) { 113 this.locateType = locateType; 114 return this; 115 } 116 117 public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 118 this.pauseNs = unit.toNanos(pause); 119 return this; 120 } 121 122 public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 123 this.pauseNsForServerOverloaded = unit.toNanos(pause); 124 return this; 125 } 126 127 public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 128 this.maxAttempts = maxAttempts; 129 return this; 130 } 131 132 public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 133 this.startLogErrorsCnt = startLogErrorsCnt; 134 return this; 135 } 136 137 public SingleRequestCallerBuilder<T> replicaId(int replicaId) { 138 this.replicaId = replicaId; 139 return this; 140 } 141 142 public SingleRequestCallerBuilder<T> priority(int priority) { 143 this.priority = priority; 144 return this; 145 } 146 147 private void preCheck() { 148 checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); 149 checkNotNull(tableName, "tableName is null"); 150 checkNotNull(row, "row is null"); 151 checkNotNull(locateType, "locateType is null"); 152 checkNotNull(callable, "action is null"); 153 this.priority = calcPriority(priority, tableName); 154 } 155 156 public AsyncSingleRequestRpcRetryingCaller<T> build() { 157 preCheck(); 158 return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, 159 locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, 160 operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); 161 } 162 163 /** 164 * Shortcut for {@code build().call()} 165 */ 166 public CompletableFuture<T> call() { 167 return build().call(); 168 } 169 } 170 171 /** 172 * Create retry caller for single action, such as get, put, delete, etc. 173 */ 174 public <T> SingleRequestCallerBuilder<T> single() { 175 return new SingleRequestCallerBuilder<>(); 176 } 177 178 public class ScanSingleRegionCallerBuilder extends BuilderBase { 179 180 private Long scannerId = null; 181 182 private Scan scan; 183 184 private ScanMetrics scanMetrics; 185 186 private ScanResultCache resultCache; 187 188 private AdvancedScanResultConsumer consumer; 189 190 private ClientService.Interface stub; 191 192 private HRegionLocation loc; 193 194 private boolean isRegionServerRemote; 195 196 private long scannerLeaseTimeoutPeriodNs; 197 198 private long scanTimeoutNs; 199 200 private long rpcTimeoutNs; 201 202 private int priority = PRIORITY_UNSET; 203 204 public ScanSingleRegionCallerBuilder id(long scannerId) { 205 this.scannerId = scannerId; 206 return this; 207 } 208 209 public ScanSingleRegionCallerBuilder setScan(Scan scan) { 210 this.scan = scan; 211 this.priority = scan.getPriority(); 212 return this; 213 } 214 215 public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) { 216 this.scanMetrics = scanMetrics; 217 return this; 218 } 219 220 public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) { 221 this.isRegionServerRemote = isRegionServerRemote; 222 return this; 223 } 224 225 public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) { 226 this.resultCache = resultCache; 227 return this; 228 } 229 230 public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) { 231 this.consumer = consumer; 232 return this; 233 } 234 235 public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) { 236 this.stub = stub; 237 return this; 238 } 239 240 public ScanSingleRegionCallerBuilder location(HRegionLocation loc) { 241 this.loc = loc; 242 return this; 243 } 244 245 public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod, 246 TimeUnit unit) { 247 this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod); 248 return this; 249 } 250 251 public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { 252 this.scanTimeoutNs = unit.toNanos(scanTimeout); 253 return this; 254 } 255 256 public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 257 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 258 return this; 259 } 260 261 public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { 262 this.pauseNs = unit.toNanos(pause); 263 return this; 264 } 265 266 public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { 267 this.pauseNsForServerOverloaded = unit.toNanos(pause); 268 return this; 269 } 270 271 public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) { 272 this.maxAttempts = maxAttempts; 273 return this; 274 } 275 276 public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 277 this.startLogErrorsCnt = startLogErrorsCnt; 278 return this; 279 } 280 281 private void preCheck() { 282 checkArgument(scannerId != null, "invalid scannerId %d", scannerId); 283 checkNotNull(scan, "scan is null"); 284 checkNotNull(resultCache, "resultCache is null"); 285 checkNotNull(consumer, "consumer is null"); 286 checkNotNull(stub, "stub is null"); 287 checkNotNull(loc, "location is null"); 288 this.priority = calcPriority(priority, loc.getRegion().getTable()); 289 } 290 291 public AsyncScanSingleRegionRpcRetryingCaller build() { 292 preCheck(); 293 return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, 294 scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, 295 scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, 296 scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); 297 } 298 299 /** 300 * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}. 301 */ 302 public CompletableFuture<Boolean> start(HBaseRpcController controller, 303 ScanResponse respWhenOpen) { 304 return build().start(controller, respWhenOpen); 305 } 306 } 307 308 /** 309 * Create retry caller for scanning a region. 310 */ 311 public ScanSingleRegionCallerBuilder scanSingleRegion() { 312 return new ScanSingleRegionCallerBuilder(); 313 } 314 315 public class BatchCallerBuilder extends BuilderBase { 316 317 private TableName tableName; 318 319 private List<? extends Row> actions; 320 321 private long operationTimeoutNs = -1L; 322 323 private long rpcTimeoutNs = -1L; 324 325 public BatchCallerBuilder table(TableName tableName) { 326 this.tableName = tableName; 327 return this; 328 } 329 330 public BatchCallerBuilder actions(List<? extends Row> actions) { 331 this.actions = actions; 332 return this; 333 } 334 335 public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { 336 this.operationTimeoutNs = unit.toNanos(operationTimeout); 337 return this; 338 } 339 340 public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 341 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 342 return this; 343 } 344 345 public BatchCallerBuilder pause(long pause, TimeUnit unit) { 346 this.pauseNs = unit.toNanos(pause); 347 return this; 348 } 349 350 public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { 351 this.pauseNsForServerOverloaded = unit.toNanos(pause); 352 return this; 353 } 354 355 public BatchCallerBuilder maxAttempts(int maxAttempts) { 356 this.maxAttempts = maxAttempts; 357 return this; 358 } 359 360 public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 361 this.startLogErrorsCnt = startLogErrorsCnt; 362 return this; 363 } 364 365 public <T> AsyncBatchRpcRetryingCaller<T> build() { 366 return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, 367 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 368 startLogErrorsCnt); 369 } 370 371 public <T> List<CompletableFuture<T>> call() { 372 return this.<T> build().call(); 373 } 374 } 375 376 public BatchCallerBuilder batch() { 377 return new BatchCallerBuilder(); 378 } 379 380 public class MasterRequestCallerBuilder<T> extends BuilderBase { 381 private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable; 382 383 private long operationTimeoutNs = -1L; 384 385 private long rpcTimeoutNs = -1L; 386 387 private int priority = PRIORITY_UNSET; 388 389 public MasterRequestCallerBuilder<T> 390 action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) { 391 this.callable = callable; 392 return this; 393 } 394 395 public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 396 this.operationTimeoutNs = unit.toNanos(operationTimeout); 397 return this; 398 } 399 400 public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 401 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 402 return this; 403 } 404 405 public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 406 this.pauseNs = unit.toNanos(pause); 407 return this; 408 } 409 410 public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 411 this.pauseNsForServerOverloaded = unit.toNanos(pause); 412 return this; 413 } 414 415 public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 416 this.maxAttempts = maxAttempts; 417 return this; 418 } 419 420 public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 421 this.startLogErrorsCnt = startLogErrorsCnt; 422 return this; 423 } 424 425 public MasterRequestCallerBuilder<T> priority(TableName tableName) { 426 this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName)); 427 return this; 428 } 429 430 public MasterRequestCallerBuilder<T> priority(int priority) { 431 this.priority = Math.max(this.priority, priority); 432 return this; 433 } 434 435 private void preCheck() { 436 checkNotNull(callable, "action is null"); 437 } 438 439 public AsyncMasterRequestRpcRetryingCaller<T> build() { 440 preCheck(); 441 return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority, 442 pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 443 startLogErrorsCnt); 444 } 445 446 /** 447 * Shortcut for {@code build().call()} 448 */ 449 public CompletableFuture<T> call() { 450 return build().call(); 451 } 452 } 453 454 public <T> MasterRequestCallerBuilder<T> masterRequest() { 455 return new MasterRequestCallerBuilder<>(); 456 } 457 458 public class AdminRequestCallerBuilder<T> extends BuilderBase { 459 // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc. 460 461 private AsyncAdminRequestRetryingCaller.Callable<T> callable; 462 463 private long operationTimeoutNs = -1L; 464 465 private long rpcTimeoutNs = -1L; 466 467 private ServerName serverName; 468 469 private int priority; 470 471 public AdminRequestCallerBuilder<T> 472 action(AsyncAdminRequestRetryingCaller.Callable<T> callable) { 473 this.callable = callable; 474 return this; 475 } 476 477 public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 478 this.operationTimeoutNs = unit.toNanos(operationTimeout); 479 return this; 480 } 481 482 public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 483 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 484 return this; 485 } 486 487 public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 488 this.pauseNs = unit.toNanos(pause); 489 return this; 490 } 491 492 public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 493 this.pauseNsForServerOverloaded = unit.toNanos(pause); 494 return this; 495 } 496 497 public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 498 this.maxAttempts = maxAttempts; 499 return this; 500 } 501 502 public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 503 this.startLogErrorsCnt = startLogErrorsCnt; 504 return this; 505 } 506 507 public AdminRequestCallerBuilder<T> serverName(ServerName serverName) { 508 this.serverName = serverName; 509 return this; 510 } 511 512 public AdminRequestCallerBuilder<T> priority(int priority) { 513 this.priority = priority; 514 return this; 515 } 516 517 public AsyncAdminRequestRetryingCaller<T> build() { 518 return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs, 519 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 520 startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), 521 checkNotNull(callable, "action is null")); 522 } 523 524 public CompletableFuture<T> call() { 525 return build().call(); 526 } 527 } 528 529 public <T> AdminRequestCallerBuilder<T> adminRequest() { 530 return new AdminRequestCallerBuilder<>(); 531 } 532 533 public class ServerRequestCallerBuilder<T> extends BuilderBase { 534 535 private AsyncServerRequestRpcRetryingCaller.Callable<T> callable; 536 537 private long operationTimeoutNs = -1L; 538 539 private long rpcTimeoutNs = -1L; 540 541 private ServerName serverName; 542 543 public ServerRequestCallerBuilder<T> 544 action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) { 545 this.callable = callable; 546 return this; 547 } 548 549 public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 550 this.operationTimeoutNs = unit.toNanos(operationTimeout); 551 return this; 552 } 553 554 public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 555 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 556 return this; 557 } 558 559 public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 560 this.pauseNs = unit.toNanos(pause); 561 return this; 562 } 563 564 public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 565 this.pauseNsForServerOverloaded = unit.toNanos(pause); 566 return this; 567 } 568 569 public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 570 this.maxAttempts = maxAttempts; 571 return this; 572 } 573 574 public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 575 this.startLogErrorsCnt = startLogErrorsCnt; 576 return this; 577 } 578 579 public ServerRequestCallerBuilder<T> serverName(ServerName serverName) { 580 this.serverName = serverName; 581 return this; 582 } 583 584 public AsyncServerRequestRpcRetryingCaller<T> build() { 585 return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, 586 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 587 startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), 588 checkNotNull(callable, "action is null")); 589 } 590 591 public CompletableFuture<T> call() { 592 return build().call(); 593 } 594 } 595 596 public <T> ServerRequestCallerBuilder<T> serverRequest() { 597 return new ServerRequestCallerBuilder<>(); 598 } 599}