/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;

import com.google.protobuf.RpcChannel;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * The interface for the asynchronous version of Table. Obtain an instance from an
 * {@link AsyncConnection}.
 * <p>
 * The implementation is required to be thread safe.
 * <p>
 * Usually the implementation will not throw any exception directly. You need to get the exception
 * from the returned {@link CompletableFuture}.
 * @since 2.0.0
 */
@InterfaceAudience.Public
public interface AsyncTable<C extends ScanResultConsumerBase> {

  /**
   * Gets the fully qualified table name instance of this table.
   */
  TableName getName();

  /**
   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
   * <p>
   * The reference returned is not a copy, so any change made to it will affect this instance.
   */
  Configuration getConfiguration();

  /**
   * Get the timeout of each rpc request in this Table instance. It will be overridden by a more
   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
   * @see #getReadRpcTimeout(TimeUnit)
   * @see #getWriteRpcTimeout(TimeUnit)
   * @param unit the unit of time in which the timeout is represented
   * @return rpc timeout in the specified time unit
   */
  long getRpcTimeout(TimeUnit unit);

  /**
   * Get the timeout of each rpc read request in this Table instance.
   * @param unit the unit of time in which the timeout is represented
   * @return read rpc timeout in the specified time unit
   */
  long getReadRpcTimeout(TimeUnit unit);

  /**
   * Get the timeout of each rpc write request in this Table instance.
   * @param unit the unit of time in which the timeout is represented
   * @return write rpc timeout in the specified time unit
   */
  long getWriteRpcTimeout(TimeUnit unit);

  /**
   * Get the timeout of each operation in this Table instance.
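   * <p>
   * A minimal usage sketch (assuming {@code table} is an existing {@link AsyncTable} instance):
   *
   * <pre>
   * <code>
   * long operationTimeoutMs = table.getOperationTimeout(TimeUnit.MILLISECONDS);
   * </code>
   * </pre>
   *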
   * @param unit the unit of time in which the timeout is represented
   * @return operation rpc timeout in the specified time unit
   */
  long getOperationTimeout(TimeUnit unit);

  /**
   * Get the timeout of a single operation in a scan. It works like the operation timeout for
   * other operations.
   * @param unit the unit of time in which the timeout is represented
   * @return scan rpc timeout in the specified time unit
   */
  long getScanTimeout(TimeUnit unit);

  /**
   * Test for the existence of columns in the table, as specified by the Get.
   * <p>
   * This will return true if the Get matches one or more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
   * @return true if the specified Get matches one or more keys, false if not. The return value
   *         will be wrapped by a {@link CompletableFuture}.
   */
  default CompletableFuture<Boolean> exists(Get get) {
    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
  }

  /**
   * Extracts certain cells from a given row.
   * @param get The object that specifies what data to fetch and from which row.
   * @return The data coming from the specified row, if it exists. If the row specified doesn't
   *         exist, the {@link Result} instance returned won't contain any
   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}.
   *         The return value will be wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> get(Get get);

  /**
   * Puts some data to the table.
   * @param put The data to put.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> put(Put put);

  /**
   * Deletes the specified cells/row.
   * @param delete The object that specifies what to delete.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> delete(Delete delete);

  /**
   * Appends values to one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Appends are done under a single row lock,
   * so write operations to a row are synchronized, but readers do not take row locks so get and
   * scan operations can see this operation partially completed.
   * @param append object that specifies the columns and values to be used for the append
   *          operations
   * @return values of columns after the append operation (may be null). The return value will be
   *         wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> append(Append append);

  /**
   * Increments one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Increments are done under a single row
   * lock, so write operations to a row are synchronized, but readers do not take row locks so get
   * and scan operations can see this operation partially completed.
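   * <p>
   * A minimal sketch (the {@code row}, {@code family} and {@code qualifier} variables are
   * illustrative only):
   *
   * <pre>
   * <code>
   * table.increment(new Increment(row).addColumn(family, qualifier, 1L))
   *     .thenAccept(result -> System.out.println(
   *       "new value: " + Bytes.toLong(result.getValue(family, qualifier))));
   * </code>
   * </pre>
   *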
   * @param increment object that specifies the columns and amounts to be used for the increment
   *          operations
   * @return values of columns after the increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<Result> increment(Increment increment);

  /**
   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
   * <p>
   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
   * @param row The row that contains the cell to increment.
   * @param family The column family of the cell to increment.
   * @param qualifier The column qualifier of the cell to increment.
   * @param amount The amount to increment the cell with (or decrement, if the amount is
   *          negative).
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount) {
    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
  }

  /**
   * Atomically increments a column value. If the column value already exists and is not a
   * big-endian long, this could throw an exception. If the column value does not yet exist it is
   * initialized to <code>amount</code> and written to the specified column.
   * <p>
   * Setting durability to {@link Durability#SKIP_WAL} means that in a failure scenario you will
   * lose any increments that have not been flushed.
   * @param row The row that contains the cell to increment.
   * @param family The column family of the cell to increment.
   * @param qualifier The column qualifier of the cell to increment.
   * @param amount The amount to increment the cell with (or decrement, if the amount is
   *          negative).
   * @param durability The persistence guarantee for this increment.
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount, Durability durability) {
    Preconditions.checkNotNull(row, "row is null");
    Preconditions.checkNotNull(family, "family is null");
    return increment(
      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
        .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
  }

  /**
   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
   * adds the Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
   * This is a fluent-style API; the code looks like:
   *
   * <pre>
   * <code>
   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
   *     .thenAccept(succ -> {
   *       if (succ) {
   *         System.out.println("Check and put succeeded");
   *       } else {
   *         System.out.println("Check and put failed");
   *       }
   *     });
   * </code>
   * </pre>
   */
  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);

  /**
   * A helper class for sending a checkAndMutate request.
   */
  interface CheckAndMutateBuilder {

    /**
     * @param qualifier column qualifier to check.
     */
    CheckAndMutateBuilder qualifier(byte[] qualifier);

    /**
     * @param timeRange time range to check.
     */
    CheckAndMutateBuilder timeRange(TimeRange timeRange);

    /**
     * Check for lack of column.
     */
    CheckAndMutateBuilder ifNotExists();

    /**
     * Check for equality.
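     * <p>
     * A minimal sketch (the {@code row}, {@code family}, {@code qualifier}, {@code expectedValue}
     * and {@code put} variables are illustrative only):
     *
     * <pre>
     * <code>
     * table.checkAndMutate(row, family).qualifier(qualifier).ifEquals(expectedValue).thenPut(put)
     *     .thenAccept(succ -> System.out.println(succ ? "put applied" : "value did not match"));
     * </code>
     * </pre>
     *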
     * @param value the expected value
     */
    default CheckAndMutateBuilder ifEquals(byte[] value) {
      return ifMatches(CompareOperator.EQUAL, value);
    }

    /**
     * @param compareOp comparison operator to use
     * @param value the expected value
     */
    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);

    /**
     * @param put data to put if check succeeds
     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
     *         will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenPut(Put put);

    /**
     * @param delete data to delete if check succeeds
     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenDelete(Delete delete);

    /**
     * @param mutation mutations to perform if check succeeds
     * @return {@code true} if the new mutation was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
  }

  /**
   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
   * {@link Delete} are supported.
   * @param mutation object that specifies the set of mutations to perform atomically
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> mutateRow(RowMutations mutation);

  /**
   * The scan API uses the observer pattern.
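   * <p>
   * A minimal sketch for an {@code AsyncTable<ScanResultConsumer>} instance (the consumer below is
   * illustrative only; see {@link ScanResultConsumer} for the exact contract):
   *
   * <pre>
   * <code>
   * table.scan(new Scan().setLimit(10), new ScanResultConsumer() {
   *   public boolean onNext(Result result) {
   *     System.out.println(Bytes.toStringBinary(result.getRow()));
   *     return true; // return false to stop the scan early
   *   }
   *
   *   public void onError(Throwable error) {
   *     error.printStackTrace();
   *   }
   *
   *   public void onComplete() {
   *     System.out.println("scan finished");
   *   }
   * });
   * </code>
   * </pre>
   *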
   * @param scan A configured {@link Scan} object.
   * @param consumer the consumer used to receive results.
   * @see ScanResultConsumer
   * @see AdvancedScanResultConsumer
   */
  void scan(Scan scan, C consumer);

  /**
   * Gets a scanner on the current table for the given family.
   * @param family The column family to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family) {
    return getScanner(new Scan().addFamily(family));
  }

  /**
   * Gets a scanner on the current table for the given family and qualifier.
   * @param family The column family to scan.
   * @param qualifier The column qualifier to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family, byte[] qualifier) {
    return getScanner(new Scan().addColumn(family, qualifier));
  }

  /**
   * Returns a scanner on the current table as specified by the {@link Scan} object.
   * @param scan A configured {@link Scan} object.
   * @return A scanner.
   */
  ResultScanner getScanner(Scan scan);

  /**
   * Return all the results that match the given scan object.
   * <p>
   * Notice that usually you should use this method with a {@link Scan} object that has a limit
   * set. For example, if you want to get the closest row after a given row, you could do this:
   *
   * <pre>
   * <code>
   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
   *   if (results.isEmpty()) {
   *     System.out.println("No row after " + Bytes.toStringBinary(row));
   *   } else {
   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
   *   }
   * });
   * </code>
   * </pre>
   * <p>
   * If your result set is very large, you should use the other scan methods to get a scanner, or
   * use a callback to process the results. They will do chunking to prevent an OOM. The scanAll
   * method will fetch all the results, store them in a List and then return the list to you.
   * <p>
   * The scan metrics will be collected in the background if you enable them, but this method
   * gives you no way to retrieve them. Usually you can get scan metrics from a
   * {@code ResultScanner}, or through {@code ScanResultConsumer.onScanMetricsCreated}, but this
   * method only returns a list of results. So if you really care about scan metrics then you'd
   * better use the other scan methods which return a {@code ResultScanner} or let you pass in a
   * {@code ScanResultConsumer}. There is no performance difference between these scan methods, so
   * do not worry.
   * @param scan A configured {@link Scan} object. If you use this method to fetch a really large
   *          result set, it is likely to cause an OOM.
   * @return The results of the scan operation. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<List<Result>> scanAll(Scan scan);

  /**
   * Test for the existence of columns in the table, as specified by the Gets.
   * <p>
   * This will return a list of booleans. Each value will be true if the related Get matches one
   * or more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
   * @param gets the Gets
   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
   */
  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
    return get(toCheckExistenceOnly(gets)).stream()
      .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
  }

  /**
   * A simple version for batch exists. It will fail if there are any failures, and you will get
   * the whole result boolean list at once if the operation succeeds.
   * @param gets the Gets
   * @return A {@link CompletableFuture} that wraps the result boolean list.
   */
  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
    return allOf(exists(gets));
  }

  /**
   * Extracts certain cells from the given rows, in batch.
   * <p>
   * Notice that you may not get all the results with this function, which means some of the
   * returned {@link CompletableFuture}s may succeed while some of the other returned
   * {@link CompletableFuture}s may fail.
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A list of {@link CompletableFuture}s that represent the result for each get.
   */
  List<CompletableFuture<Result>> get(List<Get> gets);

  /**
   * A simple version for batch get. It will fail if there are any failures, and you will get the
   * whole result list at once if the operation succeeds.
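   * <p>
   * A minimal sketch (the {@code row1} and {@code row2} variables are illustrative only):
   *
   * <pre>
   * <code>
   * table.getAll(Arrays.asList(new Get(row1), new Get(row2))).thenAccept(results ->
   *   results.forEach(r -> System.out.println(
   *     r.isEmpty() ? "missing row" : Bytes.toStringBinary(r.getRow()))));
   * </code>
   * </pre>
   *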
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A {@link CompletableFuture} that wraps the result list.
   */
  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
    return allOf(get(gets));
  }

  /**
   * Puts some data in the table, in batch.
   * @param puts The list of mutations to apply.
   * @return A list of {@link CompletableFuture}s that represent the result for each put.
   */
  List<CompletableFuture<Void>> put(List<Put> puts);

  /**
   * A simple version of batch put. It will fail if there are any failures.
   * @param puts The list of mutations to apply.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  default CompletableFuture<Void> putAll(List<Put> puts) {
    return allOf(put(puts)).thenApply(r -> null);
  }

  /**
   * Deletes the specified cells/rows in bulk.
   * @param deletes list of things to delete.
   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
   */
  List<CompletableFuture<Void>> delete(List<Delete> deletes);

  /**
   * A simple version of batch delete. It will fail if there are any failures.
   * @param deletes list of things to delete.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
    return allOf(delete(deletes)).thenApply(r -> null);
  }

  /**
   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations.
   * The ordering of execution of the actions is not defined. This means that if you do a Put and
   * a Get in the same {@link #batch} call, you are not necessarily guaranteed that the Get
   * returns what the Put had put.
   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
   * @return A list of {@link CompletableFuture}s that represent the result for each action.
   */
  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);

  /**
   * A simple version of batch. It will fail if there are any failures, and you will get the whole
   * result list at once if the operation succeeds.
   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
   * @return A list of the results for the actions, wrapped by a {@link CompletableFuture}.
   */
  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
    return allOf(batch(actions));
  }

  /**
   * Execute the given coprocessor call on the region which contains the given {@code row}.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
   * one-line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
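   * A minimal sketch of a complete call (the {@code XxxService}, {@code xxxCall}, {@code request}
   * and {@code row} names are placeholders for your own generated protobuf service and data):
   *
   * <pre>
   * <code>
   * table.coprocessorService(channel -> XxxService.newStub(channel),
   *   (stub, controller, rpcCallback) -> stub.xxxCall(controller, request, rpcCallback), row)
   *     .thenAccept(resp -> System.out.println("coprocessor response: " + resp));
   * </code>
   * </pre>
   *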
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param row The row key used to identify the remote region location
   * @param <S> the type of the asynchronous stub
   * @param <R> the type of the return value
   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
   * @see ServiceCaller
   */
  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, byte[] row);

  /**
   * The callback when we want to execute a coprocessor call on a range of regions.
   * <p>
   * As the locating itself also takes some time, the implementation may want to send rpc calls on
   * the fly, which means we do not know how many regions we have when we get the return value of
   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
   * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)}
   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no further
   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
   * calls in the future.
   * <p>
   * Here is pseudo code describing a typical implementation of a range coprocessor service
   * method, to help you better understand how the {@link CoprocessorCallback} will be called. The
   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
   *
   * <pre>
   * locateThenCall(byte[] row) {
   *   locate(row).whenComplete((location, locateError) -> {
   *     if (locateError != null) {
   *       callback.onError(locateError);
   *       return;
   *     }
   *     incPendingCall();
   *     region = location.getRegion();
   *     if (region.getEndKey() > endKey) {
   *       locateEnd = true;
   *     } else {
   *       locateThenCall(region.getEndKey());
   *     }
   *     sendCall().whenComplete((resp, error) -> {
   *       if (error != null) {
   *         callback.onRegionError(region, error);
   *       } else {
   *         callback.onRegionComplete(region, resp);
   *       }
   *       if (locateEnd && decPendingCallAndGet() == 0) {
   *         callback.onComplete();
   *       }
   *     });
   *   });
   * }
   * </pre>
   */
  @InterfaceAudience.Public
  interface CoprocessorCallback<R> {

    /**
     * @param region the region that the response belongs to
     * @param resp the response of the coprocessor call
     */
    void onRegionComplete(RegionInfo region, R resp);

    /**
     * @param region the region that the error belongs to
     * @param error the response error of the coprocessor call
     */
    void onRegionError(RegionInfo region, Throwable error);

    /**
     * Indicate that all responses of the regions have been notified by calling
     * {@link #onRegionComplete(RegionInfo, Object)} or
     * {@link #onRegionError(RegionInfo, Throwable)}.
     */
    void onComplete();

    /**
     * Indicate that we got an error which does not belong to any region. Usually it is a locating
     * error.
     */
    void onError(Throwable error);
  }

  /**
   * A helper class for sending a coprocessorService request that executes a coprocessor call on
   * regions which are covered by a range.
   * <p>
   * If {@code fromRow} is not specified the selection will start with the first table region. If
   * {@code toRow} is not specified the selection will continue through the last table region.
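   * <p>
   * A minimal sketch of using the builder (the {@code XxxService}, {@code xxxCall},
   * {@code request}, {@code callback}, {@code startKey} and {@code endKey} names are illustrative
   * placeholders for your own service, {@link CoprocessorCallback} implementation and row range):
   *
   * <pre>
   * <code>
   * table.coprocessorService(channel -> XxxService.newStub(channel),
   *   (stub, controller, rpcCallback) -> stub.xxxCall(controller, request, rpcCallback), callback)
   *     .fromRow(startKey).toRow(endKey).execute();
   * </code>
   * </pre>
   *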
   * @param <S> the type of the protobuf Service you want to call.
   * @param <R> the type of the return value.
   */
  interface CoprocessorServiceBuilder<S, R> {

    /**
     * @param startKey start region selection with region containing this row, inclusive.
     */
    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
      return fromRow(startKey, true);
    }

    /**
     * @param startKey start region selection with region containing this row
     * @param inclusive whether to include the startKey
     */
    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);

    /**
     * @param endKey select regions up to and including the region containing this row; the row
     *          itself is exclusive.
     */
    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
      return toRow(endKey, false);
    }

    /**
     * @param endKey select regions up to and including the region containing this row
     * @param inclusive whether to include the endKey
     */
    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);

    /**
     * Execute the coprocessorService request. You can get the response through the
     * {@link CoprocessorCallback}.
     */
    void execute();
  }

  /**
   * Execute a coprocessor call on the regions which are covered by a range.
   * <p>
   * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute
   * it.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
   * is only a one-line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
   *          for more details.
   */
  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
}