001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
023
024import com.google.protobuf.RpcChannel;
025import java.util.List;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.TimeUnit;
028import java.util.function.Function;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CompareOperator;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.io.TimeRange;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037
038/**
 * The interface for the asynchronous version of Table. Obtain an instance from an
 * {@link AsyncConnection}.
041 * <p>
042 * The implementation is required to be thread safe.
043 * <p>
044 * Usually the implementation will not throw any exception directly. You need to get the exception
045 * from the returned {@link CompletableFuture}.
046 * @since 2.0.0
047 */
048@InterfaceAudience.Public
049public interface AsyncTable<C extends ScanResultConsumerBase> {
050
  /**
   * Gets the fully qualified table name instance of this table.
   * @return the {@link TableName} of this table
   */
  TableName getName();
055
  /**
   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
   * <p>
   * The reference returned is not a copy, so any change made to it will affect this instance.
   * @return the {@link Configuration} used by this instance (the live reference, not a copy)
   */
  Configuration getConfiguration();
062
  /**
   * Gets the {@link TableDescriptor} for this table.
   * @return the table descriptor, wrapped by a {@link CompletableFuture}
   */
  CompletableFuture<TableDescriptor> getDescriptor();
067
  /**
   * Gets the {@link AsyncTableRegionLocator} for this table.
   * @return the region locator for this table
   */
  AsyncTableRegionLocator getRegionLocator();
  /**
   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
   * @param unit the unit of time the timeout to be represented in
   * @return rpc timeout in the specified time unit
   * @see #getReadRpcTimeout(TimeUnit)
   * @see #getWriteRpcTimeout(TimeUnit)
   */
  long getRpcTimeout(TimeUnit unit);
081
  /**
   * Get timeout of each rpc read request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return read rpc timeout in the specified time unit
   * @see #getRpcTimeout(TimeUnit)
   */
  long getReadRpcTimeout(TimeUnit unit);
088
  /**
   * Get timeout of each rpc write request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return write rpc timeout in the specified time unit
   * @see #getRpcTimeout(TimeUnit)
   */
  long getWriteRpcTimeout(TimeUnit unit);
095
  /**
   * Get timeout of each operation in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return operation timeout in the specified time unit
   */
  long getOperationTimeout(TimeUnit unit);
102
  /**
   * Get the timeout of a single operation in a scan. It works like operation timeout for other
   * operations.
   * @param unit the unit of time the timeout to be represented in
   * @return scan timeout in the specified time unit
   * @see #getOperationTimeout(TimeUnit)
   */
  long getScanTimeout(TimeUnit unit);
110
111  /**
112   * Test for the existence of columns in the table, as specified by the Get.
113   * <p>
114   * This will return true if the Get matches one or more keys, false if not.
115   * <p>
116   * This is a server-side call so it prevents any data from being transfered to the client.
117   * @return true if the specified Get matches one or more keys, false if not. The return value will
118   *         be wrapped by a {@link CompletableFuture}.
119   */
120  default CompletableFuture<Boolean> exists(Get get) {
121    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
122  }
123
  /**
   * Extracts certain cells from a given row.
   * @param get The object that specifies what data to fetch and from which row.
   * @return The data coming from the specified row, if it exists. If the row specified doesn't
   *         exist, the {@link Result} instance returned won't contain any
   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
   *         return value will be wrapped by a {@link CompletableFuture}.
   * @see #get(List)
   */
  CompletableFuture<Result> get(Get get);
133
  /**
   * Puts some data to the table.
   * @param put The data to put.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   * @see #put(List)
   */
  CompletableFuture<Void> put(Put put);
140
  /**
   * Deletes the specified cells/row.
   * @param delete The object that specifies what to delete.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   * @see #delete(List)
   */
  CompletableFuture<Void> delete(Delete delete);
147
  /**
   * Appends values to one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
   * write operations to a row are synchronized, but readers do not take row locks so get and scan
   * operations can see this operation partially completed.
   * @param append object that specifies the columns and values to be appended
   * @return values of columns after the append operation (maybe null). The return value will be
   *         wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> append(Append append);
160
  /**
   * Increments one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Increments are done under a single row lock,
   * so write operations to a row are synchronized, but readers do not take row locks so get and
   * scan operations can see this operation partially completed.
   * @param increment object that specifies the columns and amounts to be used for the increment
   *          operations
   * @return values of columns after the increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   * @see #incrementColumnValue(byte[], byte[], byte[], long, Durability)
   */
  CompletableFuture<Result> increment(Increment increment);
173
174  /**
175   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
176   * <p>
177   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
178   * @param row The row that contains the cell to increment.
179   * @param family The column family of the cell to increment.
180   * @param qualifier The column qualifier of the cell to increment.
181   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
182   * @return The new value, post increment. The return value will be wrapped by a
183   *         {@link CompletableFuture}.
184   */
185  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
186      long amount) {
187    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
188  }
189
190  /**
191   * Atomically increments a column value. If the column value already exists and is not a
192   * big-endian long, this could throw an exception. If the column value does not yet exist it is
193   * initialized to <code>amount</code> and written to the specified column.
194   * <p>
195   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
196   * any increments that have not been flushed.
197   * @param row The row that contains the cell to increment.
198   * @param family The column family of the cell to increment.
199   * @param qualifier The column qualifier of the cell to increment.
200   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
201   * @param durability The persistence guarantee for this increment.
202   * @return The new value, post increment. The return value will be wrapped by a
203   *         {@link CompletableFuture}.
204   */
205  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
206      long amount, Durability durability) {
207    Preconditions.checkNotNull(row, "row is null");
208    Preconditions.checkNotNull(family, "family is null");
209    return increment(
210      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
211          .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
212  }
213
  /**
   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
   * adds the Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
   * This is a fluent style API, the code is like:
   *
   * <pre>
   * <code>
   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
   *     .thenAccept(succ -> {
   *       if (succ) {
   *         System.out.println("Check and put succeeded");
   *       } else {
   *         System.out.println("Check and put failed");
   *       }
   *     });
   * </code>
   * </pre>
   * @param row the row to check
   * @param family the column family to check
   * @return a {@link CheckAndMutateBuilder} used to construct and then execute the request
   */
  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
235
236  /**
237   * A helper class for sending checkAndMutate request.
238   */
239  interface CheckAndMutateBuilder {
240
241    /**
242     * @param qualifier column qualifier to check.
243     */
244    CheckAndMutateBuilder qualifier(byte[] qualifier);
245
246    /**
247     * @param timeRange time range to check.
248     */
249    CheckAndMutateBuilder timeRange(TimeRange timeRange);
250
251    /**
252     * Check for lack of column.
253     */
254    CheckAndMutateBuilder ifNotExists();
255
256    /**
257     * Check for equality.
258     * @param value the expected value
259     */
260    default CheckAndMutateBuilder ifEquals(byte[] value) {
261      return ifMatches(CompareOperator.EQUAL, value);
262    }
263
264    /**
265     * @param compareOp comparison operator to use
266     * @param value the expected value
267     */
268    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
269
270    /**
271     * @param put data to put if check succeeds
272     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
273     *         will be wrapped by a {@link CompletableFuture}.
274     */
275    CompletableFuture<Boolean> thenPut(Put put);
276
277    /**
278     * @param delete data to delete if check succeeds
279     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
280     *         value will be wrapped by a {@link CompletableFuture}.
281     */
282    CompletableFuture<Boolean> thenDelete(Delete delete);
283
284    /**
285     * @param mutation mutations to perform if check succeeds
286     * @return true if the new mutation was executed, false otherwise. The return value will be
287     *         wrapped by a {@link CompletableFuture}.
288     */
289    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
290  }
291
  /**
   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
   * {@link Delete} are supported.
   * @param mutation object that specifies the set of mutations to perform atomically
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> mutateRow(RowMutations mutation);
299
  /**
   * The scan API uses the observer pattern: nothing is returned from this method, and the results
   * are received by the given {@code consumer}.
   * @param scan A configured {@link Scan} object.
   * @param consumer the consumer used to receive results.
   * @see ScanResultConsumer
   * @see AdvancedScanResultConsumer
   */
  void scan(Scan scan, C consumer);
308
309  /**
310   * Gets a scanner on the current table for the given family.
311   * @param family The column family to scan.
312   * @return A scanner.
313   */
314  default ResultScanner getScanner(byte[] family) {
315    return getScanner(new Scan().addFamily(family));
316  }
317
318  /**
319   * Gets a scanner on the current table for the given family and qualifier.
320   * @param family The column family to scan.
321   * @param qualifier The column qualifier to scan.
322   * @return A scanner.
323   */
324  default ResultScanner getScanner(byte[] family, byte[] qualifier) {
325    return getScanner(new Scan().addColumn(family, qualifier));
326  }
327
  /**
   * Returns a scanner on the current table as specified by the {@link Scan} object.
   * @param scan A configured {@link Scan} object.
   * @return A scanner.
   * @see #scan(Scan, ScanResultConsumerBase)
   * @see #scanAll(Scan)
   */
  ResultScanner getScanner(Scan scan);
334
  /**
   * Return all the results that match the given scan object.
   * <p>
   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
   * For example, if you want to get the closest row after a given row, you could do this:
   *
   * <pre>
   * <code>
   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
   *   if (results.isEmpty()) {
   *      System.out.println("No row after " + Bytes.toStringBinary(row));
   *   } else {
   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
   *   }
   * });
   * </code>
   * </pre>
   * <p>
   * If your result set is very large, you should use one of the other scan methods to get a
   * scanner or use a callback to process the results. They will do chunking to prevent OOM. The
   * scanAll method will fetch all the results and store them in a List and then return the list to
   * you.
   * <p>
   * The scan metrics will be collected in the background if you enable them, but you have no way
   * to get them. Usually you can get scan metrics from {@code ResultScanner}, or through
   * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
   * So if you really care about scan metrics then you'd better use other scan methods which return
   * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
   * performance difference between these scan methods so do not worry.
   * @param scan A configured {@link Scan} object. Since all the results are stored in a List, using
   *          this method to fetch a really large result set is likely to cause OOM.
   * @return The results of this small scan operation. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<List<Result>> scanAll(Scan scan);
371
372  /**
373   * Test for the existence of columns in the table, as specified by the Gets.
374   * <p>
375   * This will return a list of booleans. Each value will be true if the related Get matches one or
376   * more keys, false if not.
377   * <p>
378   * This is a server-side call so it prevents any data from being transferred to the client.
379   * @param gets the Gets
380   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
381   */
382  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
383    return get(toCheckExistenceOnly(gets)).stream()
384        .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
385  }
386
387  /**
388   * A simple version for batch exists. It will fail if there are any failures and you will get the
389   * whole result boolean list at once if the operation is succeeded.
390   * @param gets the Gets
391   * @return A {@link CompletableFuture} that wrapper the result boolean list.
392   */
393  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
394    return allOf(exists(gets));
395  }
396
  /**
   * Extracts certain cells from the given rows, in batch.
   * <p>
   * Notice that you may not get all the results with this function, which means some of the
   * returned {@link CompletableFuture}s may succeed while some of the other returned
   * {@link CompletableFuture}s may fail.
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A list of {@link CompletableFuture}s that represent the result for each get.
   * @see #getAll(List)
   */
  List<CompletableFuture<Result>> get(List<Get> gets);
