001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
024import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
025
026import java.util.List;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.TimeUnit;
029import org.apache.hadoop.hbase.HRegionLocation;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
033import org.apache.hadoop.hbase.ipc.HBaseRpcController;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.io.netty.util.Timer;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
040
041/**
042 * Factory to create an AsyncRpcRetryCaller.
043 * @since 2.0.0
044 */
045@InterfaceAudience.Private
046class AsyncRpcRetryingCallerFactory {
047
048  private final AsyncConnectionImpl conn;
049
050  private final Timer retryTimer;
051
052  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
053    this.conn = conn;
054    this.retryTimer = retryTimer;
055  }
056
057  private abstract class BuilderBase {
058
059    protected long pauseNs = conn.connConf.getPauseNs();
060
061    protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs();
062
063    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
064
065    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
066  }
067
068  public class SingleRequestCallerBuilder<T> extends BuilderBase {
069
070    private TableName tableName;
071
072    private byte[] row;
073
074    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
075
076    private long operationTimeoutNs = -1L;
077
078    private long rpcTimeoutNs = -1L;
079
080    private RegionLocateType locateType = RegionLocateType.CURRENT;
081
082    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
083
084    private int priority = PRIORITY_UNSET;
085
086    public SingleRequestCallerBuilder<T> table(TableName tableName) {
087      this.tableName = tableName;
088      return this;
089    }
090
091    public SingleRequestCallerBuilder<T> row(byte[] row) {
092      this.row = row;
093      return this;
094    }
095
096    public SingleRequestCallerBuilder<T> action(
097        AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
098      this.callable = callable;
099      return this;
100    }
101
102    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
103      this.operationTimeoutNs = unit.toNanos(operationTimeout);
104      return this;
105    }
106
107    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
108      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
109      return this;
110    }
111
112    public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
113      this.locateType = locateType;
114      return this;
115    }
116
117    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
118      this.pauseNs = unit.toNanos(pause);
119      return this;
120    }
121
122    public SingleRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
123      this.pauseForCQTBENs = unit.toNanos(pause);
124      return this;
125    }
126
127    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
128      this.maxAttempts = maxAttempts;
129      return this;
130    }
131
132    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
133      this.startLogErrorsCnt = startLogErrorsCnt;
134      return this;
135    }
136
137    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
138      this.replicaId = replicaId;
139      return this;
140    }
141
142    public SingleRequestCallerBuilder<T> priority(int priority) {
143      this.priority = priority;
144      return this;
145    }
146
147    private void preCheck() {
148      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
149      checkNotNull(tableName, "tableName is null");
150      checkNotNull(row, "row is null");
151      checkNotNull(locateType, "locateType is null");
152      checkNotNull(callable, "action is null");
153      this.priority = calcPriority(priority, tableName);
154    }
155
156    public AsyncSingleRequestRpcRetryingCaller<T> build() {
157      preCheck();
158      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
159        locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
160        rpcTimeoutNs, startLogErrorsCnt);
161    }
162
163    /**
164     * Shortcut for {@code build().call()}
165     */
166    public CompletableFuture<T> call() {
167      return build().call();
168    }
169  }
170
171  /**
172   * Create retry caller for single action, such as get, put, delete, etc.
173   */
174  public <T> SingleRequestCallerBuilder<T> single() {
175    return new SingleRequestCallerBuilder<>();
176  }
177
178  public class ScanSingleRegionCallerBuilder extends BuilderBase {
179
180    private Long scannerId = null;
181
182    private Scan scan;
183
184    private ScanMetrics scanMetrics;
185
186    private ScanResultCache resultCache;
187
188    private AdvancedScanResultConsumer consumer;
189
190    private ClientService.Interface stub;
191
192    private HRegionLocation loc;
193
194    private boolean isRegionServerRemote;
195
196    private long scannerLeaseTimeoutPeriodNs;
197
198    private long scanTimeoutNs;
199
200    private long rpcTimeoutNs;
201
202    private int priority = PRIORITY_UNSET;
203
204    public ScanSingleRegionCallerBuilder id(long scannerId) {
205      this.scannerId = scannerId;
206      return this;
207    }
208
209    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
210      this.scan = scan;
211      this.priority = scan.getPriority();
212      return this;
213    }
214
215    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
216      this.scanMetrics = scanMetrics;
217      return this;
218    }
219
220    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
221      this.isRegionServerRemote = isRegionServerRemote;
222      return this;
223    }
224
225    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
226      this.resultCache = resultCache;
227      return this;
228    }
229
230    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
231      this.consumer = consumer;
232      return this;
233    }
234
235    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
236      this.stub = stub;
237      return this;
238    }
239
240    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
241      this.loc = loc;
242      return this;
243    }
244
245    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
246        TimeUnit unit) {
247      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
248      return this;
249    }
250
251    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
252      this.scanTimeoutNs = unit.toNanos(scanTimeout);
253      return this;
254    }
255
256    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
257      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
258      return this;
259    }
260
261    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
262      this.pauseNs = unit.toNanos(pause);
263      return this;
264    }
265
266    public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
267      this.pauseForCQTBENs = unit.toNanos(pause);
268      return this;
269    }
270
271    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
272      this.maxAttempts = maxAttempts;
273      return this;
274    }
275
276    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
277      this.startLogErrorsCnt = startLogErrorsCnt;
278      return this;
279    }
280
281    private void preCheck() {
282      checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
283      checkNotNull(scan, "scan is null");
284      checkNotNull(resultCache, "resultCache is null");
285      checkNotNull(consumer, "consumer is null");
286      checkNotNull(stub, "stub is null");
287      checkNotNull(loc, "location is null");
288      this.priority = calcPriority(priority, loc.getRegion().getTable());
289    }
290
291    public AsyncScanSingleRegionRpcRetryingCaller build() {
292      preCheck();
293      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
294        scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
295        scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs,
296        rpcTimeoutNs, startLogErrorsCnt);
297    }
298
299    /**
300     * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}.
301     */
302    public CompletableFuture<Boolean> start(HBaseRpcController controller,
303        ScanResponse respWhenOpen) {
304      return build().start(controller, respWhenOpen);
305    }
306  }
307
308  /**
309   * Create retry caller for scanning a region.
310   */
311  public ScanSingleRegionCallerBuilder scanSingleRegion() {
312    return new ScanSingleRegionCallerBuilder();
313  }
314
315  public class BatchCallerBuilder extends BuilderBase {
316
317    private TableName tableName;
318
319    private List<? extends Row> actions;
320
321    private long operationTimeoutNs = -1L;
322
323    private long rpcTimeoutNs = -1L;
324
325    public BatchCallerBuilder table(TableName tableName) {
326      this.tableName = tableName;
327      return this;
328    }
329
330    public BatchCallerBuilder actions(List<? extends Row> actions) {
331      this.actions = actions;
332      return this;
333    }
334
335    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
336      this.operationTimeoutNs = unit.toNanos(operationTimeout);
337      return this;
338    }
339
340    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
341      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
342      return this;
343    }
344
345    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
346      this.pauseNs = unit.toNanos(pause);
347      return this;
348    }
349
350    public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
351      this.pauseForCQTBENs = unit.toNanos(pause);
352      return this;
353    }
354
355    public BatchCallerBuilder maxAttempts(int maxAttempts) {
356      this.maxAttempts = maxAttempts;
357      return this;
358    }
359
360    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
361      this.startLogErrorsCnt = startLogErrorsCnt;
362      return this;
363    }
364
365    public <T> AsyncBatchRpcRetryingCaller<T> build() {
366      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
367        pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
368    }
369
370    public <T> List<CompletableFuture<T>> call() {
371      return this.<T> build().call();
372    }
373  }
374
375  public BatchCallerBuilder batch() {
376    return new BatchCallerBuilder();
377  }
378
379  public class MasterRequestCallerBuilder<T> extends BuilderBase {
380    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
381
382    private long operationTimeoutNs = -1L;
383
384    private long rpcTimeoutNs = -1L;
385
386    private int priority = PRIORITY_UNSET;
387
388    public MasterRequestCallerBuilder<T> action(
389        AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
390      this.callable = callable;
391      return this;
392    }
393
394    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
395      this.operationTimeoutNs = unit.toNanos(operationTimeout);
396      return this;
397    }
398
399    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
400      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
401      return this;
402    }
403
404    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
405      this.pauseNs = unit.toNanos(pause);
406      return this;
407    }
408
409    public MasterRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
410      this.pauseForCQTBENs = unit.toNanos(pause);
411      return this;
412    }
413
414    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
415      this.maxAttempts = maxAttempts;
416      return this;
417    }
418
419    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
420      this.startLogErrorsCnt = startLogErrorsCnt;
421      return this;
422    }
423
424    public MasterRequestCallerBuilder<T> priority(TableName tableName) {
425      this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName));
426      return this;
427    }
428
429    public MasterRequestCallerBuilder<T> priority(int priority) {
430      this.priority = Math.max(this.priority, priority);
431      return this;
432    }
433
434    private void preCheck() {
435      checkNotNull(callable, "action is null");
436    }
437
438    public AsyncMasterRequestRpcRetryingCaller<T> build() {
439      preCheck();
440      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
441        pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
442    }
443
444    /**
445     * Shortcut for {@code build().call()}
446     */
447    public CompletableFuture<T> call() {
448      return build().call();
449    }
450  }
451
452  public <T> MasterRequestCallerBuilder<T> masterRequest() {
453    return new MasterRequestCallerBuilder<>();
454  }
455
456  public class AdminRequestCallerBuilder<T> extends BuilderBase {
457    // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
458
459    private AsyncAdminRequestRetryingCaller.Callable<T> callable;
460
461    private long operationTimeoutNs = -1L;
462
463    private long rpcTimeoutNs = -1L;
464
465    private ServerName serverName;
466
467    private int priority;
468
469    public AdminRequestCallerBuilder<T> action(
470        AsyncAdminRequestRetryingCaller.Callable<T> callable) {
471      this.callable = callable;
472      return this;
473    }
474
475    public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
476      this.operationTimeoutNs = unit.toNanos(operationTimeout);
477      return this;
478    }
479
480    public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
481      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
482      return this;
483    }
484
485    public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
486      this.pauseNs = unit.toNanos(pause);
487      return this;
488    }
489
490    public AdminRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
491      this.pauseForCQTBENs = unit.toNanos(pause);
492      return this;
493    }
494
495    public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
496      this.maxAttempts = maxAttempts;
497      return this;
498    }
499
500    public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
501      this.startLogErrorsCnt = startLogErrorsCnt;
502      return this;
503    }
504
505    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
506      this.serverName = serverName;
507      return this;
508    }
509
510    public AdminRequestCallerBuilder<T> priority(int priority) {
511      this.priority = priority;
512      return this;
513    }
514
515    public AsyncAdminRequestRetryingCaller<T> build() {
516      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
517        pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
518        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
519    }
520
521    public CompletableFuture<T> call() {
522      return build().call();
523    }
524  }
525
526  public <T> AdminRequestCallerBuilder<T> adminRequest() {
527    return new AdminRequestCallerBuilder<>();
528  }
529
530  public class ServerRequestCallerBuilder<T> extends BuilderBase {
531
532    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
533
534    private long operationTimeoutNs = -1L;
535
536    private long rpcTimeoutNs = -1L;
537
538    private ServerName serverName;
539
540    public ServerRequestCallerBuilder<T> action(
541        AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
542      this.callable = callable;
543      return this;
544    }
545
546    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
547      this.operationTimeoutNs = unit.toNanos(operationTimeout);
548      return this;
549    }
550
551    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
552      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
553      return this;
554    }
555
556    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
557      this.pauseNs = unit.toNanos(pause);
558      return this;
559    }
560
561    public ServerRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
562      this.pauseForCQTBENs = unit.toNanos(pause);
563      return this;
564    }
565
566    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
567      this.maxAttempts = maxAttempts;
568      return this;
569    }
570
571    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
572      this.startLogErrorsCnt = startLogErrorsCnt;
573      return this;
574    }
575
576    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
577      this.serverName = serverName;
578      return this;
579    }
580
581    public AsyncServerRequestRpcRetryingCaller<T> build() {
582      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, pauseForCQTBENs,
583        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
584        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
585    }
586
587    public CompletableFuture<T> call() {
588      return build().call();
589    }
590  }
591
592  public <T> ServerRequestCallerBuilder<T> serverRequest() {
593    return new ServerRequestCallerBuilder<>();
594  }
595}