/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Factory handing out builders for the asynchronous retrying callers: single request, single
 * region scan, batch, master request, admin request and server request. Every builder is an inner
 * class and therefore shares the connection and retry timer this factory was created with.
 * @since 2.0.0
 */
@InterfaceAudience.Private
class AsyncRpcRetryingCallerFactory {

  private final AsyncConnectionImpl conn;

  private final Timer retryTimer;

  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
    this.conn = conn;
    this.retryTimer = retryTimer;
  }

  /**
   * Retry related settings common to all builders. Each value starts out as whatever the
   * connection configuration reports and may be overridden through the concrete builder.
   */
  private abstract class BuilderBase {

    protected long pauseNs = conn.connConf.getPauseNs();

    protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded();

    // The configuration stores a retry count; the callers are given an attempt count.
    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());

    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
  }

  /**
   * Builder for {@link AsyncSingleRequestRpcRetryingCaller}. Table name, row, locate type and
   * action are mandatory and are verified when {@link #build()} is called.
   */
  public class SingleRequestCallerBuilder<T> extends BuilderBase {

    private TableName tableName;

    private byte[] row;

    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;

    private long operationTimeoutNs = -1L;

    private long rpcTimeoutNs = -1L;

    private RegionLocateType locateType = RegionLocateType.CURRENT;

    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;

    private int priority = PRIORITY_UNSET;

    private Map<String, byte[]> requestAttributes = Collections.emptyMap();

    public SingleRequestCallerBuilder<T> table(TableName tableName) {
      this.tableName = tableName;
      return this;
    }

    public SingleRequestCallerBuilder<T> row(byte[] row) {
      this.row = row;
      return this;
    }

    public SingleRequestCallerBuilder<T>
      action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
      this.callable = callable;
      return this;
    }

    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
      this.operationTimeoutNs = unit.toNanos(operationTimeout);
      return this;
    }

    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
      return this;
    }

    public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
      this.locateType = locateType;
      return this;
    }

    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
      this.pauseNs = unit.toNanos(pause);
      return this;
    }

    public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
      this.pauseNsForServerOverloaded = unit.toNanos(pause);
      return this;
    }

    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
      this.maxAttempts = maxAttempts;
      return this;
    }

    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
      this.startLogErrorsCnt = startLogErrorsCnt;
      return this;
    }

    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
      this.replicaId = replicaId;
      return this;
    }

    public SingleRequestCallerBuilder<T> priority(int priority) {
      this.priority = priority;
      return this;
    }

    public SingleRequestCallerBuilder<T>
      setRequestAttributes(Map<String, byte[]> requestAttributes) {
      this.requestAttributes = requestAttributes;
      return this;
    }

    /**
     * Rejects a negative replica id and any mandatory field that has not been supplied.
     */
    private void preCheck() {
      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
      checkNotNull(tableName, "tableName is null");
      checkNotNull(row, "row is null");
      checkNotNull(locateType, "locateType is null");
      checkNotNull(callable, "action is null");
    }

    /**
     * Validates the collected settings and creates the caller.
     */
    public AsyncSingleRequestRpcRetryingCaller<T> build() {
      preCheck();
      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
        locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
    }

    /**
     * Shortcut for {@code build().call()}
     */
    public CompletableFuture<T> call() {
      return build().call();
    }
  }

  /**
   * Create retry caller for single action, such as get, put, delete, etc.
   */
  public <T> SingleRequestCallerBuilder<T> single() {
    return new SingleRequestCallerBuilder<>();
  }

  /**
   * Builder for {@link AsyncScanSingleRegionRpcRetryingCaller}. Scanner id, scan, result cache,
   * consumer, stub and location are mandatory and are verified when {@link #build()} is called.
   */
  public class ScanSingleRegionCallerBuilder extends BuilderBase {

    // Boxed so that "never set" (null) can be told apart from any real scanner id.
    private Long scannerId = null;

    private Scan scan;

    private ScanMetrics scanMetrics;

    private ScanResultCache resultCache;

    private AdvancedScanResultConsumer consumer;

    private ClientService.Interface stub;

    private HRegionLocation loc;

    private boolean isRegionServerRemote;

    private long scannerLeaseTimeoutPeriodNs;

    private long scanTimeoutNs;

    private long rpcTimeoutNs;

    private int priority = PRIORITY_UNSET;

    private Map<String, byte[]> requestAttributes = Collections.emptyMap();

    public ScanSingleRegionCallerBuilder id(long scannerId) {
      this.scannerId = scannerId;
      return this;
    }

    /**
     * Sets the scan and, as a side effect, takes over the priority configured on that scan.
     */
    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
      this.scan = scan;
      this.priority = scan.getPriority();
      return this;
    }

    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
      this.scanMetrics = scanMetrics;
      return this;
    }

    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
      this.isRegionServerRemote = isRegionServerRemote;
      return this;
    }

    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
      this.resultCache = resultCache;
      return this;
    }

    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
      this.consumer = consumer;
      return this;
    }

    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
      this.stub = stub;
      return this;
    }

    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
      this.loc = loc;
      return this;
    }

    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
      TimeUnit unit) {
      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
      return this;
    }

    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
      this.scanTimeoutNs = unit.toNanos(scanTimeout);
      return this;
    }

    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
      return this;
    }

    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
      this.pauseNs = unit.toNanos(pause);
      return this;
    }

    public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
      this.pauseNsForServerOverloaded = unit.toNanos(pause);
      return this;
    }

    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
      this.maxAttempts = maxAttempts;
      return this;
    }

    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
      this.startLogErrorsCnt = startLogErrorsCnt;
      return this;
    }

    public ScanSingleRegionCallerBuilder
      setRequestAttributes(Map<String, byte[]> requestAttributes) {
      this.requestAttributes = requestAttributes;
      return this;
    }

    /**
     * Rejects a builder whose scanner id or any other mandatory field has not been supplied.
     */
    private void preCheck() {
      // Preconditions message templates only substitute %s; any other specifier is left
      // verbatim in the message, so %s is used here just like in the other preCheck methods.
      checkArgument(scannerId != null, "invalid scannerId %s", scannerId);
      checkNotNull(scan, "scan is null");
      checkNotNull(resultCache, "resultCache is null");
      checkNotNull(consumer, "consumer is null");
      checkNotNull(stub, "stub is null");
      checkNotNull(loc, "location is null");
    }

    /**
     * Validates the collected settings and creates the caller.
     */
    public AsyncScanSingleRegionRpcRetryingCaller build() {
      preCheck();
      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
        scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
        scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
        scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
    }

    /**
     * Shortcut for {@code build().start(HBaseRpcController, ScanResponse)}.
     */
    public CompletableFuture<Boolean> start(HBaseRpcController controller,
      ScanResponse respWhenOpen) {
      return build().start(controller, respWhenOpen);
    }
  }

  /**
   * Create retry caller for scanning a region.
   */
  public ScanSingleRegionCallerBuilder scanSingleRegion() {
    return new ScanSingleRegionCallerBuilder();
  }

  /**
   * Builder for {@link AsyncBatchRpcRetryingCaller}. Unlike the other builders, nothing is
   * validated here before the caller is constructed.
   */
  public class BatchCallerBuilder extends BuilderBase {

    private TableName tableName;

    private List<? extends Row> actions;

    private long operationTimeoutNs = -1L;

    private long rpcTimeoutNs = -1L;

    private Map<String, byte[]> requestAttributes = Collections.emptyMap();

    public BatchCallerBuilder table(TableName tableName) {
      this.tableName = tableName;
      return this;
    }

    public BatchCallerBuilder actions(List<? extends Row> actions) {
      this.actions = actions;
      return this;
    }

    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
      this.operationTimeoutNs = unit.toNanos(operationTimeout);
      return this;
    }

    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
      return this;
    }

    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
      this.pauseNs = unit.toNanos(pause);
      return this;
    }

    public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
      this.pauseNsForServerOverloaded = unit.toNanos(pause);
      return this;
    }

    public BatchCallerBuilder maxAttempts(int maxAttempts) {
      this.maxAttempts = maxAttempts;
      return this;
    }

    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
      this.startLogErrorsCnt = startLogErrorsCnt;
      return this;
    }

    public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
      this.requestAttributes = requestAttributes;
      return this;
    }

    /**
     * Creates the caller from the collected settings.
     */
    public <T> AsyncBatchRpcRetryingCaller<T> build() {
      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
        startLogErrorsCnt, requestAttributes);
    }

    /**
     * Shortcut for {@code build().call()}
     */
    public <T> List<CompletableFuture<T>> call() {
      return this.<T> build().call();
    }
  }

  /**
   * Create retry caller for a batch of actions.
   */
  public BatchCallerBuilder batch() {
    return new BatchCallerBuilder();
  }

  /**
   * Builder for {@link AsyncMasterRequestRpcRetryingCaller}. Only the action is mandatory; it is
   * verified when {@link #build()} is called.
   */
  public class MasterRequestCallerBuilder<T> extends BuilderBase {
    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;

    private long operationTimeoutNs = -1L;

    private long rpcTimeoutNs = -1L;

    private int priority = PRIORITY_UNSET;

    private TableName tableName;

    public MasterRequestCallerBuilder<T>
      action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
      this.callable = callable;
      return this;
    }

    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
      this.operationTimeoutNs = unit.toNanos(operationTimeout);
      return this;
    }

    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
      return this;
    }

    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
      this.pauseNs = unit.toNanos(pause);
      return this;
    }

    public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
      this.pauseNsForServerOverloaded = unit.toNanos(pause);
      return this;
    }

    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
      this.maxAttempts = maxAttempts;
      return this;
    }

    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
      this.startLogErrorsCnt = startLogErrorsCnt;
      return this;
    }

    public MasterRequestCallerBuilder<T> tableName(TableName tableName) {
      this.tableName = tableName;
      return this;
    }

    public MasterRequestCallerBuilder<T> priority(int priority) {
      this.priority = priority;
      return this;
    }

    /**
     * Rejects a builder without an action.
     */
    private void preCheck() {
      checkNotNull(callable, "action is null");
    }

    /**
     * Validates the collected settings and creates the caller.
     */
    public AsyncMasterRequestRpcRetryingCaller<T> build() {
      preCheck();
      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, tableName,
        priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs,
        rpcTimeoutNs, startLogErrorsCnt);
    }

    /**
     * Shortcut for {@code build().call()}
     */
    public CompletableFuture<T> call() {
      return build().call();
    }
  }

  /**
   * Create retry caller for a request sent to the master.
   */
  public <T> MasterRequestCallerBuilder<T> masterRequest() {
    return new MasterRequestCallerBuilder<>();
  }

  /**
   * Builder for {@link AsyncAdminRequestRetryingCaller}. Server name and action are mandatory and
   * are verified inside {@link #build()}.
   */
  public class AdminRequestCallerBuilder<T> extends BuilderBase {
    // TODO: maybe we can reuse AdminRequestCallerBuilder, MasterRequestCallerBuilder etc.

    private AsyncAdminRequestRetryingCaller.Callable<T> callable;

    private long operationTimeoutNs = -1L;

    private long rpcTimeoutNs = -1L;

    private ServerName serverName;

    // Not initialised explicitly, so it starts at 0 instead of PRIORITY_UNSET as in the other
    // builders. NOTE(review): confirm whether this difference is intentional.
    private int priority;

    public AdminRequestCallerBuilder<T>
      action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
      this.callable = callable;
      return this;
    }

    public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
      this.operationTimeoutNs = unit.toNanos(operationTimeout);
      return this;
    }

    public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
      return this;
    }

    public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
      this.pauseNs = unit.toNanos(pause);
      return this;
    }

    public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
      this.pauseNsForServerOverloaded = unit.toNanos(pause);
      return this;
    }

    public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
      this.maxAttempts = maxAttempts;
      return this;
    }

    public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
      this.startLogErrorsCnt = startLogErrorsCnt;
      return this;
    }

    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
      this.serverName = serverName;
      return this;
    }

    public AdminRequestCallerBuilder<T> priority(int priority) {
      this.priority = priority;
      return this;
    }

    /**
     * Creates the caller; the server name and the action are null-checked while the constructor
     * arguments are assembled.
     */
    public AsyncAdminRequestRetryingCaller<T> build() {
      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
        checkNotNull(callable, "action is null"));
    }

    /**
     * Shortcut for {@code build().call()}
     */
    public CompletableFuture<T> call() {
      return build().call();
    }
  }

  /**
   * Create retry caller for an admin request sent to a given server.
   */
  public <T> AdminRequestCallerBuilder<T> adminRequest() {
    return new AdminRequestCallerBuilder<>();
  }

  /**
   * Builder for {@link AsyncServerRequestRpcRetryingCaller}. Server name and action are mandatory
   * and are verified inside {@link #build()}.
   */
  public class ServerRequestCallerBuilder<T> extends BuilderBase {

    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;

    private long operationTimeoutNs = -1L;

    private long rpcTimeoutNs = -1L;

    private ServerName serverName;

    public ServerRequestCallerBuilder<T>
      action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
      this.callable = callable;
      return this;
    }

    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
      this.operationTimeoutNs = unit.toNanos(operationTimeout);
      return this;
    }

    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
      return this;
    }

    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
      this.pauseNs = unit.toNanos(pause);
      return this;
    }

    public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
      this.pauseNsForServerOverloaded = unit.toNanos(pause);
      return this;
    }

    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
      this.maxAttempts = maxAttempts;
      return this;
    }

    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
      this.startLogErrorsCnt = startLogErrorsCnt;
      return this;
    }

    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
      this.serverName = serverName;
      return this;
    }

    /**
     * Creates the caller; the server name and the action are null-checked while the constructor
     * arguments are assembled.
     */
    public AsyncServerRequestRpcRetryingCaller<T> build() {
      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs,
        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
        checkNotNull(callable, "action is null"));
    }

    /**
     * Shortcut for {@code build().call()}
     */
    public CompletableFuture<T> call() {
      return build().call();
    }
  }

  /**
   * Create retry caller for a request sent to a given server.
   */
  public <T> ServerRequestCallerBuilder<T> serverRequest() {
    return new ServerRequestCallerBuilder<>();
  }
}