001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
023import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
024import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
025
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
035import org.apache.hadoop.hbase.ipc.HBaseRpcController;
036import org.apache.yetus.audience.InterfaceAudience;
037
038import org.apache.hbase.thirdparty.io.netty.util.Timer;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
042
043/**
044 * Factory to create builders for the asynchronous RPC retrying callers (single, scan, batch,
 * master, admin and server requests).
045 * @since 2.0.0
046 */
047@InterfaceAudience.Private
048class AsyncRpcRetryingCallerFactory {
049
050  private final AsyncConnectionImpl conn;
051
052  private final Timer retryTimer;
053
054  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
055    this.conn = conn;
056    this.retryTimer = retryTimer;
057  }
058
059  private abstract class BuilderBase {
060
061    protected long pauseNs = conn.connConf.getPauseNs();
062
063    protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded();
064
065    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
066
067    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
068  }
069
070  public class SingleRequestCallerBuilder<T> extends BuilderBase {
071
072    private TableName tableName;
073
074    private byte[] row;
075
076    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
077
078    private long operationTimeoutNs = -1L;
079
080    private long rpcTimeoutNs = -1L;
081
082    private RegionLocateType locateType = RegionLocateType.CURRENT;
083
084    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
085
086    private int priority = PRIORITY_UNSET;
087
088    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
089
090    public SingleRequestCallerBuilder<T> table(TableName tableName) {
091      this.tableName = tableName;
092      return this;
093    }
094
095    public SingleRequestCallerBuilder<T> row(byte[] row) {
096      this.row = row;
097      return this;
098    }
099
100    public SingleRequestCallerBuilder<T>
101      action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
102      this.callable = callable;
103      return this;
104    }
105
106    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
107      this.operationTimeoutNs = unit.toNanos(operationTimeout);
108      return this;
109    }
110
111    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
112      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
113      return this;
114    }
115
116    public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
117      this.locateType = locateType;
118      return this;
119    }
120
121    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
122      this.pauseNs = unit.toNanos(pause);
123      return this;
124    }
125
126    public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
127      this.pauseNsForServerOverloaded = unit.toNanos(pause);
128      return this;
129    }
130
131    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
132      this.maxAttempts = maxAttempts;
133      return this;
134    }
135
136    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
137      this.startLogErrorsCnt = startLogErrorsCnt;
138      return this;
139    }
140
141    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
142      this.replicaId = replicaId;
143      return this;
144    }
145
146    public SingleRequestCallerBuilder<T> priority(int priority) {
147      this.priority = priority;
148      return this;
149    }
150
151    public SingleRequestCallerBuilder<T>
152      setRequestAttributes(Map<String, byte[]> requestAttributes) {
153      this.requestAttributes = requestAttributes;
154      return this;
155    }
156
157    private void preCheck() {
158      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
159      checkNotNull(tableName, "tableName is null");
160      checkNotNull(row, "row is null");
161      checkNotNull(locateType, "locateType is null");
162      checkNotNull(callable, "action is null");
163      this.priority = calcPriority(priority, tableName);
164    }
165
166    public AsyncSingleRequestRpcRetryingCaller<T> build() {
167      preCheck();
168      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
169        locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
170        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
171    }
172
173    /**
174     * Shortcut for {@code build().call()}
175     */
176    public CompletableFuture<T> call() {
177      return build().call();
178    }
179  }
180
181  /**
182   * Create retry caller for single action, such as get, put, delete, etc.
183   */
184  public <T> SingleRequestCallerBuilder<T> single() {
185    return new SingleRequestCallerBuilder<>();
186  }
187
188  public class ScanSingleRegionCallerBuilder extends BuilderBase {
189
190    private Long scannerId = null;
191
192    private Scan scan;
193
194    private ScanMetrics scanMetrics;
195
196    private ScanResultCache resultCache;
197
198    private AdvancedScanResultConsumer consumer;
199
200    private ClientService.Interface stub;
201
202    private HRegionLocation loc;
203
204    private boolean isRegionServerRemote;
205
206    private long scannerLeaseTimeoutPeriodNs;
207
208    private long scanTimeoutNs;
209
210    private long rpcTimeoutNs;
211
212    private int priority = PRIORITY_UNSET;
213
214    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
215
216    public ScanSingleRegionCallerBuilder id(long scannerId) {
217      this.scannerId = scannerId;
218      return this;
219    }
220
221    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
222      this.scan = scan;
223      this.priority = scan.getPriority();
224      return this;
225    }
226
227    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
228      this.scanMetrics = scanMetrics;
229      return this;
230    }
231
232    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
233      this.isRegionServerRemote = isRegionServerRemote;
234      return this;
235    }
236
237    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
238      this.resultCache = resultCache;
239      return this;
240    }
241
242    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
243      this.consumer = consumer;
244      return this;
245    }
246
247    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
248      this.stub = stub;
249      return this;
250    }
251
252    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
253      this.loc = loc;
254      return this;
255    }
256
257    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
258      TimeUnit unit) {
259      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
260      return this;
261    }
262
263    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
264      this.scanTimeoutNs = unit.toNanos(scanTimeout);
265      return this;
266    }
267
268    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
269      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
270      return this;
271    }
272
273    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
274      this.pauseNs = unit.toNanos(pause);
275      return this;
276    }
277
278    public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
279      this.pauseNsForServerOverloaded = unit.toNanos(pause);
280      return this;
281    }
282
283    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
284      this.maxAttempts = maxAttempts;
285      return this;
286    }
287
288    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
289      this.startLogErrorsCnt = startLogErrorsCnt;
290      return this;
291    }
292
293    public ScanSingleRegionCallerBuilder
294      setRequestAttributes(Map<String, byte[]> requestAttributes) {
295      this.requestAttributes = requestAttributes;
296      return this;
297    }
298
299    private void preCheck() {
300      checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
301      checkNotNull(scan, "scan is null");
302      checkNotNull(resultCache, "resultCache is null");
303      checkNotNull(consumer, "consumer is null");
304      checkNotNull(stub, "stub is null");
305      checkNotNull(loc, "location is null");
306      this.priority = calcPriority(priority, loc.getRegion().getTable());
307    }
308
309    public AsyncScanSingleRegionRpcRetryingCaller build() {
310      preCheck();
311      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
312        scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
313        scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
314        scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
315    }
316
317    /**
318     * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}.
319     */
320    public CompletableFuture<Boolean> start(HBaseRpcController controller,
321      ScanResponse respWhenOpen) {
322      return build().start(controller, respWhenOpen);
323    }
324  }
325
326  /**
327   * Create retry caller for scanning a region.
328   */
329  public ScanSingleRegionCallerBuilder scanSingleRegion() {
330    return new ScanSingleRegionCallerBuilder();
331  }
332
333  public class BatchCallerBuilder extends BuilderBase {
334
335    private TableName tableName;
336
337    private List<? extends Row> actions;
338
339    private long operationTimeoutNs = -1L;
340
341    private long rpcTimeoutNs = -1L;
342
343    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
344
345    public BatchCallerBuilder table(TableName tableName) {
346      this.tableName = tableName;
347      return this;
348    }
349
350    public BatchCallerBuilder actions(List<? extends Row> actions) {
351      this.actions = actions;
352      return this;
353    }
354
355    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
356      this.operationTimeoutNs = unit.toNanos(operationTimeout);
357      return this;
358    }
359
360    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
361      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
362      return this;
363    }
364
365    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
366      this.pauseNs = unit.toNanos(pause);
367      return this;
368    }
369
370    public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
371      this.pauseNsForServerOverloaded = unit.toNanos(pause);
372      return this;
373    }
374
375    public BatchCallerBuilder maxAttempts(int maxAttempts) {
376      this.maxAttempts = maxAttempts;
377      return this;
378    }
379
380    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
381      this.startLogErrorsCnt = startLogErrorsCnt;
382      return this;
383    }
384
385    public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
386      this.requestAttributes = requestAttributes;
387      return this;
388    }
389
390    public <T> AsyncBatchRpcRetryingCaller<T> build() {
391      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
392        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
393        startLogErrorsCnt, requestAttributes);
394    }
395
396    public <T> List<CompletableFuture<T>> call() {
397      return this.<T> build().call();
398    }
399  }
400
401  public BatchCallerBuilder batch() {
402    return new BatchCallerBuilder();
403  }
404
405  public class MasterRequestCallerBuilder<T> extends BuilderBase {
406    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
407
408    private long operationTimeoutNs = -1L;
409
410    private long rpcTimeoutNs = -1L;
411
412    private int priority = PRIORITY_UNSET;
413
414    public MasterRequestCallerBuilder<T>
415      action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
416      this.callable = callable;
417      return this;
418    }
419
420    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
421      this.operationTimeoutNs = unit.toNanos(operationTimeout);
422      return this;
423    }
424
425    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
426      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
427      return this;
428    }
429
430    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
431      this.pauseNs = unit.toNanos(pause);
432      return this;
433    }
434
435    public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
436      this.pauseNsForServerOverloaded = unit.toNanos(pause);
437      return this;
438    }
439
440    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
441      this.maxAttempts = maxAttempts;
442      return this;
443    }
444
445    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
446      this.startLogErrorsCnt = startLogErrorsCnt;
447      return this;
448    }
449
450    public MasterRequestCallerBuilder<T> priority(TableName tableName) {
451      this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName));
452      return this;
453    }
454
455    public MasterRequestCallerBuilder<T> priority(int priority) {
456      this.priority = Math.max(this.priority, priority);
457      return this;
458    }
459
460    private void preCheck() {
461      checkNotNull(callable, "action is null");
462    }
463
464    public AsyncMasterRequestRpcRetryingCaller<T> build() {
465      preCheck();
466      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
467        pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
468        startLogErrorsCnt);
469    }
470
471    /**
472     * Shortcut for {@code build().call()}
473     */
474    public CompletableFuture<T> call() {
475      return build().call();
476    }
477  }
478
479  public <T> MasterRequestCallerBuilder<T> masterRequest() {
480    return new MasterRequestCallerBuilder<>();
481  }
482
483  public class AdminRequestCallerBuilder<T> extends BuilderBase {
484    // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
485
486    private AsyncAdminRequestRetryingCaller.Callable<T> callable;
487
488    private long operationTimeoutNs = -1L;
489
490    private long rpcTimeoutNs = -1L;
491
492    private ServerName serverName;
493
494    private int priority;
495
496    public AdminRequestCallerBuilder<T>
497      action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
498      this.callable = callable;
499      return this;
500    }
501
502    public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
503      this.operationTimeoutNs = unit.toNanos(operationTimeout);
504      return this;
505    }
506
507    public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
508      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
509      return this;
510    }
511
512    public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
513      this.pauseNs = unit.toNanos(pause);
514      return this;
515    }
516
517    public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
518      this.pauseNsForServerOverloaded = unit.toNanos(pause);
519      return this;
520    }
521
522    public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
523      this.maxAttempts = maxAttempts;
524      return this;
525    }
526
527    public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
528      this.startLogErrorsCnt = startLogErrorsCnt;
529      return this;
530    }
531
532    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
533      this.serverName = serverName;
534      return this;
535    }
536
537    public AdminRequestCallerBuilder<T> priority(int priority) {
538      this.priority = priority;
539      return this;
540    }
541
542    public AsyncAdminRequestRetryingCaller<T> build() {
543      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
544        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
545        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
546        checkNotNull(callable, "action is null"));
547    }
548
549    public CompletableFuture<T> call() {
550      return build().call();
551    }
552  }
553
554  public <T> AdminRequestCallerBuilder<T> adminRequest() {
555    return new AdminRequestCallerBuilder<>();
556  }
557
558  public class ServerRequestCallerBuilder<T> extends BuilderBase {
559
560    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
561
562    private long operationTimeoutNs = -1L;
563
564    private long rpcTimeoutNs = -1L;
565
566    private ServerName serverName;
567
568    public ServerRequestCallerBuilder<T>
569      action(AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
570      this.callable = callable;
571      return this;
572    }
573
574    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
575      this.operationTimeoutNs = unit.toNanos(operationTimeout);
576      return this;
577    }
578
579    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
580      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
581      return this;
582    }
583
584    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
585      this.pauseNs = unit.toNanos(pause);
586      return this;
587    }
588
589    public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
590      this.pauseNsForServerOverloaded = unit.toNanos(pause);
591      return this;
592    }
593
594    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
595      this.maxAttempts = maxAttempts;
596      return this;
597    }
598
599    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
600      this.startLogErrorsCnt = startLogErrorsCnt;
601      return this;
602    }
603
604    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
605      this.serverName = serverName;
606      return this;
607    }
608
609    public AsyncServerRequestRpcRetryingCaller<T> build() {
610      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs,
611        pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
612        startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
613        checkNotNull(callable, "action is null"));
614    }
615
616    public CompletableFuture<T> call() {
617      return build().call();
618    }
619  }
620
621  public <T> ServerRequestCallerBuilder<T> serverRequest() {
622    return new ServerRequestCallerBuilder<>();
623  }
624}