407
408  /**
409   * A simple version for batch get. It will fail if there are any failures and you will get the
410   * whole result list at once if the operation is succeeded.
411   * @param gets The objects that specify what data to fetch and from which rows.
412   * @return A {@link CompletableFuture} that wrapper the result list.
413   */
414  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
415    return allOf(get(gets));
416  }
417
  /**
   * Puts some data in the table, in batch.
   * @param puts The list of mutations to apply.
   * @return A list of {@link CompletableFuture}s that represent the result for each put.
   * @see #putAll(List)
   */
  List<CompletableFuture<Void>> put(List<Put> puts);
424
425  /**
426   * A simple version of batch put. It will fail if there are any failures.
427   * @param puts The list of mutations to apply.
428   * @return A {@link CompletableFuture} that always returns null when complete normally.
429   */
430  default CompletableFuture<Void> putAll(List<Put> puts) {
431    return allOf(put(puts)).thenApply(r -> null);
432  }
433
  /**
   * Deletes the specified cells/rows in bulk.
   * @param deletes list of things to delete.
   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
   * @see #deleteAll(List)
   */
  List<CompletableFuture<Void>> delete(List<Delete> deletes);
440
441  /**
442   * A simple version of batch delete. It will fail if there are any failures.
443   * @param deletes list of things to delete.
444   * @return A {@link CompletableFuture} that always returns null when complete normally.
445   */
446  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
447    return allOf(delete(deletes)).thenApply(r -> null);
448  }
449
  /**
   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
   * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
   * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
   * Put had put.
   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
   * @param <T> the type of the result of each action
   * @return A list of {@link CompletableFuture}s that represent the result for each action.
   * @see #batchAll(List)
   */
  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
