001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
022import static org.apache.hadoop.hbase.util.FutureUtils.allOf;
023
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.TimeUnit;
028import java.util.function.Function;
029import org.apache.commons.lang3.NotImplementedException;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.CompareOperator;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.filter.Filter;
034import org.apache.hadoop.hbase.io.TimeRange;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.yetus.audience.InterfaceAudience;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
039import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
040
041/**
042 * The interface for asynchronous version of Table. Obtain an instance from a
043 * {@link AsyncConnection}.
044 * <p>
045 * The implementation is required to be thread safe.
046 * <p>
047 * Usually the implementation will not throw any exception directly. You need to get the exception
048 * from the returned {@link CompletableFuture}.
049 * @since 2.0.0
050 */
051@InterfaceAudience.Public
052public interface AsyncTable<C extends ScanResultConsumerBase> {
053
054  /**
055   * Gets the fully qualified table name instance of this table.
056   */
057  TableName getName();
058
059  /**
060   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
061   * <p>
062   * The reference returned is not a copy, so any change made to it will affect this instance.
063   */
064  Configuration getConfiguration();
065
066  /**
067   * Gets the {@link TableDescriptor} for this table.
068   */
069  CompletableFuture<TableDescriptor> getDescriptor();
070
071  /**
072   * Gets the {@link AsyncTableRegionLocator} for this table.
073   */
074  AsyncTableRegionLocator getRegionLocator();
075
076  /**
077   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
078   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
079   * @see #getReadRpcTimeout(TimeUnit)
080   * @see #getWriteRpcTimeout(TimeUnit)
081   * @param unit the unit of time the timeout to be represented in
082   * @return rpc timeout in the specified time unit
083   */
084  long getRpcTimeout(TimeUnit unit);
085
086  /**
087   * Get timeout of each rpc read request in this Table instance.
088   * @param unit the unit of time the timeout to be represented in
089   * @return read rpc timeout in the specified time unit
090   */
091  long getReadRpcTimeout(TimeUnit unit);
092
093  /**
094   * Get timeout of each rpc write request in this Table instance.
095   * @param unit the unit of time the timeout to be represented in
096   * @return write rpc timeout in the specified time unit
097   */
098  long getWriteRpcTimeout(TimeUnit unit);
099
100  /**
101   * Get timeout of each operation in Table instance.
102   * @param unit the unit of time the timeout to be represented in
103   * @return operation rpc timeout in the specified time unit
104   */
105  long getOperationTimeout(TimeUnit unit);
106
107  /**
108   * Get the timeout of a single operation in a scan. It works like operation timeout for other
109   * operations.
110   * @param unit the unit of time the timeout to be represented in
111   * @return scan rpc timeout in the specified time unit
112   */
113  long getScanTimeout(TimeUnit unit);
114
115  /**
116   * Get the map of request attributes
117   * @return a map of request attributes supplied by the client
118   */
119  default Map<String, byte[]> getRequestAttributes() {
120    throw new NotImplementedException("Add an implementation!");
121  }
122
123  /**
124   * Test for the existence of columns in the table, as specified by the Get.
125   * <p>
126   * This will return true if the Get matches one or more keys, false if not.
127   * <p>
128   * This is a server-side call so it prevents any data from being transfered to the client.
129   * @return true if the specified Get matches one or more keys, false if not. The return value will
130   *         be wrapped by a {@link CompletableFuture}.
131   */
132  default CompletableFuture<Boolean> exists(Get get) {
133    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
134  }
135
136  /**
137   * Extracts certain cells from a given row.
138   * @param get The object that specifies what data to fetch and from which row.
139   * @return The data coming from the specified row, if it exists. If the row specified doesn't
140   *         exist, the {@link Result} instance returned won't contain any
141   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
142   *         return value will be wrapped by a {@link CompletableFuture}.
143   */
144  CompletableFuture<Result> get(Get get);
145
146  /**
147   * Puts some data to the table.
148   * @param put The data to put.
149   * @return A {@link CompletableFuture} that always returns null when complete normally.
150   */
151  CompletableFuture<Void> put(Put put);
152
153  /**
154   * Deletes the specified cells/row.
155   * @param delete The object that specifies what to delete.
156   * @return A {@link CompletableFuture} that always returns null when complete normally.
157   */
158  CompletableFuture<Void> delete(Delete delete);
159
160  /**
161   * Appends values to one or more columns within a single row.
162   * <p>
163   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
164   * write operations to a row are synchronized, but readers do not take row locks so get and scan
165   * operations can see this operation partially completed.
166   * @param append object that specifies the columns and amounts to be used for the increment
167   *               operations
168   * @return values of columns after the append operation (maybe null). The return value will be
169   *         wrapped by a {@link CompletableFuture}.
170   */
171  CompletableFuture<Result> append(Append append);
172
173  /**
174   * Increments one or more columns within a single row.
175   * <p>
176   * This operation does not appear atomic to readers. Increments are done under a single row lock,
177   * so write operations to a row are synchronized, but readers do not take row locks so get and
178   * scan operations can see this operation partially completed.
179   * @param increment object that specifies the columns and amounts to be used for the increment
180   *                  operations
181   * @return values of columns after the increment. The return value will be wrapped by a
182   *         {@link CompletableFuture}.
183   */
184  CompletableFuture<Result> increment(Increment increment);
185
186  /**
187   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
188   * <p>
189   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
190   * @param row       The row that contains the cell to increment.
191   * @param family    The column family of the cell to increment.
192   * @param qualifier The column qualifier of the cell to increment.
193   * @param amount    The amount to increment the cell with (or decrement, if the amount is
194   *                  negative).
195   * @return The new value, post increment. The return value will be wrapped by a
196   *         {@link CompletableFuture}.
197   */
198  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
199    long amount) {
200    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
201  }
202
203  /**
204   * Atomically increments a column value. If the column value already exists and is not a
205   * big-endian long, this could throw an exception. If the column value does not yet exist it is
206   * initialized to <code>amount</code> and written to the specified column.
207   * <p>
208   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
209   * any increments that have not been flushed.
210   * @param row        The row that contains the cell to increment.
211   * @param family     The column family of the cell to increment.
212   * @param qualifier  The column qualifier of the cell to increment.
213   * @param amount     The amount to increment the cell with (or decrement, if the amount is
214   *                   negative).
215   * @param durability The persistence guarantee for this increment.
216   * @return The new value, post increment. The return value will be wrapped by a
217   *         {@link CompletableFuture}.
218   */
219  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
220    long amount, Durability durability) {
221    Preconditions.checkNotNull(row, "row is null");
222    Preconditions.checkNotNull(family, "family is null");
223    return increment(
224      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
225        .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
226  }
227
228  /**
229   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
230   * adds the Put/Delete/RowMutations.
231   * <p>
232   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
233   * This is a fluent style API, the code is like:
234   *
235   * <pre>
236   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
237   *   .thenAccept(succ -&gt; {
238   *     if (succ) {
239   *       System.out.println("Check and put succeeded");
240   *     } else {
241   *       System.out.println("Check and put failed");
242   *     }
243   *   });
244   * </pre>
245   *
246   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
247   *             any more.
248   */
249  @Deprecated
250  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
251
252  /**
253   * A helper class for sending checkAndMutate request.
254   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
255   *             any more.
256   */
257  @Deprecated
258  interface CheckAndMutateBuilder {
259
260    /**
261     * Match a qualifier.
262     * @param qualifier column qualifier to check.
263     */
264    CheckAndMutateBuilder qualifier(byte[] qualifier);
265
266    /**
267     * Match a timerange.
268     * @param timeRange time range to check.
269     */
270    CheckAndMutateBuilder timeRange(TimeRange timeRange);
271
272    /**
273     * Check for lack of column.
274     */
275    CheckAndMutateBuilder ifNotExists();
276
277    /**
278     * Check for equality.
279     * @param value the expected value
280     */
281    default CheckAndMutateBuilder ifEquals(byte[] value) {
282      return ifMatches(CompareOperator.EQUAL, value);
283    }
284
285    /**
286     * Compare a value
287     * @param compareOp comparison operator to use
288     * @param value     the expected value
289     */
290    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
291
292    /**
293     * Specify a Put to commit if the check succeeds.
294     * @param put data to put if check succeeds
295     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
296     *         will be wrapped by a {@link CompletableFuture}.
297     */
298    CompletableFuture<Boolean> thenPut(Put put);
299
300    /**
301     * Specify a Delete to commit if the check succeeds.
302     * @param delete data to delete if check succeeds
303     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
304     *         value will be wrapped by a {@link CompletableFuture}.
305     */
306    CompletableFuture<Boolean> thenDelete(Delete delete);
307
308    /**
309     * Specify a RowMutations to commit if the check succeeds.
310     * @param mutation mutations to perform if check succeeds
311     * @return true if the new mutation was executed, false otherwise. The return value will be
312     *         wrapped by a {@link CompletableFuture}.
313     */
314    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
315  }
316
317  /**
318   * Atomically checks if a row matches the specified filter. If it does, it adds the
319   * Put/Delete/RowMutations.
320   * <p>
321   * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
322   * execute it. This is a fluent style API, the code is like:
323   *
324   * <pre>
325   * table.checkAndMutate(row, filter).thenPut(put).thenAccept(succ -&gt; {
326   *   if (succ) {
327   *     System.out.println("Check and put succeeded");
328   *   } else {
329   *     System.out.println("Check and put failed");
330   *   }
331   * });
332   * </pre>
333   *
334   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
335   *             any more.
336   */
337  @Deprecated
338  CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
339
340  /**
341   * A helper class for sending checkAndMutate request with a filter.
342   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
343   *             any more.
344   */
345  @Deprecated
346  interface CheckAndMutateWithFilterBuilder {
347
348    /**
349     * Match a timerange.
350     * @param timeRange time range to check.
351     */
352    CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
353
354    /**
355     * Specify a Put to commit if the check succeeds.
356     * @param put data to put if check succeeds
357     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
358     *         will be wrapped by a {@link CompletableFuture}.
359     */
360    CompletableFuture<Boolean> thenPut(Put put);
361
362    /**
363     * Specify a Delete to commit if the check succeeds.
364     * @param delete data to delete if check succeeds
365     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
366     *         value will be wrapped by a {@link CompletableFuture}.
367     */
368    CompletableFuture<Boolean> thenDelete(Delete delete);
369
370    /**
371     * Specify a RowMutations to commit if the check succeeds.
372     * @param mutation mutations to perform if check succeeds
373     * @return true if the new mutation was executed, false otherwise. The return value will be
374     *         wrapped by a {@link CompletableFuture}.
375     */
376    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
377  }
378
379  /**
380   * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it
381   * performs the specified action.
382   * @param checkAndMutate The CheckAndMutate object.
383   * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate.
384   */
385  CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate);
386
387  /**
388   * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
389   * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
390   * atomically (and thus, each may fail independently of others).
391   * @param checkAndMutates The list of CheckAndMutate.
392   * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate.
393   */
394  List<CompletableFuture<CheckAndMutateResult>>
395    checkAndMutate(List<CheckAndMutate> checkAndMutates);
396
397  /**
398   * A simple version of batch checkAndMutate. It will fail if there are any failures.
399   * @param checkAndMutates The list of rows to apply.
400   * @return A {@link CompletableFuture} that wrapper the result list.
401   */
402  default CompletableFuture<List<CheckAndMutateResult>>
403    checkAndMutateAll(List<CheckAndMutate> checkAndMutates) {
404    return allOf(checkAndMutate(checkAndMutates));
405  }
406
407  /**
408   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
409   * {@link Delete} are supported.
410   * @param mutation object that specifies the set of mutations to perform atomically
411   * @return A {@link CompletableFuture} that returns results of Increment/Append operations
412   */
413  CompletableFuture<Result> mutateRow(RowMutations mutation);
414
415  /**
416   * The scan API uses the observer pattern.
417   * @param scan     A configured {@link Scan} object.
418   * @param consumer the consumer used to receive results.
419   * @see ScanResultConsumer
420   * @see AdvancedScanResultConsumer
421   */
422  void scan(Scan scan, C consumer);
423
424  /**
425   * Gets a scanner on the current table for the given family.
426   * @param family The column family to scan.
427   * @return A scanner.
428   */
429  default ResultScanner getScanner(byte[] family) {
430    return getScanner(new Scan().addFamily(family));
431  }
432
433  /**
434   * Gets a scanner on the current table for the given family and qualifier.
435   * @param family    The column family to scan.
436   * @param qualifier The column qualifier to scan.
437   * @return A scanner.
438   */
439  default ResultScanner getScanner(byte[] family, byte[] qualifier) {
440    return getScanner(new Scan().addColumn(family, qualifier));
441  }
442
443  /**
444   * Returns a scanner on the current table as specified by the {@link Scan} object.
445   * @param scan A configured {@link Scan} object.
446   * @return A scanner.
447   */
448  ResultScanner getScanner(Scan scan);
449
450  /**
451   * Return all the results that match the given scan object.
452   * <p>
453   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
454   * For example, if you want to get the closest row after a given row, you could do this:
455   * <p>
456   *
457   * <pre>
458   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -&gt; {
459   *   if (results.isEmpty()) {
460   *     System.out.println("No row after " + Bytes.toStringBinary(row));
461   *   } else {
462   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
463   *       + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
464   *   }
465   * });
466   * </pre>
467   * <p>
468   * If your result set is very large, you should use other scan method to get a scanner or use
469   * callback to process the results. They will do chunking to prevent OOM. The scanAll method will
470   * fetch all the results and store them in a List and then return the list to you.
471   * <p>
472   * The scan metrics will be collected background if you enable it but you have no way to get it.
473   * Usually you can get scan metrics from {@code ResultScanner}, or through
474   * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
475   * So if you really care about scan metrics then you'd better use other scan methods which return
476   * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
477   * performance difference between these scan methods so do not worry.
478   * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large
479   *             result set, it is likely to cause OOM.
480   * @return The results of this small scan operation. The return value will be wrapped by a
481   *         {@link CompletableFuture}.
482   */
483  CompletableFuture<List<Result>> scanAll(Scan scan);
484
485  /**
486   * Test for the existence of columns in the table, as specified by the Gets.
487   * <p>
488   * This will return a list of booleans. Each value will be true if the related Get matches one or
489   * more keys, false if not.
490   * <p>
491   * This is a server-side call so it prevents any data from being transferred to the client.
492   * @param gets the Gets
493   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
494   */
495  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
496    return get(toCheckExistenceOnly(gets)).stream()
497      .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
498  }
499
500  /**
501   * A simple version for batch exists. It will fail if there are any failures and you will get the
502   * whole result boolean list at once if the operation is succeeded.
503   * @param gets the Gets
504   * @return A {@link CompletableFuture} that wrapper the result boolean list.
505   */
506  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
507    return allOf(exists(gets));
508  }
509
510  /**
511   * Extracts certain cells from the given rows, in batch.
512   * <p>
513   * Notice that you may not get all the results with this function, which means some of the
514   * returned {@link CompletableFuture}s may succeed while some of the other returned
515   * {@link CompletableFuture}s may fail.
516   * @param gets The objects that specify what data to fetch and from which rows.
517   * @return A list of {@link CompletableFuture}s that represent the result for each get.
518   */
519  List<CompletableFuture<Result>> get(List<Get> gets);
520
521  /**
522   * A simple version for batch get. It will fail if there are any failures and you will get the
523   * whole result list at once if the operation is succeeded.
524   * @param gets The objects that specify what data to fetch and from which rows.
525   * @return A {@link CompletableFuture} that wrapper the result list.
526   */
527  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
528    return allOf(get(gets));
529  }
530
531  /**
532   * Puts some data in the table, in batch.
533   * @param puts The list of mutations to apply.
534   * @return A list of {@link CompletableFuture}s that represent the result for each put.
535   */
536  List<CompletableFuture<Void>> put(List<Put> puts);
537
538  /**
539   * A simple version of batch put. It will fail if there are any failures.
540   * @param puts The list of mutations to apply.
541   * @return A {@link CompletableFuture} that always returns null when complete normally.
542   */
543  default CompletableFuture<Void> putAll(List<Put> puts) {
544    return allOf(put(puts)).thenApply(r -> null);
545  }
546
547  /**
548   * Deletes the specified cells/rows in bulk.
549   * @param deletes list of things to delete.
550   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
551   */
552  List<CompletableFuture<Void>> delete(List<Delete> deletes);
553
554  /**
555   * A simple version of batch delete. It will fail if there are any failures.
556   * @param deletes list of things to delete.
557   * @return A {@link CompletableFuture} that always returns null when complete normally.
558   */
559  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
560    return allOf(delete(deletes)).thenApply(r -> null);
561  }
562
563  /**
564   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
565   * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
566   * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
567   * Put had put.
568   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
569   * @return A list of {@link CompletableFuture}s that represent the result for each action.
570   */
571  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
572
573  /**
574   * A simple version of batch. It will fail if there are any failures and you will get the whole
575   * result list at once if the operation is succeeded.
576   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
577   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
578   */
579  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
580    return allOf(batch(actions));
581  }
582
583  /**
584   * Execute the given coprocessor call on the region which contains the given {@code row}.
585   * <p>
586   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
587   * one line lambda expression, like:
588   *
589   * <pre>
590   * channel -&gt; xxxService.newStub(channel)
591   * </pre>
592   *
593   * @param stubMaker a delegation to the actual {@code newStub} call.
594   * @param callable  a delegation to the actual protobuf rpc call. See the comment of
595   *                  {@link ServiceCaller} for more details.
596   * @param row       The row key used to identify the remote region location
597   * @param <S>       the type of the asynchronous stub
598   * @param <R>       the type of the return value
599   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
600   * @see ServiceCaller
601   */
602  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
603    ServiceCaller<S, R> callable, byte[] row);
604
605  /**
606   * The callback when we want to execute a coprocessor call on a range of regions.
607   * <p>
608   * As the locating itself also takes some time, the implementation may want to send rpc calls on
609   * the fly, which means we do not know how many regions we have when we get the return value of
610   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
611   * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)}
612   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no
613   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
614   * calls in the future.
615   * <p>
616   * Here is a pseudo code to describe a typical implementation of a range coprocessor service
617   * method to help you better understand how the {@link CoprocessorCallback} will be called. The
618   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
619   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
620   *
621   * <pre>
622   * locateThenCall(byte[] row) {
623   *   locate(row).whenComplete((location, locateError) -&gt; {
624   *     if (locateError != null) {
625   *       callback.onError(locateError);
626   *       return;
627   *     }
628   *     incPendingCall();
629   *     region = location.getRegion();
630   *     if (region.getEndKey() > endKey) {
631   *       locateEnd = true;
632   *     } else {
633   *       locateThenCall(region.getEndKey());
634   *     }
635   *     sendCall().whenComplete((resp, error) -&gt; {
636   *       if (error != null) {
637   *         callback.onRegionError(region, error);
638   *       } else {
639   *         callback.onRegionComplete(region, resp);
640   *       }
641   *       if (locateEnd && decPendingCallAndGet() == 0) {
642   *         callback.onComplete();
643   *       }
644   *     });
645   *   });
646   * }
647   * </pre>
648   */
649  @InterfaceAudience.Public
650  interface CoprocessorCallback<R> {
651
652    /**
653     * Indicate that the respose of a region is available
654     * @param region the region that the response belongs to
655     * @param resp   the response of the coprocessor call
656     */
657    void onRegionComplete(RegionInfo region, R resp);
658
659    /**
660     * Indicate that the error for a region is available
661     * @param region the region that the error belongs to
662     * @param error  the response error of the coprocessor call
663     */
664    void onRegionError(RegionInfo region, Throwable error);
665
666    /**
667     * Indicate that all responses of the regions have been notified by calling
668     * {@link #onRegionComplete(RegionInfo, Object)} or
669     * {@link #onRegionError(RegionInfo, Throwable)}.
670     */
671    void onComplete();
672
673    /**
674     * Indicate that we got an error which does not belong to any regions. Usually a locating error.
675     */
676    void onError(Throwable error);
677  }
678
679  /**
680   * Helper class for sending coprocessorService request that executes a coprocessor call on regions
681   * which are covered by a range.
682   * <p>
683   * If {@code fromRow} is not specified the selection will start with the first table region. If
684   * {@code toRow} is not specified the selection will continue through the last table region.
685   * @param <S> the type of the protobuf Service you want to call.
686   * @param <R> the type of the return value.
687   */
688  interface CoprocessorServiceBuilder<S, R> {
689
690    /**
691     * Specify a start row
692     * @param startKey start region selection with region containing this row, inclusive.
693     */
694    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
695      return fromRow(startKey, true);
696    }
697
698    /**
699     * Specify a start row
700     * @param startKey  start region selection with region containing this row
701     * @param inclusive whether to include the startKey
702     */
703    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
704
705    /**
706     * Specify a stop row
707     * @param endKey select regions up to and including the region containing this row, exclusive.
708     */
709    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
710      return toRow(endKey, false);
711    }
712
713    /**
714     * Specify a stop row
715     * @param endKey    select regions up to and including the region containing this row
716     * @param inclusive whether to include the endKey
717     */
718    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
719
720    /**
721     * Execute the coprocessorService request. You can get the response through the
722     * {@link CoprocessorCallback}.
723     */
724    void execute();
725  }
726
727  /**
728   * Execute a coprocessor call on the regions which are covered by a range.
729   * <p>
730   * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it.
731   * <p>
732   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
733   * is only a one line lambda expression, like:
734   *
735   * <pre>
736   * channel -&gt; xxxService.newStub(channel)
737   * </pre>
738   *
739   * @param stubMaker a delegation to the actual {@code newStub} call.
740   * @param callable  a delegation to the actual protobuf rpc call. See the comment of
741   *                  {@link ServiceCaller} for more details.
742   * @param callback  callback to get the response. See the comment of {@link CoprocessorCallback}
743   *                  for more details.
744   */
745  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
746    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
747}