001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
024
025import java.util.Collections;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
034import org.apache.hadoop.hbase.ipc.HBaseRpcController;
035import org.apache.yetus.audience.InterfaceAudience;
036
037import org.apache.hbase.thirdparty.io.netty.util.Timer;
038
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
041
042/**
043 * Factory to create an AsyncRpcRetryCaller.
044 * @since 2.0.0
045 */
046@InterfaceAudience.Private
047class AsyncRpcRetryingCallerFactory {
048
049  private final AsyncConnectionImpl conn;
050
051  private final Timer retryTimer;
052
053  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
054    this.conn = conn;
055    this.retryTimer = retryTimer;
056  }
057
058  private abstract class BuilderBase {
059
060    protected long pauseNs = conn.connConf.getPauseNs();
061
062    protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded();
063
064    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
065
066    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
067  }
068
069  public class SingleRequestCallerBuilder<T> extends BuilderBase {
070
071    private TableName tableName;
072
073    private byte[] row;
074
075    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
076
077    private long operationTimeoutNs = -1L;
078
079    private long rpcTimeoutNs = -1L;
080
081    private RegionLocateType locateType = RegionLocateType.CURRENT;
082
083    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
084
085    private int priority = PRIORITY_UNSET;
086
087    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
088
089    public SingleRequestCallerBuilder<T> table(TableName tableName) {
090      this.tableName = tableName;
091      return this;
092    }
093
094    public SingleRequestCallerBuilder<T> row(byte[] row) {
095      this.row = row;
096      return this;
097    }
098
099    public SingleRequestCallerBuilder<T>
100      action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
101      this.callable = callable;
102      return this;
103    }
104
105    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
106      this.operationTimeoutNs = unit.toNanos(operationTimeout);
107      return this;
108    }
109
110    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
111      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
112      return this;
113    }
114
115    public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
116      this.locateType = locateType;
117      return this;
118    }
119
120    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
121      this.pauseNs = unit.toNanos(pause);
122      return this;
123    }
124
125    public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
126      this.pauseNsForServerOverloaded = unit.toNanos(pause);
127      return this;
128    }
129
130    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
131      this.maxAttempts = maxAttempts;
132      return this;
133    }
134
135    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
136      this.startLogErrorsCnt = startLogErrorsCnt;
137      return this;
138    }
139
140    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
141      this.replicaId = replicaId;
142      return this;
143    }
144
145    public SingleRequestCallerBuilder<T> priority(int priority) {
146      this.priority = priority;
147      return this;
148    }
149
150    public SingleRequestCallerBuilder<T>
151      setRequestAttributes(Map<String, byte[]> requestAttributes) {
152      this.requestAttributes = requestAttributes;
153      return this;
154    }
155
156    private void preCheck() {
157      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
158      checkNotNull(tableName, "tableName is null");
159      checkNotNull(row, "row is null");
160      checkNotNull(locateType, "locateType is null");
161      checkNotNull(callable, "action is null");
162    }
163
164    public AsyncSingleRequestRpcRetryingCaller<T> build() {
165      preCheck();
166      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
167        locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
168        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
169    }
170
171    /**
172     * Shortcut for {@code build().call()}
173     */
174    public CompletableFuture<T> call() {
175      return build().call();
176    }
177  }
178
179  /**
180   * Create retry caller for single action, such as get, put, delete, etc.
181   */
182  public <T> SingleRequestCallerBuilder<T> single() {
183    return new SingleRequestCallerBuilder<>();
184  }
185
186  public class ScanSingleRegionCallerBuilder extends BuilderBase {
187
188    private Long scannerId = null;
189
190    private Scan scan;
191
192    private ScanMetrics scanMetrics;
193
194    private ScanResultCache resultCache;
195
196    private AdvancedScanResultConsumer consumer;
197
198    private ClientService.Interface stub;
199
200    private HRegionLocation loc;
201
202    private boolean isRegionServerRemote;
203
204    private long scannerLeaseTimeoutPeriodNs;
205
206    private long scanTimeoutNs;
207
208    private long rpcTimeoutNs;
209
210    private long lastNextCallNanos = System.nanoTime();
211
212    private int priority = PRIORITY_UNSET;
213
214    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
215
216    public ScanSingleRegionCallerBuilder id(long scannerId) {
217      this.scannerId = scannerId;
218      return this;
219    }
220
221    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
222      this.scan = scan;
223      this.priority = scan.getPriority();
224      return this;
225    }
226
227    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
228      this.scanMetrics = scanMetrics;
229      return this;
230    }
231
232    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
233      this.isRegionServerRemote = isRegionServerRemote;
234      return this;
235    }
236
237    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
238      this.resultCache = resultCache;
239      return this;
240    }
241
242    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
243      this.consumer = consumer;
244      return this;
245    }
246
247    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
248      this.stub = stub;
249      return this;
250    }
251
252    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
253      this.loc = loc;
254      return this;
255    }
256
257    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
258      TimeUnit unit) {
259      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
260      return this;
261    }
262
263    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
264      this.scanTimeoutNs = unit.toNanos(scanTimeout);
265      return this;
266    }
267
268    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
269      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
270      return this;
271    }
272
273    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
274      this.pauseNs = unit.toNanos(pause);
275      return this;
276    }
277
278    public ScanSingleRegionCallerBuilder lastNextCallNanos(long nanos) {
279      this.lastNextCallNanos = nanos;
280      return this;
281    }
282
283    public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
284      this.pauseNsForServerOverloaded = unit.toNanos(pause);
285      return this;
286    }
287
288    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
289      this.maxAttempts = maxAttempts;
290      return this;
291    }
292
293    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
294      this.startLogErrorsCnt = startLogErrorsCnt;
295      return this;
296    }
297
298    public ScanSingleRegionCallerBuilder
299      setRequestAttributes(Map<String, byte[]> requestAttributes) {
300      this.requestAttributes = requestAttributes;
301      return this;
302    }
303
304    private void preCheck() {
305      checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
306      checkNotNull(scan, "scan is null");
307      checkNotNull(resultCache, "resultCache is null");
308      checkNotNull(consumer, "consumer is null");
309      checkNotNull(stub, "stub is null");
310      checkNotNull(loc, "location is null");
311    }
312
313    public AsyncScanSingleRegionRpcRetryingCaller build() {
314      preCheck();
315      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
316        scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
317        scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
318        scanTimeoutNs, rpcTimeoutNs, lastNextCallNanos, startLogErrorsCnt, requestAttributes);
319    }
320
321    /**
322     * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}.
323     */
324    public CompletableFuture<Boolean> start(HBaseRpcController controller,
325      ScanResponse respWhenOpen) {
326      return build().start(controller, respWhenOpen);
327    }
328  }
329
330  /**
331   * Create retry caller for scanning a region.
332   */
333  public ScanSingleRegionCallerBuilder scanSingleRegion() {
334    return new ScanSingleRegionCallerBuilder();
335  }
336
337  public class BatchCallerBuilder extends BuilderBase {
338
339    private TableName tableName;
340
341    private List<? extends Row> actions;
342
343    private long operationTimeoutNs = -1L;
344
345    private long rpcTimeoutNs = -1L;
346
347    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
348
349    public BatchCallerBuilder table(TableName tableName) {
350      this.tableName = tableName;
351      return this;
352    }
353
354    public BatchCallerBuilder actions(List<? extends Row> actions) {
355      this.actions = actions;
356      return this;
357    }
358
359    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
360      this.operationTimeoutNs = unit.toNanos(operationTimeout);
361      return this;
362    }
363
364    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
365      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
366      return this;
367    }
368
369    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
370      this.pauseNs = unit.toNanos(pause);
371      return this;
372    }
373
374    public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
375      this.pauseNsForServerOverloaded = unit.toNanos(pause);
376      return this;
377    }
378
379    public BatchCallerBuilder maxAttempts(int maxAttempts) {
380      this.maxAttempts = maxAttempts;
381      return this;
382    }
383
384    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
385      this.startLogErrorsCnt = startLogErrorsCnt;
386      return this;
387    }
388
389    public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
390      this.requestAttributes = requestAttributes;
391      return this;
392    }
393
394    public <T> AsyncBatchRpcRetryingCaller<T> build() {
395      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
396        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
397        startLogErrorsCnt, requestAttributes);
398    }
399
400    public <T> List<CompletableFuture<T>> call() {
401      return this.<T> build().call();
402    }
403  }
404
405  public BatchCallerBuilder batch() {
406    return new BatchCallerBuilder();
407  }
408
409  public class MasterRequestCallerBuilder<T> extends BuilderBase {
410    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
411
412    private long operationTimeoutNs = -1L;
413
414    private long rpcTimeoutNs = -1L;
415
416    private int priority = PRIORITY_UNSET;
417
418    private TableName tableName;
419
420    public MasterRequestCallerBuilder<T>
421      action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
422      this.callable = callable;
423      return this;
424    }
425
426    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
427      this.operationTimeoutNs = unit.toNanos(operationTimeout);
428      return this;
429    }
430
431    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
432      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
433      return this;
434    }
435
436    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
437      this.pauseNs = unit.toNanos(pause);
438      return this;
439    }
440
441    public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
442      this.pauseNsForServerOverloaded = unit.toNanos(pause);
443      return this;
444    }
445
446    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
447      this.maxAttempts = maxAttempts;
448      return this;
449    }
450
451    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
452      this.startLogErrorsCnt = startLogErrorsCnt;
453      return this;
454    }
455
456    public MasterRequestCallerBuilder<T> tableName(TableName tableName) {
457      this.tableName = tableName;
458      return this;
459    }
460
461    public MasterRequestCallerBuilder<T> priority(int priority) {
462      this.priority = priority;
463      return this;
464    }
465
466    private void preCheck() {
467      checkNotNull(callable, "action is null");
468    }
469
470    public AsyncMasterRequestRpcRetryingCaller<T> build() {
471      preCheck();
472      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, tableName,
473        priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs,
474        rpcTimeoutNs, startLogErrorsCnt);
475    }
476
477    /**
478     * Shortcut for {@code build().call()}
479     */
480    public CompletableFuture<T> call() {
481      return build().call();
482    }
483  }
484
485  public <T> MasterRequestCallerBuilder<T> masterRequest() {
486    return new MasterRequestCallerBuilder<>();
487  }
488
489  public class AdminRequestCallerBuilder<T> extends BuilderBase {
490    // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
491
492    private AsyncAdminRequestRetryingCaller.Callable<T> callable;
493
494    private long operationTimeoutNs = -1L;
495
496    private long rpcTimeoutNs = -1L;
497
498    private ServerName serverName;
499
500    private int priority;
501
502    public AdminRequestCallerBuilder<T>
503      action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
504      this.callable = callable;
505      return this;
506    }
507
508    public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
509      this.operationTimeoutNs = unit.toNanos(operationTimeout);
510      return this;
511    }
512
513    public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
514      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
515      return this;
516    }
517
518    public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
519      this.pauseNs = unit.toNanos(pause);
520      return this;
521    }
522
523    public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
524      this.pauseNsForServerOverloaded = unit.toNanos(pause);
525      return this;
526    }
527
528    public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
529      this.maxAttempts = maxAttempts;
530      return this;
531    }
532
533    public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
534      this.startLogErrorsCnt = startLogErrorsCnt;
535      return this;
536    }
537
538    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
539      this.serverName = serverName;
540      return this;
541    }
542
543    public AdminRequestCallerBuilder<T> priority(int priority) {
544      this.priority = priority;
545      return this;
546    }
547
548    public AsyncAdminRequestRetryingCaller<T> build() {
549      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
550        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
551        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
552        checkNotNull(callable, "action is null"));
553    }
554
555    public CompletableFuture<T> call() {
556      return build().call();
557    }
558  }
559
560  public <T> AdminRequestCallerBuilder<T> adminRequest() {
561    return new AdminRequestCallerBuilder<>();
562  }
563
564  public class ServerRequestCallerBuilder<T> extends BuilderBase {
565
566    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
567
568    private long operationTimeoutNs = -1L;
569
570    private long rpcTimeoutNs = -1L;
571
572    private ServerName serverName;
573
574    public ServerRequestCallerBuilder<T>
575      action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
576      this.callable = callable;
577      return this;
578    }
579
580    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
581      this.operationTimeoutNs = unit.toNanos(operationTimeout);
582      return this;
583    }
584
585    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
586      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
587      return this;
588    }
589
590    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
591      this.pauseNs = unit.toNanos(pause);
592      return this;
593    }
594
595    public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
596      this.pauseNsForServerOverloaded = unit.toNanos(pause);
597      return this;
598    }
599
600    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
601      this.maxAttempts = maxAttempts;
602      return this;
603    }
604
605    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
606      this.startLogErrorsCnt = startLogErrorsCnt;
607      return this;
608    }
609
610    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
611      this.serverName = serverName;
612      return this;
613    }
614
615    public AsyncServerRequestRpcRetryingCaller<T> build() {
616      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs,
617        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
618        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
619        checkNotNull(callable, "action is null"));
620    }
621
622    public CompletableFuture<T> call() {
623      return build().call();
624    }
625  }
626
627  public <T> ServerRequestCallerBuilder<T> serverRequest() {
628    return new ServerRequestCallerBuilder<>();
629  }
630}