001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
024
025import java.util.Collections;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
034import org.apache.hadoop.hbase.ipc.HBaseRpcController;
035import org.apache.yetus.audience.InterfaceAudience;
036
037import org.apache.hbase.thirdparty.io.netty.util.Timer;
038
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
041
042/**
043 * Factory to create an AsyncRpcRetryCaller.
044 * @since 2.0.0
045 */
046@InterfaceAudience.Private
047class AsyncRpcRetryingCallerFactory {
048
049  private final AsyncConnectionImpl conn;
050
051  private final Timer retryTimer;
052
053  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
054    this.conn = conn;
055    this.retryTimer = retryTimer;
056  }
057
058  private abstract class BuilderBase {
059
060    protected long pauseNs = conn.connConf.getPauseNs();
061
062    protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded();
063
064    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
065
066    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
067  }
068
069  public class SingleRequestCallerBuilder<T> extends BuilderBase {
070
071    private TableName tableName;
072
073    private byte[] row;
074
075    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
076
077    private long operationTimeoutNs = -1L;
078
079    private long rpcTimeoutNs = -1L;
080
081    private RegionLocateType locateType = RegionLocateType.CURRENT;
082
083    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
084
085    private int priority = PRIORITY_UNSET;
086
087    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
088
089    public SingleRequestCallerBuilder<T> table(TableName tableName) {
090      this.tableName = tableName;
091      return this;
092    }
093
094    public SingleRequestCallerBuilder<T> row(byte[] row) {
095      this.row = row;
096      return this;
097    }
098
099    public SingleRequestCallerBuilder<T>
100      action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
101      this.callable = callable;
102      return this;
103    }
104
105    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
106      this.operationTimeoutNs = unit.toNanos(operationTimeout);
107      return this;
108    }
109
110    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
111      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
112      return this;
113    }
114
115    public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
116      this.locateType = locateType;
117      return this;
118    }
119
120    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
121      this.pauseNs = unit.toNanos(pause);
122      return this;
123    }
124
125    public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
126      this.pauseNsForServerOverloaded = unit.toNanos(pause);
127      return this;
128    }
129
130    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
131      this.maxAttempts = maxAttempts;
132      return this;
133    }
134
135    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
136      this.startLogErrorsCnt = startLogErrorsCnt;
137      return this;
138    }
139
140    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
141      this.replicaId = replicaId;
142      return this;
143    }
144
145    public SingleRequestCallerBuilder<T> priority(int priority) {
146      this.priority = priority;
147      return this;
148    }
149
150    public SingleRequestCallerBuilder<T>
151      setRequestAttributes(Map<String, byte[]> requestAttributes) {
152      this.requestAttributes = requestAttributes;
153      return this;
154    }
155
156    private void preCheck() {
157      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
158      checkNotNull(tableName, "tableName is null");
159      checkNotNull(row, "row is null");
160      checkNotNull(locateType, "locateType is null");
161      checkNotNull(callable, "action is null");
162    }
163
164    public AsyncSingleRequestRpcRetryingCaller<T> build() {
165      preCheck();
166      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
167        locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
168        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
169    }
170
171    /**
172     * Shortcut for {@code build().call()}
173     */
174    public CompletableFuture<T> call() {
175      return build().call();
176    }
177  }
178
179  /**
180   * Create retry caller for single action, such as get, put, delete, etc.
181   */
182  public <T> SingleRequestCallerBuilder<T> single() {
183    return new SingleRequestCallerBuilder<>();
184  }
185
186  public class ScanSingleRegionCallerBuilder extends BuilderBase {
187
188    private Long scannerId = null;
189
190    private Scan scan;
191
192    private ScanMetrics scanMetrics;
193
194    private ScanResultCache resultCache;
195
196    private AdvancedScanResultConsumer consumer;
197
198    private ClientService.Interface stub;
199
200    private HRegionLocation loc;
201
202    private boolean isRegionServerRemote;
203
204    private long scannerLeaseTimeoutPeriodNs;
205
206    private long scanTimeoutNs;
207
208    private long rpcTimeoutNs;
209
210    private int priority = PRIORITY_UNSET;
211
212    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
213
214    public ScanSingleRegionCallerBuilder id(long scannerId) {
215      this.scannerId = scannerId;
216      return this;
217    }
218
219    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
220      this.scan = scan;
221      this.priority = scan.getPriority();
222      return this;
223    }
224
225    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
226      this.scanMetrics = scanMetrics;
227      return this;
228    }
229
230    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
231      this.isRegionServerRemote = isRegionServerRemote;
232      return this;
233    }
234
235    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
236      this.resultCache = resultCache;
237      return this;
238    }
239
240    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
241      this.consumer = consumer;
242      return this;
243    }
244
245    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
246      this.stub = stub;
247      return this;
248    }
249
250    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
251      this.loc = loc;
252      return this;
253    }
254
255    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
256      TimeUnit unit) {
257      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
258      return this;
259    }
260
261    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
262      this.scanTimeoutNs = unit.toNanos(scanTimeout);
263      return this;
264    }
265
266    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
267      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
268      return this;
269    }
270
271    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
272      this.pauseNs = unit.toNanos(pause);
273      return this;
274    }
275
276    public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
277      this.pauseNsForServerOverloaded = unit.toNanos(pause);
278      return this;
279    }
280
281    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
282      this.maxAttempts = maxAttempts;
283      return this;
284    }
285
286    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
287      this.startLogErrorsCnt = startLogErrorsCnt;
288      return this;
289    }
290
291    public ScanSingleRegionCallerBuilder
292      setRequestAttributes(Map<String, byte[]> requestAttributes) {
293      this.requestAttributes = requestAttributes;
294      return this;
295    }
296
297    private void preCheck() {
298      checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
299      checkNotNull(scan, "scan is null");
300      checkNotNull(resultCache, "resultCache is null");
301      checkNotNull(consumer, "consumer is null");
302      checkNotNull(stub, "stub is null");
303      checkNotNull(loc, "location is null");
304    }
305
306    public AsyncScanSingleRegionRpcRetryingCaller build() {
307      preCheck();
308      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
309        scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
310        scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
311        scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
312    }
313
314    /**
315     * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}.
316     */
317    public CompletableFuture<Boolean> start(HBaseRpcController controller,
318      ScanResponse respWhenOpen) {
319      return build().start(controller, respWhenOpen);
320    }
321  }
322
323  /**
324   * Create retry caller for scanning a region.
325   */
326  public ScanSingleRegionCallerBuilder scanSingleRegion() {
327    return new ScanSingleRegionCallerBuilder();
328  }
329
330  public class BatchCallerBuilder extends BuilderBase {
331
332    private TableName tableName;
333
334    private List<? extends Row> actions;
335
336    private long operationTimeoutNs = -1L;
337
338    private long rpcTimeoutNs = -1L;
339
340    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
341
342    public BatchCallerBuilder table(TableName tableName) {
343      this.tableName = tableName;
344      return this;
345    }
346
347    public BatchCallerBuilder actions(List<? extends Row> actions) {
348      this.actions = actions;
349      return this;
350    }
351
352    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
353      this.operationTimeoutNs = unit.toNanos(operationTimeout);
354      return this;
355    }
356
357    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
358      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
359      return this;
360    }
361
362    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
363      this.pauseNs = unit.toNanos(pause);
364      return this;
365    }
366
367    public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
368      this.pauseNsForServerOverloaded = unit.toNanos(pause);
369      return this;
370    }
371
372    public BatchCallerBuilder maxAttempts(int maxAttempts) {
373      this.maxAttempts = maxAttempts;
374      return this;
375    }
376
377    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
378      this.startLogErrorsCnt = startLogErrorsCnt;
379      return this;
380    }
381
382    public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
383      this.requestAttributes = requestAttributes;
384      return this;
385    }
386
387    public <T> AsyncBatchRpcRetryingCaller<T> build() {
388      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
389        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
390        startLogErrorsCnt, requestAttributes);
391    }
392
393    public <T> List<CompletableFuture<T>> call() {
394      return this.<T> build().call();
395    }
396  }
397
398  public BatchCallerBuilder batch() {
399    return new BatchCallerBuilder();
400  }
401
402  public class MasterRequestCallerBuilder<T> extends BuilderBase {
403    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
404
405    private long operationTimeoutNs = -1L;
406
407    private long rpcTimeoutNs = -1L;
408
409    private int priority = PRIORITY_UNSET;
410
411    private TableName tableName;
412
413    public MasterRequestCallerBuilder<T>
414      action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
415      this.callable = callable;
416      return this;
417    }
418
419    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
420      this.operationTimeoutNs = unit.toNanos(operationTimeout);
421      return this;
422    }
423
424    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
425      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
426      return this;
427    }
428
429    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
430      this.pauseNs = unit.toNanos(pause);
431      return this;
432    }
433
434    public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
435      this.pauseNsForServerOverloaded = unit.toNanos(pause);
436      return this;
437    }
438
439    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
440      this.maxAttempts = maxAttempts;
441      return this;
442    }
443
444    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
445      this.startLogErrorsCnt = startLogErrorsCnt;
446      return this;
447    }
448
449    public MasterRequestCallerBuilder<T> tableName(TableName tableName) {
450      this.tableName = tableName;
451      return this;
452    }
453
454    public MasterRequestCallerBuilder<T> priority(int priority) {
455      this.priority = priority;
456      return this;
457    }
458
459    private void preCheck() {
460      checkNotNull(callable, "action is null");
461    }
462
463    public AsyncMasterRequestRpcRetryingCaller<T> build() {
464      preCheck();
465      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, tableName,
466        priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs,
467        rpcTimeoutNs, startLogErrorsCnt);
468    }
469
470    /**
471     * Shortcut for {@code build().call()}
472     */
473    public CompletableFuture<T> call() {
474      return build().call();
475    }
476  }
477
478  public <T> MasterRequestCallerBuilder<T> masterRequest() {
479    return new MasterRequestCallerBuilder<>();
480  }
481
482  public class AdminRequestCallerBuilder<T> extends BuilderBase {
483    // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
484
485    private AsyncAdminRequestRetryingCaller.Callable<T> callable;
486
487    private long operationTimeoutNs = -1L;
488
489    private long rpcTimeoutNs = -1L;
490
491    private ServerName serverName;
492
493    private int priority;
494
495    public AdminRequestCallerBuilder<T>
496      action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
497      this.callable = callable;
498      return this;
499    }
500
501    public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
502      this.operationTimeoutNs = unit.toNanos(operationTimeout);
503      return this;
504    }
505
506    public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
507      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
508      return this;
509    }
510
511    public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
512      this.pauseNs = unit.toNanos(pause);
513      return this;
514    }
515
516    public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
517      this.pauseNsForServerOverloaded = unit.toNanos(pause);
518      return this;
519    }
520
521    public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
522      this.maxAttempts = maxAttempts;
523      return this;
524    }
525
526    public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
527      this.startLogErrorsCnt = startLogErrorsCnt;
528      return this;
529    }
530
531    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
532      this.serverName = serverName;
533      return this;
534    }
535
536    public AdminRequestCallerBuilder<T> priority(int priority) {
537      this.priority = priority;
538      return this;
539    }
540
541    public AsyncAdminRequestRetryingCaller<T> build() {
542      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
543        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
544        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
545        checkNotNull(callable, "action is null"));
546    }
547
548    public CompletableFuture<T> call() {
549      return build().call();
550    }
551  }
552
553  public <T> AdminRequestCallerBuilder<T> adminRequest() {
554    return new AdminRequestCallerBuilder<>();
555  }
556
557  public class ServerRequestCallerBuilder<T> extends BuilderBase {
558
559    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
560
561    private long operationTimeoutNs = -1L;
562
563    private long rpcTimeoutNs = -1L;
564
565    private ServerName serverName;
566
567    public ServerRequestCallerBuilder<T>
568      action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
569      this.callable = callable;
570      return this;
571    }
572
573    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
574      this.operationTimeoutNs = unit.toNanos(operationTimeout);
575      return this;
576    }
577
578    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
579      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
580      return this;
581    }
582
583    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
584      this.pauseNs = unit.toNanos(pause);
585      return this;
586    }
587
588    public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
589      this.pauseNsForServerOverloaded = unit.toNanos(pause);
590      return this;
591    }
592
593    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
594      this.maxAttempts = maxAttempts;
595      return this;
596    }
597
598    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
599      this.startLogErrorsCnt = startLogErrorsCnt;
600      return this;
601    }
602
603    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
604      this.serverName = serverName;
605      return this;
606    }
607
608    public AsyncServerRequestRpcRetryingCaller<T> build() {
609      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs,
610        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
611        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
612        checkNotNull(callable, "action is null"));
613    }
614
615    public CompletableFuture<T> call() {
616      return build().call();
617    }
618  }
619
620  public <T> ServerRequestCallerBuilder<T> serverRequest() {
621    return new ServerRequestCallerBuilder<>();
622  }
623}