001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
023
024import java.util.List;
025import java.util.concurrent.CompletableFuture;
026import java.util.concurrent.TimeUnit;
027import java.util.function.Function;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.CompareOperator;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.filter.Filter;
032import org.apache.hadoop.hbase.io.TimeRange;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
038
039/**
040 * The interface for asynchronous version of Table. Obtain an instance from a
041 * {@link AsyncConnection}.
042 * <p>
043 * The implementation is required to be thread safe.
044 * <p>
045 * Usually the implementation will not throw any exception directly. You need to get the exception
046 * from the returned {@link CompletableFuture}.
047 * @since 2.0.0
048 */
049@InterfaceAudience.Public
050public interface AsyncTable<C extends ScanResultConsumerBase> {
051
052  /**
053   * Gets the fully qualified table name instance of this table.
054   */
055  TableName getName();
056
057  /**
058   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
059   * <p>
060   * The reference returned is not a copy, so any change made to it will affect this instance.
061   */
062  Configuration getConfiguration();
063
064  /**
065   * Gets the {@link TableDescriptor} for this table.
066   */
067  CompletableFuture<TableDescriptor> getDescriptor();
068
069  /**
070   * Gets the {@link AsyncTableRegionLocator} for this table.
071   */
072  AsyncTableRegionLocator getRegionLocator();
073
074  /**
075   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
076   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
077   * @see #getReadRpcTimeout(TimeUnit)
078   * @see #getWriteRpcTimeout(TimeUnit)
079   * @param unit the unit of time the timeout to be represented in
080   * @return rpc timeout in the specified time unit
081   */
082  long getRpcTimeout(TimeUnit unit);
083
084  /**
085   * Get timeout of each rpc read request in this Table instance.
086   * @param unit the unit of time the timeout to be represented in
087   * @return read rpc timeout in the specified time unit
088   */
089  long getReadRpcTimeout(TimeUnit unit);
090
091  /**
092   * Get timeout of each rpc write request in this Table instance.
093   * @param unit the unit of time the timeout to be represented in
094   * @return write rpc timeout in the specified time unit
095   */
096  long getWriteRpcTimeout(TimeUnit unit);
097
098  /**
099   * Get timeout of each operation in Table instance.
100   * @param unit the unit of time the timeout to be represented in
101   * @return operation rpc timeout in the specified time unit
102   */
103  long getOperationTimeout(TimeUnit unit);
104
105  /**
106   * Get the timeout of a single operation in a scan. It works like operation timeout for other
107   * operations.
108   * @param unit the unit of time the timeout to be represented in
109   * @return scan rpc timeout in the specified time unit
110   */
111  long getScanTimeout(TimeUnit unit);
112
113  /**
114   * Test for the existence of columns in the table, as specified by the Get.
115   * <p>
116   * This will return true if the Get matches one or more keys, false if not.
117   * <p>
118   * This is a server-side call so it prevents any data from being transfered to the client.
119   * @return true if the specified Get matches one or more keys, false if not. The return value will
120   *         be wrapped by a {@link CompletableFuture}.
121   */
122  default CompletableFuture<Boolean> exists(Get get) {
123    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
124  }
125
126  /**
127   * Extracts certain cells from a given row.
128   * @param get The object that specifies what data to fetch and from which row.
129   * @return The data coming from the specified row, if it exists. If the row specified doesn't
130   *         exist, the {@link Result} instance returned won't contain any
131   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
132   *         return value will be wrapped by a {@link CompletableFuture}.
133   */
134  CompletableFuture<Result> get(Get get);
135
136  /**
137   * Puts some data to the table.
138   * @param put The data to put.
139   * @return A {@link CompletableFuture} that always returns null when complete normally.
140   */
141  CompletableFuture<Void> put(Put put);
142
143  /**
144   * Deletes the specified cells/row.
145   * @param delete The object that specifies what to delete.
146   * @return A {@link CompletableFuture} that always returns null when complete normally.
147   */
148  CompletableFuture<Void> delete(Delete delete);
149
150  /**
151   * Appends values to one or more columns within a single row.
152   * <p>
153   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
154   * write operations to a row are synchronized, but readers do not take row locks so get and scan
155   * operations can see this operation partially completed.
156   * @param append object that specifies the columns and amounts to be used for the increment
157   *               operations
158   * @return values of columns after the append operation (maybe null). The return value will be
159   *         wrapped by a {@link CompletableFuture}.
160   */
161  CompletableFuture<Result> append(Append append);
162
163  /**
164   * Increments one or more columns within a single row.
165   * <p>
166   * This operation does not appear atomic to readers. Increments are done under a single row lock,
167   * so write operations to a row are synchronized, but readers do not take row locks so get and
168   * scan operations can see this operation partially completed.
169   * @param increment object that specifies the columns and amounts to be used for the increment
170   *                  operations
171   * @return values of columns after the increment. The return value will be wrapped by a
172   *         {@link CompletableFuture}.
173   */
174  CompletableFuture<Result> increment(Increment increment);
175
176  /**
177   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
178   * <p>
179   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
180   * @param row       The row that contains the cell to increment.
181   * @param family    The column family of the cell to increment.
182   * @param qualifier The column qualifier of the cell to increment.
183   * @param amount    The amount to increment the cell with (or decrement, if the amount is
184   *                  negative).
185   * @return The new value, post increment. The return value will be wrapped by a
186   *         {@link CompletableFuture}.
187   */
188  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
189    long amount) {
190    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
191  }
192
193  /**
194   * Atomically increments a column value. If the column value already exists and is not a
195   * big-endian long, this could throw an exception. If the column value does not yet exist it is
196   * initialized to <code>amount</code> and written to the specified column.
197   * <p>
198   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
199   * any increments that have not been flushed.
200   * @param row        The row that contains the cell to increment.
201   * @param family     The column family of the cell to increment.
202   * @param qualifier  The column qualifier of the cell to increment.
203   * @param amount     The amount to increment the cell with (or decrement, if the amount is
204   *                   negative).
205   * @param durability The persistence guarantee for this increment.
206   * @return The new value, post increment. The return value will be wrapped by a
207   *         {@link CompletableFuture}.
208   */
209  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
210    long amount, Durability durability) {
211    Preconditions.checkNotNull(row, "row is null");
212    Preconditions.checkNotNull(family, "family is null");
213    return increment(
214      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
215        .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
216  }
217
218  /**
219   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
220   * adds the Put/Delete/RowMutations.
221   * <p>
222   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
223   * This is a fluent style API, the code is like:
224   *
225   * <pre>
226   * <code>
227   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
228   *     .thenAccept(succ -> {
229   *       if (succ) {
230   *         System.out.println("Check and put succeeded");
231   *       } else {
232   *         System.out.println("Check and put failed");
233   *       }
234   *     });
235   * </code>
236   * </pre>
237   *
238   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
239   *             any more.
240   */
241  @Deprecated
242  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
243
244  /**
245   * A helper class for sending checkAndMutate request.
246   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
247   *             any more.
248   */
249  @Deprecated
250  interface CheckAndMutateBuilder {
251
252    /**
253     * @param qualifier column qualifier to check.
254     */
255    CheckAndMutateBuilder qualifier(byte[] qualifier);
256
257    /**
258     * @param timeRange time range to check.
259     */
260    CheckAndMutateBuilder timeRange(TimeRange timeRange);
261
262    /**
263     * Check for lack of column.
264     */
265    CheckAndMutateBuilder ifNotExists();
266
267    /**
268     * Check for equality.
269     * @param value the expected value
270     */
271    default CheckAndMutateBuilder ifEquals(byte[] value) {
272      return ifMatches(CompareOperator.EQUAL, value);
273    }
274
275    /**
276     * @param compareOp comparison operator to use
277     * @param value     the expected value
278     */
279    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
280
281    /**
282     * @param put data to put if check succeeds
283     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
284     *         will be wrapped by a {@link CompletableFuture}.
285     */
286    CompletableFuture<Boolean> thenPut(Put put);
287
288    /**
289     * @param delete data to delete if check succeeds
290     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
291     *         value will be wrapped by a {@link CompletableFuture}.
292     */
293    CompletableFuture<Boolean> thenDelete(Delete delete);
294
295    /**
296     * @param mutation mutations to perform if check succeeds
297     * @return true if the new mutation was executed, false otherwise. The return value will be
298     *         wrapped by a {@link CompletableFuture}.
299     */
300    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
301  }
302
303  /**
304   * Atomically checks if a row matches the specified filter. If it does, it adds the
305   * Put/Delete/RowMutations.
306   * <p>
307   * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
308   * execute it. This is a fluent style API, the code is like:
309   *
310   * <pre>
311   * <code>
312   * table.checkAndMutate(row, filter).thenPut(put)
313   *     .thenAccept(succ -> {
314   *       if (succ) {
315   *         System.out.println("Check and put succeeded");
316   *       } else {
317   *         System.out.println("Check and put failed");
318   *       }
319   *     });
320   * </code>
321   * </pre>
322   *
323   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
324   *             any more.
325   */
326  @Deprecated
327  CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
328
329  /**
330   * A helper class for sending checkAndMutate request with a filter.
331   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
332   *             any more.
333   */
334  @Deprecated
335  interface CheckAndMutateWithFilterBuilder {
336
337    /**
338     * @param timeRange time range to check.
339     */
340    CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
341
342    /**
343     * @param put data to put if check succeeds
344     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
345     *         will be wrapped by a {@link CompletableFuture}.
346     */
347    CompletableFuture<Boolean> thenPut(Put put);
348
349    /**
350     * @param delete data to delete if check succeeds
351     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
352     *         value will be wrapped by a {@link CompletableFuture}.
353     */
354    CompletableFuture<Boolean> thenDelete(Delete delete);
355
356    /**
357     * @param mutation mutations to perform if check succeeds
358     * @return true if the new mutation was executed, false otherwise. The return value will be
359     *         wrapped by a {@link CompletableFuture}.
360     */
361    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
362  }
363
364  /**
365   * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it
366   * performs the specified action.
367   * @param checkAndMutate The CheckAndMutate object.
368   * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate.
369   */
370  CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate);
371
372  /**
373   * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
374   * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
375   * atomically (and thus, each may fail independently of others).
376   * @param checkAndMutates The list of CheckAndMutate.
377   * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate.
378   */
379  List<CompletableFuture<CheckAndMutateResult>>
380    checkAndMutate(List<CheckAndMutate> checkAndMutates);
381
382  /**
383   * A simple version of batch checkAndMutate. It will fail if there are any failures.
384   * @param checkAndMutates The list of rows to apply.
385   * @return A {@link CompletableFuture} that wrapper the result list.
386   */
387  default CompletableFuture<List<CheckAndMutateResult>>
388    checkAndMutateAll(List<CheckAndMutate> checkAndMutates) {
389    return allOf(checkAndMutate(checkAndMutates));
390  }
391
392  /**
393   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
394   * {@link Delete} are supported.
395   * @param mutation object that specifies the set of mutations to perform atomically
396   * @return A {@link CompletableFuture} that returns results of Increment/Append operations
397   */
398  CompletableFuture<Result> mutateRow(RowMutations mutation);
399
400  /**
401   * The scan API uses the observer pattern.
402   * @param scan     A configured {@link Scan} object.
403   * @param consumer the consumer used to receive results.
404   * @see ScanResultConsumer
405   * @see AdvancedScanResultConsumer
406   */
407  void scan(Scan scan, C consumer);
408
409  /**
410   * Gets a scanner on the current table for the given family.
411   * @param family The column family to scan.
412   * @return A scanner.
413   */
414  default ResultScanner getScanner(byte[] family) {
415    return getScanner(new Scan().addFamily(family));
416  }
417
418  /**
419   * Gets a scanner on the current table for the given family and qualifier.
420   * @param family    The column family to scan.
421   * @param qualifier The column qualifier to scan.
422   * @return A scanner.
423   */
424  default ResultScanner getScanner(byte[] family, byte[] qualifier) {
425    return getScanner(new Scan().addColumn(family, qualifier));
426  }
427
428  /**
429   * Returns a scanner on the current table as specified by the {@link Scan} object.
430   * @param scan A configured {@link Scan} object.
431   * @return A scanner.
432   */
433  ResultScanner getScanner(Scan scan);
434
435  /**
436   * Return all the results that match the given scan object.
437   * <p>
438   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
439   * For example, if you want to get the closest row after a given row, you could do this:
440   * <p>
441   *
442   * <pre>
443   * <code>
444   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
445   *   if (results.isEmpty()) {
446   *      System.out.println("No row after " + Bytes.toStringBinary(row));
447   *   } else {
448   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
449   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
450   *   }
451   * });
452   * </code>
453   * </pre>
454   * <p>
455   * If your result set is very large, you should use other scan method to get a scanner or use
456   * callback to process the results. They will do chunking to prevent OOM. The scanAll method will
457   * fetch all the results and store them in a List and then return the list to you.
458   * <p>
459   * The scan metrics will be collected background if you enable it but you have no way to get it.
460   * Usually you can get scan metrics from {@code ResultScanner}, or through
461   * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
462   * So if you really care about scan metrics then you'd better use other scan methods which return
463   * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
464   * performance difference between these scan methods so do not worry.
465   * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large
466   *             result set, it is likely to cause OOM.
467   * @return The results of this small scan operation. The return value will be wrapped by a
468   *         {@link CompletableFuture}.
469   */
470  CompletableFuture<List<Result>> scanAll(Scan scan);
471
472  /**
473   * Test for the existence of columns in the table, as specified by the Gets.
474   * <p>
475   * This will return a list of booleans. Each value will be true if the related Get matches one or
476   * more keys, false if not.
477   * <p>
478   * This is a server-side call so it prevents any data from being transferred to the client.
479   * @param gets the Gets
480   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
481   */
482  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
483    return get(toCheckExistenceOnly(gets)).stream()
484      .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
485  }
486
487  /**
488   * A simple version for batch exists. It will fail if there are any failures and you will get the
489   * whole result boolean list at once if the operation is succeeded.
490   * @param gets the Gets
491   * @return A {@link CompletableFuture} that wrapper the result boolean list.
492   */
493  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
494    return allOf(exists(gets));
495  }
496
497  /**
498   * Extracts certain cells from the given rows, in batch.
499   * <p>
500   * Notice that you may not get all the results with this function, which means some of the
501   * returned {@link CompletableFuture}s may succeed while some of the other returned
502   * {@link CompletableFuture}s may fail.
503   * @param gets The objects that specify what data to fetch and from which rows.
504   * @return A list of {@link CompletableFuture}s that represent the result for each get.
505   */
506  List<CompletableFuture<Result>> get(List<Get> gets);
507
508  /**
509   * A simple version for batch get. It will fail if there are any failures and you will get the
510   * whole result list at once if the operation is succeeded.
511   * @param gets The objects that specify what data to fetch and from which rows.
512   * @return A {@link CompletableFuture} that wrapper the result list.
513   */
514  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
515    return allOf(get(gets));
516  }
517
518  /**
519   * Puts some data in the table, in batch.
520   * @param puts The list of mutations to apply.
521   * @return A list of {@link CompletableFuture}s that represent the result for each put.
522   */
523  List<CompletableFuture<Void>> put(List<Put> puts);
524
525  /**
526   * A simple version of batch put. It will fail if there are any failures.
527   * @param puts The list of mutations to apply.
528   * @return A {@link CompletableFuture} that always returns null when complete normally.
529   */
530  default CompletableFuture<Void> putAll(List<Put> puts) {
531    return allOf(put(puts)).thenApply(r -> null);
532  }
533
534  /**
535   * Deletes the specified cells/rows in bulk.
536   * @param deletes list of things to delete.
537   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
538   */
539  List<CompletableFuture<Void>> delete(List<Delete> deletes);
540
541  /**
542   * A simple version of batch delete. It will fail if there are any failures.
543   * @param deletes list of things to delete.
544   * @return A {@link CompletableFuture} that always returns null when complete normally.
545   */
546  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
547    return allOf(delete(deletes)).thenApply(r -> null);
548  }
549
550  /**
551   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
552   * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
553   * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
554   * Put had put.
555   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
556   * @return A list of {@link CompletableFuture}s that represent the result for each action.
557   */
558  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
559
560  /**
561   * A simple version of batch. It will fail if there are any failures and you will get the whole
562   * result list at once if the operation is succeeded.
563   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
564   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
565   */
566  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
567    return allOf(batch(actions));
568  }
569
570  /**
571   * Execute the given coprocessor call on the region which contains the given {@code row}.
572   * <p>
573   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
574   * one line lambda expression, like:
575   *
576   * <pre>
577   * <code>
578   * channel -> xxxService.newStub(channel)
579   * </code>
580   * </pre>
581   *
582   * @param stubMaker a delegation to the actual {@code newStub} call.
583   * @param callable  a delegation to the actual protobuf rpc call. See the comment of
584   *                  {@link ServiceCaller} for more details.
585   * @param row       The row key used to identify the remote region location
586   * @param <S>       the type of the asynchronous stub
587   * @param <R>       the type of the return value
588   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
589   * @see ServiceCaller
590   */
591  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
592    ServiceCaller<S, R> callable, byte[] row);
593
594  /**
595   * The callback when we want to execute a coprocessor call on a range of regions.
596   * <p>
597   * As the locating itself also takes some time, the implementation may want to send rpc calls on
598   * the fly, which means we do not know how many regions we have when we get the return value of
599   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
600   * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)}
601   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no
602   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
603   * calls in the future.
604   * <p>
605   * Here is a pseudo code to describe a typical implementation of a range coprocessor service
606   * method to help you better understand how the {@link CoprocessorCallback} will be called. The
607   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
608   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
609   *
610   * <pre>
611   * locateThenCall(byte[] row) {
612   *   locate(row).whenComplete((location, locateError) -> {
613   *     if (locateError != null) {
614   *       callback.onError(locateError);
615   *       return;
616   *     }
617   *     incPendingCall();
618   *     region = location.getRegion();
619   *     if (region.getEndKey() > endKey) {
620   *       locateEnd = true;
621   *     } else {
622   *       locateThenCall(region.getEndKey());
623   *     }
624   *     sendCall().whenComplete((resp, error) -> {
625   *       if (error != null) {
626   *         callback.onRegionError(region, error);
627   *       } else {
628   *         callback.onRegionComplete(region, resp);
629   *       }
630   *       if (locateEnd && decPendingCallAndGet() == 0) {
631   *         callback.onComplete();
632   *       }
633   *     });
634   *   });
635   * }
636   * </pre>
637   */
638  @InterfaceAudience.Public
639  interface CoprocessorCallback<R> {
640
641    /**
642     * @param region the region that the response belongs to
643     * @param resp   the response of the coprocessor call
644     */
645    void onRegionComplete(RegionInfo region, R resp);
646
647    /**
648     * @param region the region that the error belongs to
649     * @param error  the response error of the coprocessor call
650     */
651    void onRegionError(RegionInfo region, Throwable error);
652
653    /**
654     * Indicate that all responses of the regions have been notified by calling
655     * {@link #onRegionComplete(RegionInfo, Object)} or
656     * {@link #onRegionError(RegionInfo, Throwable)}.
657     */
658    void onComplete();
659
660    /**
661     * Indicate that we got an error which does not belong to any regions. Usually a locating error.
662     */
663    void onError(Throwable error);
664  }
665
666  /**
667   * Helper class for sending coprocessorService request that executes a coprocessor call on regions
668   * which are covered by a range.
669   * <p>
670   * If {@code fromRow} is not specified the selection will start with the first table region. If
671   * {@code toRow} is not specified the selection will continue through the last table region.
672   * @param <S> the type of the protobuf Service you want to call.
673   * @param <R> the type of the return value.
674   */
675  interface CoprocessorServiceBuilder<S, R> {
676
677    /**
678     * @param startKey start region selection with region containing this row, inclusive.
679     */
680    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
681      return fromRow(startKey, true);
682    }
683
684    /**
685     * @param startKey  start region selection with region containing this row
686     * @param inclusive whether to include the startKey
687     */
688    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
689
690    /**
691     * @param endKey select regions up to and including the region containing this row, exclusive.
692     */
693    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
694      return toRow(endKey, false);
695    }
696
697    /**
698     * @param endKey    select regions up to and including the region containing this row
699     * @param inclusive whether to include the endKey
700     */
701    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
702
703    /**
704     * Execute the coprocessorService request. You can get the response through the
705     * {@link CoprocessorCallback}.
706     */
707    void execute();
708  }
709
710  /**
711   * Execute a coprocessor call on the regions which are covered by a range.
712   * <p>
713   * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it.
714   * <p>
715   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
716   * is only a one line lambda expression, like:
717   *
718   * <pre>
719   * <code>
720   * channel -> xxxService.newStub(channel)
721   * </code>
722   * </pre>
723   *
724   * @param stubMaker a delegation to the actual {@code newStub} call.
725   * @param callable  a delegation to the actual protobuf rpc call. See the comment of
726   *                  {@link ServiceCaller} for more details.
727   * @param callback  callback to get the response. See the comment of {@link CoprocessorCallback}
728   *                  for more details.
729   */
730  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
731    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
732}