459
460  /**
461   * A simple version of batch. It will fail if there are any failures and you will get the whole
462   * result list at once if the operation is succeeded.
463   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
464   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
465   */
466  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
467    return allOf(batch(actions));
468  }
469
  /**
   * Execute the given coprocessor call on the region which contains the given {@code row}.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
   * one line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param row The row key used to identify the remote region location
   * @param <S> the type of the asynchronous stub
   * @param <R> the type of the return value
   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
   * @see ServiceCaller
   */
  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, byte[] row);
493
  /**
   * The callback when we want to execute a coprocessor call on a range of regions.
   * <p>
   * As the locating itself also takes some time, the implementation may want to send rpc calls on
   * the fly, which means we do not know how many regions we have when we get the return value of
   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
   * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)}
   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no
   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
   * calls in the future.
   * <p>
   * Here is a pseudo code to describe a typical implementation of a range coprocessor service
   * method to help you better understand how the {@link CoprocessorCallback} will be called. The
   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
   *
   * <pre>
   * locateThenCall(byte[] row) {
   *   locate(row).whenComplete((location, locateError) -> {
   *     if (locateError != null) {
   *       callback.onError(locateError);
   *       return;
   *     }
   *     incPendingCall();
   *     region = location.getRegion();
   *     if (region.getEndKey() > endKey) {
   *       locateEnd = true;
   *     } else {
   *       locateThenCall(region.getEndKey());
   *     }
   *     sendCall().whenComplete((resp, error) -> {
   *       if (error != null) {
   *         callback.onRegionError(region, error);
   *       } else {
   *         callback.onRegionComplete(region, resp);
   *       }
   *       if (locateEnd && decPendingCallAndGet() == 0) {
   *         callback.onComplete();
   *       }
   *     });
   *   });
   * }
   * </pre>
   * @param <R> the type of the response of the coprocessor call
   */
  @InterfaceAudience.Public
  interface CoprocessorCallback<R> {

    /**
     * Called with the response of the coprocessor call for one region.
     * @param region the region that the response belongs to
     * @param resp the response of the coprocessor call
     */
    void onRegionComplete(RegionInfo region, R resp);

    /**
     * Called with the error of the coprocessor call for one region.
     * @param region the region that the error belongs to
     * @param error the response error of the coprocessor call
     */
    void onRegionError(RegionInfo region, Throwable error);

    /**
     * Indicate that all responses of the regions have been notified by calling
     * {@link #onRegionComplete(RegionInfo, Object)} or
     * {@link #onRegionError(RegionInfo, Throwable)}.
     */
    void onComplete();

    /**
     * Indicate that we got an error which does not belong to any regions. Usually a locating error.
     * @param error the error which does not belong to any regions
     */
    void onError(Throwable error);
  }
