001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
021import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
023
024import java.util.List;
025import java.util.concurrent.CompletableFuture;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.hbase.HRegionLocation;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
031import org.apache.hadoop.hbase.ipc.HBaseRpcController;
032import org.apache.yetus.audience.InterfaceAudience;
033
034import org.apache.hbase.thirdparty.io.netty.util.Timer;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
038
039/**
040 * Factory to create an AsyncRpcRetryCaller.
041 * @since 2.0.0
042 */
043@InterfaceAudience.Private
044class AsyncRpcRetryingCallerFactory {
045
046  private final AsyncConnectionImpl conn;
047
048  private final Timer retryTimer;
049
050  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
051    this.conn = conn;
052    this.retryTimer = retryTimer;
053  }
054
055  private abstract class BuilderBase {
056
057    protected long pauseNs = conn.connConf.getPauseNs();
058
059    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
060
061    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
062  }
063
064  public class SingleRequestCallerBuilder<T> extends BuilderBase {
065
066    private TableName tableName;
067
068    private byte[] row;
069
070    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
071
072    private long operationTimeoutNs = -1L;
073
074    private long rpcTimeoutNs = -1L;
075
076    private RegionLocateType locateType = RegionLocateType.CURRENT;
077
078    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
079
080    public SingleRequestCallerBuilder<T> table(TableName tableName) {
081      this.tableName = tableName;
082      return this;
083    }
084
085    public SingleRequestCallerBuilder<T> row(byte[] row) {
086      this.row = row;
087      return this;
088    }
089
090    public SingleRequestCallerBuilder<T> action(
091        AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
092      this.callable = callable;
093      return this;
094    }
095
096    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
097      this.operationTimeoutNs = unit.toNanos(operationTimeout);
098      return this;
099    }
100
101    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
102      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
103      return this;
104    }
105
106    public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
107      this.locateType = locateType;
108      return this;
109    }
110
111    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
112      this.pauseNs = unit.toNanos(pause);
113      return this;
114    }
115
116    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
117      this.maxAttempts = maxAttempts;
118      return this;
119    }
120
121    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
122      this.startLogErrorsCnt = startLogErrorsCnt;
123      return this;
124    }
125
126    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
127      this.replicaId = replicaId;
128      return this;
129    }
130
131    public AsyncSingleRequestRpcRetryingCaller<T> build() {
132      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
133      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
134        checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
135        checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
136        pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
137    }
138
139    /**
140     * Shortcut for {@code build().call()}
141     */
142    public CompletableFuture<T> call() {
143      return build().call();
144    }
145  }
146
147  /**
148   * Create retry caller for single action, such as get, put, delete, etc.
149   */
150  public <T> SingleRequestCallerBuilder<T> single() {
151    return new SingleRequestCallerBuilder<>();
152  }
153
154  public class ScanSingleRegionCallerBuilder extends BuilderBase {
155
156    private Long scannerId = null;
157
158    private Scan scan;
159
160    private ScanMetrics scanMetrics;
161
162    private ScanResultCache resultCache;
163
164    private AdvancedScanResultConsumer consumer;
165
166    private ClientService.Interface stub;
167
168    private HRegionLocation loc;
169
170    private boolean isRegionServerRemote;
171
172    private long scannerLeaseTimeoutPeriodNs;
173
174    private long scanTimeoutNs;
175
176    private long rpcTimeoutNs;
177
178    public ScanSingleRegionCallerBuilder id(long scannerId) {
179      this.scannerId = scannerId;
180      return this;
181    }
182
183    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
184      this.scan = scan;
185      return this;
186    }
187
188    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
189      this.scanMetrics = scanMetrics;
190      return this;
191    }
192
193    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
194      this.isRegionServerRemote = isRegionServerRemote;
195      return this;
196    }
197
198    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
199      this.resultCache = resultCache;
200      return this;
201    }
202
203    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
204      this.consumer = consumer;
205      return this;
206    }
207
208    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
209      this.stub = stub;
210      return this;
211    }
212
213    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
214      this.loc = loc;
215      return this;
216    }
217
218    public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
219        TimeUnit unit) {
220      this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
221      return this;
222    }
223
224    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
225      this.scanTimeoutNs = unit.toNanos(scanTimeout);
226      return this;
227    }
228
229    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
230      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
231      return this;
232    }
233
234    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
235      this.pauseNs = unit.toNanos(pause);
236      return this;
237    }
238
239    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
240      this.maxAttempts = maxAttempts;
241      return this;
242    }
243
244    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
245      this.startLogErrorsCnt = startLogErrorsCnt;
246      return this;
247    }
248
249    public AsyncScanSingleRegionRpcRetryingCaller build() {
250      checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
251      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
252        checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
253        checkNotNull(resultCache, "resultCache is null"),
254        checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
255        checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
256        pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
257    }
258
259    /**
260     * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}.
261     */
262    public CompletableFuture<Boolean> start(HBaseRpcController controller,
263        ScanResponse respWhenOpen) {
264      return build().start(controller, respWhenOpen);
265    }
266  }
267
268  /**
269   * Create retry caller for scanning a region.
270   */
271  public ScanSingleRegionCallerBuilder scanSingleRegion() {
272    return new ScanSingleRegionCallerBuilder();
273  }
274
275  public class BatchCallerBuilder extends BuilderBase {
276
277    private TableName tableName;
278
279    private List<? extends Row> actions;
280
281    private long operationTimeoutNs = -1L;
282
283    private long rpcTimeoutNs = -1L;
284
285    public BatchCallerBuilder table(TableName tableName) {
286      this.tableName = tableName;
287      return this;
288    }
289
290    public BatchCallerBuilder actions(List<? extends Row> actions) {
291      this.actions = actions;
292      return this;
293    }
294
295    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
296      this.operationTimeoutNs = unit.toNanos(operationTimeout);
297      return this;
298    }
299
300    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
301      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
302      return this;
303    }
304
305    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
306      this.pauseNs = unit.toNanos(pause);
307      return this;
308    }
309
310    public BatchCallerBuilder maxAttempts(int maxAttempts) {
311      this.maxAttempts = maxAttempts;
312      return this;
313    }
314
315    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
316      this.startLogErrorsCnt = startLogErrorsCnt;
317      return this;
318    }
319
320    public <T> AsyncBatchRpcRetryingCaller<T> build() {
321      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
322        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
323    }
324
325    public <T> List<CompletableFuture<T>> call() {
326      return this.<T> build().call();
327    }
328  }
329
330  public BatchCallerBuilder batch() {
331    return new BatchCallerBuilder();
332  }
333
334  public class MasterRequestCallerBuilder<T> extends BuilderBase {
335    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
336
337    private long operationTimeoutNs = -1L;
338
339    private long rpcTimeoutNs = -1L;
340
341    public MasterRequestCallerBuilder<T> action(
342        AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
343      this.callable = callable;
344      return this;
345    }
346
347    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
348      this.operationTimeoutNs = unit.toNanos(operationTimeout);
349      return this;
350    }
351
352    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
353      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
354      return this;
355    }
356
357    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
358      this.pauseNs = unit.toNanos(pause);
359      return this;
360    }
361
362    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
363      this.maxAttempts = maxAttempts;
364      return this;
365    }
366
367    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
368      this.startLogErrorsCnt = startLogErrorsCnt;
369      return this;
370    }
371
372    public AsyncMasterRequestRpcRetryingCaller<T> build() {
373      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
374        checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
375        rpcTimeoutNs, startLogErrorsCnt);
376    }
377
378    /**
379     * Shortcut for {@code build().call()}
380     */
381    public CompletableFuture<T> call() {
382      return build().call();
383    }
384  }
385
386  public <T> MasterRequestCallerBuilder<T> masterRequest() {
387    return new MasterRequestCallerBuilder<>();
388  }
389
390  public class AdminRequestCallerBuilder<T> extends BuilderBase {
391    // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
392
393    private AsyncAdminRequestRetryingCaller.Callable<T> callable;
394
395    private long operationTimeoutNs = -1L;
396
397    private long rpcTimeoutNs = -1L;
398
399    private ServerName serverName;
400
401    public AdminRequestCallerBuilder<T> action(
402        AsyncAdminRequestRetryingCaller.Callable<T> callable) {
403      this.callable = callable;
404      return this;
405    }
406
407    public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
408      this.operationTimeoutNs = unit.toNanos(operationTimeout);
409      return this;
410    }
411
412    public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
413      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
414      return this;
415    }
416
417    public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
418      this.pauseNs = unit.toNanos(pause);
419      return this;
420    }
421
422    public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
423      this.maxAttempts = maxAttempts;
424      return this;
425    }
426
427    public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
428      this.startLogErrorsCnt = startLogErrorsCnt;
429      return this;
430    }
431
432    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
433      this.serverName = serverName;
434      return this;
435    }
436
437    public AsyncAdminRequestRetryingCaller<T> build() {
438      return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
439        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
440        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
441    }
442
443    public CompletableFuture<T> call() {
444      return build().call();
445    }
446  }
447
448  public <T> AdminRequestCallerBuilder<T> adminRequest() {
449    return new AdminRequestCallerBuilder<>();
450  }
451
452  public class ServerRequestCallerBuilder<T> extends BuilderBase {
453
454    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
455
456    private long operationTimeoutNs = -1L;
457
458    private long rpcTimeoutNs = -1L;
459
460    private ServerName serverName;
461
462    public ServerRequestCallerBuilder<T> action(
463        AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
464      this.callable = callable;
465      return this;
466    }
467
468    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
469      this.operationTimeoutNs = unit.toNanos(operationTimeout);
470      return this;
471    }
472
473    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
474      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
475      return this;
476    }
477
478    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
479      this.pauseNs = unit.toNanos(pause);
480      return this;
481    }
482
483    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
484      this.maxAttempts = maxAttempts;
485      return this;
486    }
487
488    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
489      this.startLogErrorsCnt = startLogErrorsCnt;
490      return this;
491    }
492
493    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
494      this.serverName = serverName;
495      return this;
496    }
497
498    public AsyncServerRequestRpcRetryingCaller<T> build() {
499      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
500        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
501        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
502    }
503
504    public CompletableFuture<T> call() {
505      return build().call();
506    }
507  }
508
509  public <T> ServerRequestCallerBuilder<T> serverRequest() {
510    return new ServerRequestCallerBuilder<>();
511  }
512}