001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 024 025import java.util.Collections; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.HRegionLocation; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 034import org.apache.hadoop.hbase.ipc.HBaseRpcController; 035import org.apache.yetus.audience.InterfaceAudience; 036 037import org.apache.hbase.thirdparty.io.netty.util.Timer; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 041 042/** 043 * Factory to create an AsyncRpcRetryCaller. 044 * @since 2.0.0 045 */ 046@InterfaceAudience.Private 047class AsyncRpcRetryingCallerFactory { 048 049 private final AsyncConnectionImpl conn; 050 051 private final Timer retryTimer; 052 053 public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) { 054 this.conn = conn; 055 this.retryTimer = retryTimer; 056 } 057 058 private abstract class BuilderBase { 059 060 protected long pauseNs = conn.connConf.getPauseNs(); 061 062 protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded(); 063 064 protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); 065 066 protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt(); 067 } 068 069 public class SingleRequestCallerBuilder<T> extends BuilderBase { 070 071 private TableName tableName; 072 073 private byte[] row; 074 075 private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable; 076 077 private long operationTimeoutNs = -1L; 078 079 private long rpcTimeoutNs = -1L; 080 081 private RegionLocateType locateType = RegionLocateType.CURRENT; 082 083 private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; 084 085 private int priority = PRIORITY_UNSET; 086 087 private Map<String, byte[]> requestAttributes = Collections.emptyMap(); 088 089 public SingleRequestCallerBuilder<T> table(TableName tableName) { 090 this.tableName = tableName; 091 return this; 092 } 093 094 public SingleRequestCallerBuilder<T> row(byte[] row) { 095 this.row = row; 096 return this; 097 } 098 099 public SingleRequestCallerBuilder<T> 100 action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) { 101 this.callable = callable; 102 return this; 103 } 104 105 public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 106 this.operationTimeoutNs = unit.toNanos(operationTimeout); 107 return this; 108 } 109 110 public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 111 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 112 return this; 113 } 114 115 public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) { 116 this.locateType = locateType; 117 return this; 118 } 119 120 public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 121 this.pauseNs = unit.toNanos(pause); 122 return this; 123 } 124 125 public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 126 this.pauseNsForServerOverloaded = unit.toNanos(pause); 127 return this; 128 } 129 130 public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 131 this.maxAttempts = maxAttempts; 132 return this; 133 } 134 135 public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 136 this.startLogErrorsCnt = startLogErrorsCnt; 137 return this; 138 } 139 140 public SingleRequestCallerBuilder<T> replicaId(int replicaId) { 141 this.replicaId = replicaId; 142 return this; 143 } 144 145 public SingleRequestCallerBuilder<T> priority(int priority) { 146 this.priority = priority; 147 return this; 148 } 149 150 public SingleRequestCallerBuilder<T> 151 setRequestAttributes(Map<String, byte[]> requestAttributes) { 152 this.requestAttributes = requestAttributes; 153 return this; 154 } 155 156 private void preCheck() { 157 checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); 158 checkNotNull(tableName, "tableName is null"); 159 checkNotNull(row, "row is null"); 160 checkNotNull(locateType, "locateType is null"); 161 checkNotNull(callable, "action is null"); 162 } 163 164 public AsyncSingleRequestRpcRetryingCaller<T> build() { 165 preCheck(); 166 return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, 167 locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, 168 operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); 169 } 170 171 /** 172 * Shortcut for {@code build().call()} 173 */ 174 public CompletableFuture<T> call() { 175 return build().call(); 176 } 177 } 178 179 /** 180 * Create retry caller for single action, such as get, put, delete, etc. 181 */ 182 public <T> SingleRequestCallerBuilder<T> single() { 183 return new SingleRequestCallerBuilder<>(); 184 } 185 186 public class ScanSingleRegionCallerBuilder extends BuilderBase { 187 188 private Long scannerId = null; 189 190 private Scan scan; 191 192 private ScanMetrics scanMetrics; 193 194 private ScanResultCache resultCache; 195 196 private AdvancedScanResultConsumer consumer; 197 198 private ClientService.Interface stub; 199 200 private HRegionLocation loc; 201 202 private boolean isRegionServerRemote; 203 204 private long scannerLeaseTimeoutPeriodNs; 205 206 private long scanTimeoutNs; 207 208 private long rpcTimeoutNs; 209 210 private long lastNextCallNanos = System.nanoTime(); 211 212 private int priority = PRIORITY_UNSET; 213 214 private Map<String, byte[]> requestAttributes = Collections.emptyMap(); 215 216 public ScanSingleRegionCallerBuilder id(long scannerId) { 217 this.scannerId = scannerId; 218 return this; 219 } 220 221 public ScanSingleRegionCallerBuilder setScan(Scan scan) { 222 this.scan = scan; 223 this.priority = scan.getPriority(); 224 return this; 225 } 226 227 public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) { 228 this.scanMetrics = scanMetrics; 229 return this; 230 } 231 232 public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) { 233 this.isRegionServerRemote = isRegionServerRemote; 234 return this; 235 } 236 237 public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) { 238 this.resultCache = resultCache; 239 return this; 240 } 241 242 public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) { 243 this.consumer = consumer; 244 return this; 245 } 246 247 public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) { 248 this.stub = stub; 249 return this; 250 } 251 252 public ScanSingleRegionCallerBuilder location(HRegionLocation loc) { 253 this.loc = loc; 254 return this; 255 } 256 257 public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod, 258 TimeUnit unit) { 259 this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod); 260 return this; 261 } 262 263 public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { 264 this.scanTimeoutNs = unit.toNanos(scanTimeout); 265 return this; 266 } 267 268 public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 269 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 270 return this; 271 } 272 273 public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { 274 this.pauseNs = unit.toNanos(pause); 275 return this; 276 } 277 278 public ScanSingleRegionCallerBuilder lastNextCallNanos(long nanos) { 279 this.lastNextCallNanos = nanos; 280 return this; 281 } 282 283 public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { 284 this.pauseNsForServerOverloaded = unit.toNanos(pause); 285 return this; 286 } 287 288 public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) { 289 this.maxAttempts = maxAttempts; 290 return this; 291 } 292 293 public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 294 this.startLogErrorsCnt = startLogErrorsCnt; 295 return this; 296 } 297 298 public ScanSingleRegionCallerBuilder 299 setRequestAttributes(Map<String, byte[]> requestAttributes) { 300 this.requestAttributes = requestAttributes; 301 return this; 302 } 303 304 private void preCheck() { 305 checkArgument(scannerId != null, "invalid scannerId %d", scannerId); 306 checkNotNull(scan, "scan is null"); 307 checkNotNull(resultCache, "resultCache is null"); 308 checkNotNull(consumer, "consumer is null"); 309 checkNotNull(stub, "stub is null"); 310 checkNotNull(loc, "location is null"); 311 } 312 313 public AsyncScanSingleRegionRpcRetryingCaller build() { 314 preCheck(); 315 return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, 316 scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, 317 scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, 318 scanTimeoutNs, rpcTimeoutNs, lastNextCallNanos, startLogErrorsCnt, requestAttributes); 319 } 320 321 /** 322 * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}. 323 */ 324 public CompletableFuture<Boolean> start(HBaseRpcController controller, 325 ScanResponse respWhenOpen) { 326 return build().start(controller, respWhenOpen); 327 } 328 } 329 330 /** 331 * Create retry caller for scanning a region. 332 */ 333 public ScanSingleRegionCallerBuilder scanSingleRegion() { 334 return new ScanSingleRegionCallerBuilder(); 335 } 336 337 public class BatchCallerBuilder extends BuilderBase { 338 339 private TableName tableName; 340 341 private List<? extends Row> actions; 342 343 private long operationTimeoutNs = -1L; 344 345 private long rpcTimeoutNs = -1L; 346 347 private Map<String, byte[]> requestAttributes = Collections.emptyMap(); 348 349 public BatchCallerBuilder table(TableName tableName) { 350 this.tableName = tableName; 351 return this; 352 } 353 354 public BatchCallerBuilder actions(List<? extends Row> actions) { 355 this.actions = actions; 356 return this; 357 } 358 359 public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { 360 this.operationTimeoutNs = unit.toNanos(operationTimeout); 361 return this; 362 } 363 364 public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 365 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 366 return this; 367 } 368 369 public BatchCallerBuilder pause(long pause, TimeUnit unit) { 370 this.pauseNs = unit.toNanos(pause); 371 return this; 372 } 373 374 public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { 375 this.pauseNsForServerOverloaded = unit.toNanos(pause); 376 return this; 377 } 378 379 public BatchCallerBuilder maxAttempts(int maxAttempts) { 380 this.maxAttempts = maxAttempts; 381 return this; 382 } 383 384 public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 385 this.startLogErrorsCnt = startLogErrorsCnt; 386 return this; 387 } 388 389 public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) { 390 this.requestAttributes = requestAttributes; 391 return this; 392 } 393 394 public <T> AsyncBatchRpcRetryingCaller<T> build() { 395 return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, 396 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 397 startLogErrorsCnt, requestAttributes); 398 } 399 400 public <T> List<CompletableFuture<T>> call() { 401 return this.<T> build().call(); 402 } 403 } 404 405 public BatchCallerBuilder batch() { 406 return new BatchCallerBuilder(); 407 } 408 409 public class MasterRequestCallerBuilder<T> extends BuilderBase { 410 private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable; 411 412 private long operationTimeoutNs = -1L; 413 414 private long rpcTimeoutNs = -1L; 415 416 private int priority = PRIORITY_UNSET; 417 418 private TableName tableName; 419 420 public MasterRequestCallerBuilder<T> 421 action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) { 422 this.callable = callable; 423 return this; 424 } 425 426 public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 427 this.operationTimeoutNs = unit.toNanos(operationTimeout); 428 return this; 429 } 430 431 public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 432 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 433 return this; 434 } 435 436 public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 437 this.pauseNs = unit.toNanos(pause); 438 return this; 439 } 440 441 public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 442 this.pauseNsForServerOverloaded = unit.toNanos(pause); 443 return this; 444 } 445 446 public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 447 this.maxAttempts = maxAttempts; 448 return this; 449 } 450 451 public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 452 this.startLogErrorsCnt = startLogErrorsCnt; 453 return this; 454 } 455 456 public MasterRequestCallerBuilder<T> tableName(TableName tableName) { 457 this.tableName = tableName; 458 return this; 459 } 460 461 public MasterRequestCallerBuilder<T> priority(int priority) { 462 this.priority = priority; 463 return this; 464 } 465 466 private void preCheck() { 467 checkNotNull(callable, "action is null"); 468 } 469 470 public AsyncMasterRequestRpcRetryingCaller<T> build() { 471 preCheck(); 472 return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, tableName, 473 priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, 474 rpcTimeoutNs, startLogErrorsCnt); 475 } 476 477 /** 478 * Shortcut for {@code build().call()} 479 */ 480 public CompletableFuture<T> call() { 481 return build().call(); 482 } 483 } 484 485 public <T> MasterRequestCallerBuilder<T> masterRequest() { 486 return new MasterRequestCallerBuilder<>(); 487 } 488 489 public class AdminRequestCallerBuilder<T> extends BuilderBase { 490 // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc. 491 492 private AsyncAdminRequestRetryingCaller.Callable<T> callable; 493 494 private long operationTimeoutNs = -1L; 495 496 private long rpcTimeoutNs = -1L; 497 498 private ServerName serverName; 499 500 private int priority; 501 502 public AdminRequestCallerBuilder<T> 503 action(AsyncAdminRequestRetryingCaller.Callable<T> callable) { 504 this.callable = callable; 505 return this; 506 } 507 508 public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 509 this.operationTimeoutNs = unit.toNanos(operationTimeout); 510 return this; 511 } 512 513 public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 514 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 515 return this; 516 } 517 518 public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 519 this.pauseNs = unit.toNanos(pause); 520 return this; 521 } 522 523 public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 524 this.pauseNsForServerOverloaded = unit.toNanos(pause); 525 return this; 526 } 527 528 public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 529 this.maxAttempts = maxAttempts; 530 return this; 531 } 532 533 public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 534 this.startLogErrorsCnt = startLogErrorsCnt; 535 return this; 536 } 537 538 public AdminRequestCallerBuilder<T> serverName(ServerName serverName) { 539 this.serverName = serverName; 540 return this; 541 } 542 543 public AdminRequestCallerBuilder<T> priority(int priority) { 544 this.priority = priority; 545 return this; 546 } 547 548 public AsyncAdminRequestRetryingCaller<T> build() { 549 return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs, 550 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 551 startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), 552 checkNotNull(callable, "action is null")); 553 } 554 555 public CompletableFuture<T> call() { 556 return build().call(); 557 } 558 } 559 560 public <T> AdminRequestCallerBuilder<T> adminRequest() { 561 return new AdminRequestCallerBuilder<>(); 562 } 563 564 public class ServerRequestCallerBuilder<T> extends BuilderBase { 565 566 private AsyncServerRequestRpcRetryingCaller.Callable<T> callable; 567 568 private long operationTimeoutNs = -1L; 569 570 private long rpcTimeoutNs = -1L; 571 572 private ServerName serverName; 573 574 public ServerRequestCallerBuilder<T> 575 action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) { 576 this.callable = callable; 577 return this; 578 } 579 580 public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 581 this.operationTimeoutNs = unit.toNanos(operationTimeout); 582 return this; 583 } 584 585 public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 586 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 587 return this; 588 } 589 590 public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 591 this.pauseNs = unit.toNanos(pause); 592 return this; 593 } 594 595 public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 596 this.pauseNsForServerOverloaded = unit.toNanos(pause); 597 return this; 598 } 599 600 public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 601 this.maxAttempts = maxAttempts; 602 return this; 603 } 604 605 public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 606 this.startLogErrorsCnt = startLogErrorsCnt; 607 return this; 608 } 609 610 public ServerRequestCallerBuilder<T> serverName(ServerName serverName) { 611 this.serverName = serverName; 612 return this; 613 } 614 615 public AsyncServerRequestRpcRetryingCaller<T> build() { 616 return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, 617 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 618 startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), 619 checkNotNull(callable, "action is null")); 620 } 621 622 public CompletableFuture<T> call() { 623 return build().call(); 624 } 625 } 626 627 public <T> ServerRequestCallerBuilder<T> serverRequest() { 628 return new ServerRequestCallerBuilder<>(); 629 } 630}