565
566  /**
567   * Helper class for sending coprocessorService request that executes a coprocessor call on regions
568   * which are covered by a range.
569   * <p>
570   * If {@code fromRow} is not specified the selection will start with the first table region. If
571   * {@code toRow} is not specified the selection will continue through the last table region.
572   * @param <S> the type of the protobuf Service you want to call.
573   * @param <R> the type of the return value.
574   */
575  interface CoprocessorServiceBuilder<S, R> {
576
577    /**
578     * @param startKey start region selection with region containing this row, inclusive.
579     */
580    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
581      return fromRow(startKey, true);
582    }
583
584    /**
585     * @param startKey start region selection with region containing this row
586     * @param inclusive whether to include the startKey
587     */
588    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
589
590    /**
591     * @param endKey select regions up to and including the region containing this row, exclusive.
592     */
593    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
594      return toRow(endKey, false);
595    }
596
597    /**
598     * @param endKey select regions up to and including the region containing this row
599     * @param inclusive whether to include the endKey
600     */
601    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
602
603    /**
604     * Execute the coprocessorService request. You can get the response through the
605     * {@link CoprocessorCallback}.
606     */
607    void execute();
608  }
609
  /**
   * Execute a coprocessor call on the regions which are covered by a range.
   * <p>
   * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute
   * it.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
   * is only a one line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
   *          for more details.
   * @param <S> the type of the asynchronous stub
   * @param <R> the type of the return value
   * @return a {@link CoprocessorServiceBuilder} used to construct and then execute the request
   * @see ServiceCaller
   * @see CoprocessorCallback
   */
  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
632}