001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
024import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
025
026import java.util.List;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.TimeUnit;
029import org.apache.hadoop.hbase.HRegionLocation;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
033import org.apache.hadoop.hbase.ipc.HBaseRpcController;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.io.netty.util.Timer;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
040
041/**
042 * Factory to create an AsyncRpcRetryCaller.
043 * @since 2.0.0
044 */
045@InterfaceAudience.Private
046class AsyncRpcRetryingCallerFactory {
047
048  private final AsyncConnectionImpl conn;
049
050  private final Timer retryTimer;
051
052  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
053    this.conn = conn;
054    this.retryTimer = retryTimer;
055  }
056
057  private abstract class BuilderBase {
058
059    protected long pauseNs = conn.connConf.getPauseNs();
060
061    protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded();
062
063    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
064
065    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
066  }
067
068  public class SingleRequestCallerBuilder<T> extends BuilderBase {
069
070    private TableName tableName;
071
072    private byte[] row;
073
074    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
075
076    private long operationTimeoutNs = -1L;
077
078    private long rpcTimeoutNs = -1L;
079
080    private RegionLocateType locateType = RegionLocateType.CURRENT;
081
082    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
083
084    private int priority = PRIORITY_UNSET;
085
086    public SingleRequestCallerBuilder<T> table(TableName tableName) {
087      this.tableName = tableName;
088      return this;
089    }
090
091    public SingleRequestCallerBuilder<T> row(byte[] row) {
092      this.row = row;
093      return this;
094    }
095
096    public SingleRequestCallerBuilder<T>
097      action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
098      this.callable = callable;
099      return this;
100    }
101
102    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
103      this.operationTimeoutNs = unit.toNanos(operationTimeout);
104      return this;
105    }
106
107    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
108      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
109      return this;
110    }
111
112    public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
113      this.locateType = locateType;
114      return this;
115    }
116
117    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
118      this.pauseNs = unit.toNanos(pause);
119      return this;
120    }
121
122    public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
123      this.pauseNsForServerOverloaded = unit.toNanos(pause);
124      return this;
125    }
126
127    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
128      this.maxAttempts = maxAttempts;
129      return this;
130    }
131
132    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
133      this.startLogErrorsCnt = startLogErrorsCnt;
134      return this;
135    }
136
137    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
138      this.replicaId = replicaId;
139      return this;
140    }
141
142    public SingleRequestCallerBuilder<T> priority(int priority) {
143      this.priority = priority;
144      return this;
145    }
146
147    private void preCheck() {
148      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
149      checkNotNull(tableName, "tableName is null");
150      checkNotNull(row, "row is null");
151      checkNotNull(locateType, "locateType is null");
152      checkNotNull(callable, "action is null");
153      this.priority = calcPriority(priority, tableName);
154    }
155
156    public AsyncSingleRequestRpcRetryingCaller<T> build() {
157      preCheck();
158      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
159        locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
160        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
161    }
162
163    /**
164     * Shortcut for {@code build().call()}
165     */
166    public CompletableFuture<T> call() {
167      return build().call();
168    }
169  }
170
171  /**
172   * Create retry caller for single action, such as get, put, delete, etc.
173   */
174  public <T> SingleRequestCallerBuilder<T> single() {
175    return new SingleRequestCallerBuilder<>();
176  }
177
178  public class ScanSingleRegionCallerBuilder extends BuilderBase {
179
180    private Long scannerId = null;
181
182    private Scan scan;
183
184    private ScanMetrics scanMetrics;
185
186    private ScanResultCache resultCache;
187
188    private AdvancedScanResultConsumer consumer;
189
190    private ClientService.Interface stub;
191
192    private HRegionLocation loc;
193
194    private boolean isRegionServerRemote;
195
196    private long scannerLeaseTimeoutPeriodNs;
197
198    private long scanTimeoutNs;
199
200    private long rpcTimeoutNs;
201
202    private int priority = PRIORITY_UNSET;
203
204    public ScanSingleRegionCallerBuilder id(long scannerId) {
205      this.scannerId = scannerId;
206      return this;
207    }
208
209    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
210      this.scan = scan;
211      this.priority = scan.getPriority();
212      return this;
213    }
214
215    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
216      this.scanMetrics = scanMetrics;
217      return this;
218    }
219
220    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
221      this.isRegionServerRemote = isRegionServerRemote;
222      return this;
223    }
224
225    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
226      this.resultCache = resultCache;
227      return this;
228    }
229
230    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
231      this.consumer = consumer;
232      return this;
233    }
234
235    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
236      this.stub = stub;
237      return this;
238    }
239
240    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
241      this.loc = loc;
242      return this;
243    }
244
245    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
246      TimeUnit unit) {
247      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
248      return this;
249    }
250
251    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
252      this.scanTimeoutNs = unit.toNanos(scanTimeout);
253      return this;
254    }
255
256    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
257      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
258      return this;
259    }
260
261    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
262      this.pauseNs = unit.toNanos(pause);
263      return this;
264    }
265
266    public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
267      this.pauseNsForServerOverloaded = unit.toNanos(pause);
268      return this;
269    }
270
271    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
272      this.maxAttempts = maxAttempts;
273      return this;
274    }
275
276    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
277      this.startLogErrorsCnt = startLogErrorsCnt;
278      return this;
279    }
280
281    private void preCheck() {
282      checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
283      checkNotNull(scan, "scan is null");
284      checkNotNull(resultCache, "resultCache is null");
285      checkNotNull(consumer, "consumer is null");
286      checkNotNull(stub, "stub is null");
287      checkNotNull(loc, "location is null");
288      this.priority = calcPriority(priority, loc.getRegion().getTable());
289    }
290
291    public AsyncScanSingleRegionRpcRetryingCaller build() {
292      preCheck();
293      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
294        scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
295        scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
296        scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
297    }
298
299    /**
300     * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}.
301     */
302    public CompletableFuture<Boolean> start(HBaseRpcController controller,
303      ScanResponse respWhenOpen) {
304      return build().start(controller, respWhenOpen);
305    }
306  }
307
308  /**
309   * Create retry caller for scanning a region.
310   */
311  public ScanSingleRegionCallerBuilder scanSingleRegion() {
312    return new ScanSingleRegionCallerBuilder();
313  }
314
315  public class BatchCallerBuilder extends BuilderBase {
316
317    private TableName tableName;
318
319    private List<? extends Row> actions;
320
321    private long operationTimeoutNs = -1L;
322
323    private long rpcTimeoutNs = -1L;
324
325    public BatchCallerBuilder table(TableName tableName) {
326      this.tableName = tableName;
327      return this;
328    }
329
330    public BatchCallerBuilder actions(List<? extends Row> actions) {
331      this.actions = actions;
332      return this;
333    }
334
335    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
336      this.operationTimeoutNs = unit.toNanos(operationTimeout);
337      return this;
338    }
339
340    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
341      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
342      return this;
343    }
344
345    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
346      this.pauseNs = unit.toNanos(pause);
347      return this;
348    }
349
350    public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
351      this.pauseNsForServerOverloaded = unit.toNanos(pause);
352      return this;
353    }
354
355    public BatchCallerBuilder maxAttempts(int maxAttempts) {
356      this.maxAttempts = maxAttempts;
357      return this;
358    }
359
360    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
361      this.startLogErrorsCnt = startLogErrorsCnt;
362      return this;
363    }
364
365    public <T> AsyncBatchRpcRetryingCaller<T> build() {
366      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
367        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
368        startLogErrorsCnt);
369    }
370
371    public <T> List<CompletableFuture<T>> call() {
372      return this.<T> build().call();
373    }
374  }
375
376  public BatchCallerBuilder batch() {
377    return new BatchCallerBuilder();
378  }
379
380  public class MasterRequestCallerBuilder<T> extends BuilderBase {
381    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
382
383    private long operationTimeoutNs = -1L;
384
385    private long rpcTimeoutNs = -1L;
386
387    private int priority = PRIORITY_UNSET;
388
389    public MasterRequestCallerBuilder<T>
390      action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
391      this.callable = callable;
392      return this;
393    }
394
395    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
396      this.operationTimeoutNs = unit.toNanos(operationTimeout);
397      return this;
398    }
399
400    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
401      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
402      return this;
403    }
404
405    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
406      this.pauseNs = unit.toNanos(pause);
407      return this;
408    }
409
410    public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
411      this.pauseNsForServerOverloaded = unit.toNanos(pause);
412      return this;
413    }
414
415    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
416      this.maxAttempts = maxAttempts;
417      return this;
418    }
419
420    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
421      this.startLogErrorsCnt = startLogErrorsCnt;
422      return this;
423    }
424
425    public MasterRequestCallerBuilder<T> priority(TableName tableName) {
426      this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName));
427      return this;
428    }
429
430    public MasterRequestCallerBuilder<T> priority(int priority) {
431      this.priority = Math.max(this.priority, priority);
432      return this;
433    }
434
435    private void preCheck() {
436      checkNotNull(callable, "action is null");
437    }
438
439    public AsyncMasterRequestRpcRetryingCaller<T> build() {
440      preCheck();
441      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
442        pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
443        startLogErrorsCnt);
444    }
445
446    /**
447     * Shortcut for {@code build().call()}
448     */
449    public CompletableFuture<T> call() {
450      return build().call();
451    }
452  }
453
454  public <T> MasterRequestCallerBuilder<T> masterRequest() {
455    return new MasterRequestCallerBuilder<>();
456  }
457
458  public class AdminRequestCallerBuilder<T> extends BuilderBase {
459    // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
460
461    private AsyncAdminRequestRetryingCaller.Callable<T> callable;
462
463    private long operationTimeoutNs = -1L;
464
465    private long rpcTimeoutNs = -1L;
466
467    private ServerName serverName;
468
469    private int priority;
470
471    public AdminRequestCallerBuilder<T>
472      action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
473      this.callable = callable;
474      return this;
475    }
476
477    public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
478      this.operationTimeoutNs = unit.toNanos(operationTimeout);
479      return this;
480    }
481
482    public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
483      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
484      return this;
485    }
486
487    public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
488      this.pauseNs = unit.toNanos(pause);
489      return this;
490    }
491
492    public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
493      this.pauseNsForServerOverloaded = unit.toNanos(pause);
494      return this;
495    }
496
497    public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
498      this.maxAttempts = maxAttempts;
499      return this;
500    }
501
502    public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
503      this.startLogErrorsCnt = startLogErrorsCnt;
504      return this;
505    }
506
507    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
508      this.serverName = serverName;
509      return this;
510    }
511
512    public AdminRequestCallerBuilder<T> priority(int priority) {
513      this.priority = priority;
514      return this;
515    }
516
517    public AsyncAdminRequestRetryingCaller<T> build() {
518      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
519        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
520        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
521        checkNotNull(callable, "action is null"));
522    }
523
524    public CompletableFuture<T> call() {
525      return build().call();
526    }
527  }
528
529  public <T> AdminRequestCallerBuilder<T> adminRequest() {
530    return new AdminRequestCallerBuilder<>();
531  }
532
533  public class ServerRequestCallerBuilder<T> extends BuilderBase {
534
535    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
536
537    private long operationTimeoutNs = -1L;
538
539    private long rpcTimeoutNs = -1L;
540
541    private ServerName serverName;
542
543    public ServerRequestCallerBuilder<T>
544      action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
545      this.callable = callable;
546      return this;
547    }
548
549    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
550      this.operationTimeoutNs = unit.toNanos(operationTimeout);
551      return this;
552    }
553
554    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
555      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
556      return this;
557    }
558
559    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
560      this.pauseNs = unit.toNanos(pause);
561      return this;
562    }
563
564    public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
565      this.pauseNsForServerOverloaded = unit.toNanos(pause);
566      return this;
567    }
568
569    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
570      this.maxAttempts = maxAttempts;
571      return this;
572    }
573
574    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
575      this.startLogErrorsCnt = startLogErrorsCnt;
576      return this;
577    }
578
579    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
580      this.serverName = serverName;
581      return this;
582    }
583
584    public AsyncServerRequestRpcRetryingCaller<T> build() {
585      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs,
586        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
587        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
588        checkNotNull(callable, "action is null"));
589    }
590
591    public CompletableFuture<T> call() {
592      return build().call();
593    }
594  }
595
596  public <T> ServerRequestCallerBuilder<T> serverRequest() {
597    return new ServerRequestCallerBuilder<>();
598  }
599}