001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 024import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 025 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.hbase.HRegionLocation; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 035import org.apache.hadoop.hbase.ipc.HBaseRpcController; 036import org.apache.yetus.audience.InterfaceAudience; 037 038import org.apache.hbase.thirdparty.io.netty.util.Timer; 039 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 042 043/** 044 * Factory to create an AsyncRpcRetryCaller. 045 * @since 2.0.0 046 */ 047@InterfaceAudience.Private 048class AsyncRpcRetryingCallerFactory { 049 050 private final AsyncConnectionImpl conn; 051 052 private final Timer retryTimer; 053 054 public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) { 055 this.conn = conn; 056 this.retryTimer = retryTimer; 057 } 058 059 private abstract class BuilderBase { 060 061 protected long pauseNs = conn.connConf.getPauseNs(); 062 063 protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded(); 064 065 protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); 066 067 protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt(); 068 } 069 070 public class SingleRequestCallerBuilder<T> extends BuilderBase { 071 072 private TableName tableName; 073 074 private byte[] row; 075 076 private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable; 077 078 private long operationTimeoutNs = -1L; 079 080 private long rpcTimeoutNs = -1L; 081 082 private RegionLocateType locateType = RegionLocateType.CURRENT; 083 084 private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; 085 086 private int priority = PRIORITY_UNSET; 087 088 private Map<String, byte[]> requestAttributes = Collections.emptyMap(); 089 090 public SingleRequestCallerBuilder<T> table(TableName tableName) { 091 this.tableName = tableName; 092 return this; 093 } 094 095 public SingleRequestCallerBuilder<T> row(byte[] row) { 096 this.row = row; 097 return this; 098 } 099 100 public SingleRequestCallerBuilder<T> 101 action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) { 102 this.callable = callable; 103 return this; 104 } 105 106 public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 107 this.operationTimeoutNs = unit.toNanos(operationTimeout); 108 return this; 109 } 110 111 public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 112 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 113 return this; 114 } 115 116 public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) { 117 this.locateType = locateType; 118 return this; 119 } 120 121 public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 122 this.pauseNs = unit.toNanos(pause); 123 return this; 124 } 125 126 public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 127 this.pauseNsForServerOverloaded = unit.toNanos(pause); 128 return this; 129 } 130 131 public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 132 this.maxAttempts = maxAttempts; 133 return this; 134 } 135 136 public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 137 this.startLogErrorsCnt = startLogErrorsCnt; 138 return this; 139 } 140 141 public SingleRequestCallerBuilder<T> replicaId(int replicaId) { 142 this.replicaId = replicaId; 143 return this; 144 } 145 146 public SingleRequestCallerBuilder<T> priority(int priority) { 147 this.priority = priority; 148 return this; 149 } 150 151 public SingleRequestCallerBuilder<T> 152 setRequestAttributes(Map<String, byte[]> requestAttributes) { 153 this.requestAttributes = requestAttributes; 154 return this; 155 } 156 157 private void preCheck() { 158 checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); 159 checkNotNull(tableName, "tableName is null"); 160 checkNotNull(row, "row is null"); 161 checkNotNull(locateType, "locateType is null"); 162 checkNotNull(callable, "action is null"); 163 this.priority = calcPriority(priority, tableName); 164 } 165 166 public AsyncSingleRequestRpcRetryingCaller<T> build() { 167 preCheck(); 168 return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, 169 locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, 170 operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); 171 } 172 173 /** 174 * Shortcut for {@code build().call()} 175 */ 176 public CompletableFuture<T> call() { 177 return build().call(); 178 } 179 } 180 181 /** 182 * Create retry caller for single action, such as get, put, delete, etc. 183 */ 184 public <T> SingleRequestCallerBuilder<T> single() { 185 return new SingleRequestCallerBuilder<>(); 186 } 187 188 public class ScanSingleRegionCallerBuilder extends BuilderBase { 189 190 private Long scannerId = null; 191 192 private Scan scan; 193 194 private ScanMetrics scanMetrics; 195 196 private ScanResultCache resultCache; 197 198 private AdvancedScanResultConsumer consumer; 199 200 private ClientService.Interface stub; 201 202 private HRegionLocation loc; 203 204 private boolean isRegionServerRemote; 205 206 private long scannerLeaseTimeoutPeriodNs; 207 208 private long scanTimeoutNs; 209 210 private long rpcTimeoutNs; 211 212 private int priority = PRIORITY_UNSET; 213 214 private Map<String, byte[]> requestAttributes = Collections.emptyMap(); 215 216 public ScanSingleRegionCallerBuilder id(long scannerId) { 217 this.scannerId = scannerId; 218 return this; 219 } 220 221 public ScanSingleRegionCallerBuilder setScan(Scan scan) { 222 this.scan = scan; 223 this.priority = scan.getPriority(); 224 return this; 225 } 226 227 public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) { 228 this.scanMetrics = scanMetrics; 229 return this; 230 } 231 232 public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) { 233 this.isRegionServerRemote = isRegionServerRemote; 234 return this; 235 } 236 237 public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) { 238 this.resultCache = resultCache; 239 return this; 240 } 241 242 public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) { 243 this.consumer = consumer; 244 return this; 245 } 246 247 public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) { 248 this.stub = stub; 249 return this; 250 } 251 252 public ScanSingleRegionCallerBuilder location(HRegionLocation loc) { 253 this.loc = loc; 254 return this; 255 } 256 257 public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod, 258 TimeUnit unit) { 259 this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod); 260 return this; 261 } 262 263 public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { 264 this.scanTimeoutNs = unit.toNanos(scanTimeout); 265 return this; 266 } 267 268 public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 269 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 270 return this; 271 } 272 273 public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { 274 this.pauseNs = unit.toNanos(pause); 275 return this; 276 } 277 278 public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { 279 this.pauseNsForServerOverloaded = unit.toNanos(pause); 280 return this; 281 } 282 283 public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) { 284 this.maxAttempts = maxAttempts; 285 return this; 286 } 287 288 public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 289 this.startLogErrorsCnt = startLogErrorsCnt; 290 return this; 291 } 292 293 public ScanSingleRegionCallerBuilder 294 setRequestAttributes(Map<String, byte[]> requestAttributes) { 295 this.requestAttributes = requestAttributes; 296 return this; 297 } 298 299 private void preCheck() { 300 checkArgument(scannerId != null, "invalid scannerId %d", scannerId); 301 checkNotNull(scan, "scan is null"); 302 checkNotNull(resultCache, "resultCache is null"); 303 checkNotNull(consumer, "consumer is null"); 304 checkNotNull(stub, "stub is null"); 305 checkNotNull(loc, "location is null"); 306 this.priority = calcPriority(priority, loc.getRegion().getTable()); 307 } 308 309 public AsyncScanSingleRegionRpcRetryingCaller build() { 310 preCheck(); 311 return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, 312 scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, 313 scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, 314 scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); 315 } 316 317 /** 318 * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}. 319 */ 320 public CompletableFuture<Boolean> start(HBaseRpcController controller, 321 ScanResponse respWhenOpen) { 322 return build().start(controller, respWhenOpen); 323 } 324 } 325 326 /** 327 * Create retry caller for scanning a region. 328 */ 329 public ScanSingleRegionCallerBuilder scanSingleRegion() { 330 return new ScanSingleRegionCallerBuilder(); 331 } 332 333 public class BatchCallerBuilder extends BuilderBase { 334 335 private TableName tableName; 336 337 private List<? extends Row> actions; 338 339 private long operationTimeoutNs = -1L; 340 341 private long rpcTimeoutNs = -1L; 342 343 private Map<String, byte[]> requestAttributes = Collections.emptyMap(); 344 345 public BatchCallerBuilder table(TableName tableName) { 346 this.tableName = tableName; 347 return this; 348 } 349 350 public BatchCallerBuilder actions(List<? extends Row> actions) { 351 this.actions = actions; 352 return this; 353 } 354 355 public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { 356 this.operationTimeoutNs = unit.toNanos(operationTimeout); 357 return this; 358 } 359 360 public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { 361 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 362 return this; 363 } 364 365 public BatchCallerBuilder pause(long pause, TimeUnit unit) { 366 this.pauseNs = unit.toNanos(pause); 367 return this; 368 } 369 370 public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { 371 this.pauseNsForServerOverloaded = unit.toNanos(pause); 372 return this; 373 } 374 375 public BatchCallerBuilder maxAttempts(int maxAttempts) { 376 this.maxAttempts = maxAttempts; 377 return this; 378 } 379 380 public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { 381 this.startLogErrorsCnt = startLogErrorsCnt; 382 return this; 383 } 384 385 public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) { 386 this.requestAttributes = requestAttributes; 387 return this; 388 } 389 390 public <T> AsyncBatchRpcRetryingCaller<T> build() { 391 return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, 392 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 393 startLogErrorsCnt, requestAttributes); 394 } 395 396 public <T> List<CompletableFuture<T>> call() { 397 return this.<T> build().call(); 398 } 399 } 400 401 public BatchCallerBuilder batch() { 402 return new BatchCallerBuilder(); 403 } 404 405 public class MasterRequestCallerBuilder<T> extends BuilderBase { 406 private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable; 407 408 private long operationTimeoutNs = -1L; 409 410 private long rpcTimeoutNs = -1L; 411 412 private int priority = PRIORITY_UNSET; 413 414 public MasterRequestCallerBuilder<T> 415 action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) { 416 this.callable = callable; 417 return this; 418 } 419 420 public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 421 this.operationTimeoutNs = unit.toNanos(operationTimeout); 422 return this; 423 } 424 425 public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 426 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 427 return this; 428 } 429 430 public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 431 this.pauseNs = unit.toNanos(pause); 432 return this; 433 } 434 435 public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 436 this.pauseNsForServerOverloaded = unit.toNanos(pause); 437 return this; 438 } 439 440 public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 441 this.maxAttempts = maxAttempts; 442 return this; 443 } 444 445 public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 446 this.startLogErrorsCnt = startLogErrorsCnt; 447 return this; 448 } 449 450 public MasterRequestCallerBuilder<T> priority(TableName tableName) { 451 this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName)); 452 return this; 453 } 454 455 public MasterRequestCallerBuilder<T> priority(int priority) { 456 this.priority = Math.max(this.priority, priority); 457 return this; 458 } 459 460 private void preCheck() { 461 checkNotNull(callable, "action is null"); 462 } 463 464 public AsyncMasterRequestRpcRetryingCaller<T> build() { 465 preCheck(); 466 return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority, 467 pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 468 startLogErrorsCnt); 469 } 470 471 /** 472 * Shortcut for {@code build().call()} 473 */ 474 public CompletableFuture<T> call() { 475 return build().call(); 476 } 477 } 478 479 public <T> MasterRequestCallerBuilder<T> masterRequest() { 480 return new MasterRequestCallerBuilder<>(); 481 } 482 483 public class AdminRequestCallerBuilder<T> extends BuilderBase { 484 // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc. 485 486 private AsyncAdminRequestRetryingCaller.Callable<T> callable; 487 488 private long operationTimeoutNs = -1L; 489 490 private long rpcTimeoutNs = -1L; 491 492 private ServerName serverName; 493 494 private int priority; 495 496 public AdminRequestCallerBuilder<T> 497 action(AsyncAdminRequestRetryingCaller.Callable<T> callable) { 498 this.callable = callable; 499 return this; 500 } 501 502 public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 503 this.operationTimeoutNs = unit.toNanos(operationTimeout); 504 return this; 505 } 506 507 public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 508 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 509 return this; 510 } 511 512 public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 513 this.pauseNs = unit.toNanos(pause); 514 return this; 515 } 516 517 public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 518 this.pauseNsForServerOverloaded = unit.toNanos(pause); 519 return this; 520 } 521 522 public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 523 this.maxAttempts = maxAttempts; 524 return this; 525 } 526 527 public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 528 this.startLogErrorsCnt = startLogErrorsCnt; 529 return this; 530 } 531 532 public AdminRequestCallerBuilder<T> serverName(ServerName serverName) { 533 this.serverName = serverName; 534 return this; 535 } 536 537 public AdminRequestCallerBuilder<T> priority(int priority) { 538 this.priority = priority; 539 return this; 540 } 541 542 public AsyncAdminRequestRetryingCaller<T> build() { 543 return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs, 544 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 545 startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), 546 checkNotNull(callable, "action is null")); 547 } 548 549 public CompletableFuture<T> call() { 550 return build().call(); 551 } 552 } 553 554 public <T> AdminRequestCallerBuilder<T> adminRequest() { 555 return new AdminRequestCallerBuilder<>(); 556 } 557 558 public class ServerRequestCallerBuilder<T> extends BuilderBase { 559 560 private AsyncServerRequestRpcRetryingCaller.Callable<T> callable; 561 562 private long operationTimeoutNs = -1L; 563 564 private long rpcTimeoutNs = -1L; 565 566 private ServerName serverName; 567 568 public ServerRequestCallerBuilder<T> 569 action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) { 570 this.callable = callable; 571 return this; 572 } 573 574 public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) { 575 this.operationTimeoutNs = unit.toNanos(operationTimeout); 576 return this; 577 } 578 579 public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) { 580 this.rpcTimeoutNs = unit.toNanos(rpcTimeout); 581 return this; 582 } 583 584 public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) { 585 this.pauseNs = unit.toNanos(pause); 586 return this; 587 } 588 589 public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) { 590 this.pauseNsForServerOverloaded = unit.toNanos(pause); 591 return this; 592 } 593 594 public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) { 595 this.maxAttempts = maxAttempts; 596 return this; 597 } 598 599 public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) { 600 this.startLogErrorsCnt = startLogErrorsCnt; 601 return this; 602 } 603 604 public ServerRequestCallerBuilder<T> serverName(ServerName serverName) { 605 this.serverName = serverName; 606 return this; 607 } 608 609 public AsyncServerRequestRpcRetryingCaller<T> build() { 610 return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, 611 pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, 612 startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), 613 checkNotNull(callable, "action is null")); 614 } 615 616 public CompletableFuture<T> call() { 617 return build().call(); 618 } 619 } 620 621 public <T> ServerRequestCallerBuilder<T> serverRequest() { 622 return new ServerRequestCallerBuilder<>(); 623 } 624}