001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; 022import static org.apache.hadoop.hbase.util.FutureUtils.allOf; 023 024import java.time.Duration; 025import java.util.Collections; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.TimeUnit; 030import java.util.function.Function; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.CompareOperator; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.filter.Filter; 035import org.apache.hadoop.hbase.io.TimeRange; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.yetus.audience.InterfaceAudience; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 040import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 041 042/** 043 * The interface for asynchronous version of Table. Obtain an instance from a 044 * {@link AsyncConnection}. 045 * <p> 046 * The implementation is required to be thread safe. 047 * <p> 048 * Usually the implementation will not throw any exception directly. You need to get the exception 049 * from the returned {@link CompletableFuture}. 050 * @since 2.0.0 051 */ 052@InterfaceAudience.Public 053public interface AsyncTable<C extends ScanResultConsumerBase> { 054 055 /** 056 * Gets the fully qualified table name instance of this table. 057 */ 058 TableName getName(); 059 060 /** 061 * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. 062 * <p> 063 * The reference returned is not a copy, so any change made to it will affect this instance. 064 */ 065 Configuration getConfiguration(); 066 067 /** 068 * Gets the {@link TableDescriptor} for this table. 069 */ 070 CompletableFuture<TableDescriptor> getDescriptor(); 071 072 /** 073 * Gets the {@link AsyncTableRegionLocator} for this table. 074 */ 075 AsyncTableRegionLocator getRegionLocator(); 076 077 /** 078 * Get timeout of each rpc request in this Table instance. It will be overridden by a more 079 * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. 080 * @see #getReadRpcTimeout(TimeUnit) 081 * @see #getWriteRpcTimeout(TimeUnit) 082 * @param unit the unit of time the timeout to be represented in 083 * @return rpc timeout in the specified time unit 084 */ 085 long getRpcTimeout(TimeUnit unit); 086 087 /** 088 * Get timeout of each rpc read request in this Table instance. 089 * @param unit the unit of time the timeout to be represented in 090 * @return read rpc timeout in the specified time unit 091 */ 092 long getReadRpcTimeout(TimeUnit unit); 093 094 /** 095 * Get timeout of each rpc write request in this Table instance. 096 * @param unit the unit of time the timeout to be represented in 097 * @return write rpc timeout in the specified time unit 098 */ 099 long getWriteRpcTimeout(TimeUnit unit); 100 101 /** 102 * Get timeout of each operation in Table instance. 103 * @param unit the unit of time the timeout to be represented in 104 * @return operation rpc timeout in the specified time unit 105 */ 106 long getOperationTimeout(TimeUnit unit); 107 108 /** 109 * Get the timeout of a single operation in a scan. It works like operation timeout for other 110 * operations. 111 * @param unit the unit of time the timeout to be represented in 112 * @return scan rpc timeout in the specified time unit 113 */ 114 long getScanTimeout(TimeUnit unit); 115 116 /** 117 * Get the map of request attributes 118 * @return a map of request attributes supplied by the client 119 */ 120 default Map<String, byte[]> getRequestAttributes() { 121 return Collections.emptyMap(); 122 } 123 124 /** 125 * Test for the existence of columns in the table, as specified by the Get. 126 * <p> 127 * This will return true if the Get matches one or more keys, false if not. 128 * <p> 129 * This is a server-side call so it prevents any data from being transfered to the client. 130 * @return true if the specified Get matches one or more keys, false if not. The return value will 131 * be wrapped by a {@link CompletableFuture}. 132 */ 133 default CompletableFuture<Boolean> exists(Get get) { 134 return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists()); 135 } 136 137 /** 138 * Extracts certain cells from a given row. 139 * @param get The object that specifies what data to fetch and from which row. 140 * @return The data coming from the specified row, if it exists. If the row specified doesn't 141 * exist, the {@link Result} instance returned won't contain any 142 * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The 143 * return value will be wrapped by a {@link CompletableFuture}. 144 */ 145 CompletableFuture<Result> get(Get get); 146 147 /** 148 * Puts some data to the table. 149 * @param put The data to put. 150 * @return A {@link CompletableFuture} that always returns null when complete normally. 151 */ 152 CompletableFuture<Void> put(Put put); 153 154 /** 155 * Deletes the specified cells/row. 156 * @param delete The object that specifies what to delete. 157 * @return A {@link CompletableFuture} that always returns null when complete normally. 158 */ 159 CompletableFuture<Void> delete(Delete delete); 160 161 /** 162 * Appends values to one or more columns within a single row. 163 * <p> 164 * This operation does not appear atomic to readers. Appends are done under a single row lock, so 165 * write operations to a row are synchronized, but readers do not take row locks so get and scan 166 * operations can see this operation partially completed. 167 * @param append object that specifies the columns and amounts to be used for the increment 168 * operations 169 * @return values of columns after the append operation (maybe null). The return value will be 170 * wrapped by a {@link CompletableFuture}. 171 */ 172 CompletableFuture<Result> append(Append append); 173 174 /** 175 * Increments one or more columns within a single row. 176 * <p> 177 * This operation does not appear atomic to readers. Increments are done under a single row lock, 178 * so write operations to a row are synchronized, but readers do not take row locks so get and 179 * scan operations can see this operation partially completed. 180 * @param increment object that specifies the columns and amounts to be used for the increment 181 * operations 182 * @return values of columns after the increment. The return value will be wrapped by a 183 * {@link CompletableFuture}. 184 */ 185 CompletableFuture<Result> increment(Increment increment); 186 187 /** 188 * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} 189 * <p> 190 * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. 191 * @param row The row that contains the cell to increment. 192 * @param family The column family of the cell to increment. 193 * @param qualifier The column qualifier of the cell to increment. 194 * @param amount The amount to increment the cell with (or decrement, if the amount is 195 * negative). 196 * @return The new value, post increment. The return value will be wrapped by a 197 * {@link CompletableFuture}. 198 */ 199 default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, 200 long amount) { 201 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 202 } 203 204 /** 205 * Atomically increments a column value. If the column value already exists and is not a 206 * big-endian long, this could throw an exception. If the column value does not yet exist it is 207 * initialized to <code>amount</code> and written to the specified column. 208 * <p> 209 * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose 210 * any increments that have not been flushed. 211 * @param row The row that contains the cell to increment. 212 * @param family The column family of the cell to increment. 213 * @param qualifier The column qualifier of the cell to increment. 214 * @param amount The amount to increment the cell with (or decrement, if the amount is 215 * negative). 216 * @param durability The persistence guarantee for this increment. 217 * @return The new value, post increment. The return value will be wrapped by a 218 * {@link CompletableFuture}. 219 */ 220 default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, 221 long amount, Durability durability) { 222 Preconditions.checkNotNull(row, "row is null"); 223 Preconditions.checkNotNull(family, "family is null"); 224 return increment( 225 new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) 226 .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); 227 } 228 229 /** 230 * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it 231 * adds the Put/Delete/RowMutations. 232 * <p> 233 * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it. 234 * This is a fluent style API, the code is like: 235 * 236 * <pre> 237 * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put) 238 * .thenAccept(succ -> { 239 * if (succ) { 240 * System.out.println("Check and put succeeded"); 241 * } else { 242 * System.out.println("Check and put failed"); 243 * } 244 * }); 245 * </pre> 246 * 247 * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it 248 * any more. 249 */ 250 @Deprecated 251 CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); 252 253 /** 254 * A helper class for sending checkAndMutate request. 255 * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it 256 * any more. 257 */ 258 @Deprecated 259 interface CheckAndMutateBuilder { 260 261 /** 262 * Match a qualifier. 263 * @param qualifier column qualifier to check. 264 */ 265 CheckAndMutateBuilder qualifier(byte[] qualifier); 266 267 /** 268 * Match a timerange. 269 * @param timeRange time range to check. 270 */ 271 CheckAndMutateBuilder timeRange(TimeRange timeRange); 272 273 /** 274 * Check for lack of column. 275 */ 276 CheckAndMutateBuilder ifNotExists(); 277 278 /** 279 * Check for equality. 280 * @param value the expected value 281 */ 282 default CheckAndMutateBuilder ifEquals(byte[] value) { 283 return ifMatches(CompareOperator.EQUAL, value); 284 } 285 286 /** 287 * Compare a value 288 * @param compareOp comparison operator to use 289 * @param value the expected value 290 */ 291 CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value); 292 293 /** 294 * Specify a Put to commit if the check succeeds. 295 * @param put data to put if check succeeds 296 * @return {@code true} if the new put was executed, {@code false} otherwise. The return value 297 * will be wrapped by a {@link CompletableFuture}. 298 */ 299 CompletableFuture<Boolean> thenPut(Put put); 300 301 /** 302 * Specify a Delete to commit if the check succeeds. 303 * @param delete data to delete if check succeeds 304 * @return {@code true} if the new delete was executed, {@code false} otherwise. The return 305 * value will be wrapped by a {@link CompletableFuture}. 306 */ 307 CompletableFuture<Boolean> thenDelete(Delete delete); 308 309 /** 310 * Specify a RowMutations to commit if the check succeeds. 311 * @param mutation mutations to perform if check succeeds 312 * @return true if the new mutation was executed, false otherwise. The return value will be 313 * wrapped by a {@link CompletableFuture}. 314 */ 315 CompletableFuture<Boolean> thenMutate(RowMutations mutation); 316 } 317 318 /** 319 * Atomically checks if a row matches the specified filter. If it does, it adds the 320 * Put/Delete/RowMutations. 321 * <p> 322 * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then 323 * execute it. This is a fluent style API, the code is like: 324 * 325 * <pre> 326 * table.checkAndMutate(row, filter).thenPut(put).thenAccept(succ -> { 327 * if (succ) { 328 * System.out.println("Check and put succeeded"); 329 * } else { 330 * System.out.println("Check and put failed"); 331 * } 332 * }); 333 * </pre> 334 * 335 * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it 336 * any more. 337 */ 338 @Deprecated 339 CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); 340 341 /** 342 * A helper class for sending checkAndMutate request with a filter. 343 * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it 344 * any more. 345 */ 346 @Deprecated 347 interface CheckAndMutateWithFilterBuilder { 348 349 /** 350 * Match a timerange. 351 * @param timeRange time range to check. 352 */ 353 CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange); 354 355 /** 356 * Specify a Put to commit if the check succeeds. 357 * @param put data to put if check succeeds 358 * @return {@code true} if the new put was executed, {@code false} otherwise. The return value 359 * will be wrapped by a {@link CompletableFuture}. 360 */ 361 CompletableFuture<Boolean> thenPut(Put put); 362 363 /** 364 * Specify a Delete to commit if the check succeeds. 365 * @param delete data to delete if check succeeds 366 * @return {@code true} if the new delete was executed, {@code false} otherwise. The return 367 * value will be wrapped by a {@link CompletableFuture}. 368 */ 369 CompletableFuture<Boolean> thenDelete(Delete delete); 370 371 /** 372 * Specify a RowMutations to commit if the check succeeds. 373 * @param mutation mutations to perform if check succeeds 374 * @return true if the new mutation was executed, false otherwise. The return value will be 375 * wrapped by a {@link CompletableFuture}. 376 */ 377 CompletableFuture<Boolean> thenMutate(RowMutations mutation); 378 } 379 380 /** 381 * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it 382 * performs the specified action. 383 * @param checkAndMutate The CheckAndMutate object. 384 * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate. 385 */ 386 CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate); 387 388 /** 389 * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense 390 * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed 391 * atomically (and thus, each may fail independently of others). 392 * @param checkAndMutates The list of CheckAndMutate. 393 * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate. 394 */ 395 List<CompletableFuture<CheckAndMutateResult>> 396 checkAndMutate(List<CheckAndMutate> checkAndMutates); 397 398 /** 399 * A simple version of batch checkAndMutate. It will fail if there are any failures. 400 * @param checkAndMutates The list of rows to apply. 401 * @return A {@link CompletableFuture} that wrapper the result list. 402 */ 403 default CompletableFuture<List<CheckAndMutateResult>> 404 checkAndMutateAll(List<CheckAndMutate> checkAndMutates) { 405 return allOf(checkAndMutate(checkAndMutates)); 406 } 407 408 /** 409 * Performs multiple mutations atomically on a single row. Currently {@link Put} and 410 * {@link Delete} are supported. 411 * @param mutation object that specifies the set of mutations to perform atomically 412 * @return A {@link CompletableFuture} that returns results of Increment/Append operations 413 */ 414 CompletableFuture<Result> mutateRow(RowMutations mutation); 415 416 /** 417 * The scan API uses the observer pattern. 418 * @param scan A configured {@link Scan} object. 419 * @param consumer the consumer used to receive results. 420 * @see ScanResultConsumer 421 * @see AdvancedScanResultConsumer 422 */ 423 void scan(Scan scan, C consumer); 424 425 /** 426 * Gets a scanner on the current table for the given family. 427 * @param family The column family to scan. 428 * @return A scanner. 429 */ 430 default ResultScanner getScanner(byte[] family) { 431 return getScanner(new Scan().addFamily(family)); 432 } 433 434 /** 435 * Gets a scanner on the current table for the given family and qualifier. 436 * @param family The column family to scan. 437 * @param qualifier The column qualifier to scan. 438 * @return A scanner. 439 */ 440 default ResultScanner getScanner(byte[] family, byte[] qualifier) { 441 return getScanner(new Scan().addColumn(family, qualifier)); 442 } 443 444 /** 445 * Returns a scanner on the current table as specified by the {@link Scan} object. 446 * @param scan A configured {@link Scan} object. 447 * @return A scanner. 448 */ 449 ResultScanner getScanner(Scan scan); 450 451 /** 452 * Return all the results that match the given scan object. 453 * <p> 454 * Notice that usually you should use this method with a {@link Scan} object that has limit set. 455 * For example, if you want to get the closest row after a given row, you could do this: 456 * <p> 457 * 458 * <pre> 459 * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> { 460 * if (results.isEmpty()) { 461 * System.out.println("No row after " + Bytes.toStringBinary(row)); 462 * } else { 463 * System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is " 464 * + Bytes.toStringBinary(results.stream().findFirst().get().getRow())); 465 * } 466 * }); 467 * </pre> 468 * <p> 469 * If your result set is very large, you should use other scan method to get a scanner or use 470 * callback to process the results. They will do chunking to prevent OOM. The scanAll method will 471 * fetch all the results and store them in a List and then return the list to you. 472 * <p> 473 * The scan metrics will be collected background if you enable it but you have no way to get it. 474 * Usually you can get scan metrics from {@code ResultScanner}, or through 475 * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results. 476 * So if you really care about scan metrics then you'd better use other scan methods which return 477 * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no 478 * performance difference between these scan methods so do not worry. 479 * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large 480 * result set, it is likely to cause OOM. 481 * @return The results of this small scan operation. The return value will be wrapped by a 482 * {@link CompletableFuture}. 483 */ 484 CompletableFuture<List<Result>> scanAll(Scan scan); 485 486 /** 487 * Test for the existence of columns in the table, as specified by the Gets. 488 * <p> 489 * This will return a list of booleans. Each value will be true if the related Get matches one or 490 * more keys, false if not. 491 * <p> 492 * This is a server-side call so it prevents any data from being transferred to the client. 493 * @param gets the Gets 494 * @return A list of {@link CompletableFuture}s that represent the existence for each get. 495 */ 496 default List<CompletableFuture<Boolean>> exists(List<Get> gets) { 497 return get(toCheckExistenceOnly(gets)).stream() 498 .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); 499 } 500 501 /** 502 * A simple version for batch exists. It will fail if there are any failures and you will get the 503 * whole result boolean list at once if the operation is succeeded. 504 * @param gets the Gets 505 * @return A {@link CompletableFuture} that wrapper the result boolean list. 506 */ 507 default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) { 508 return allOf(exists(gets)); 509 } 510 511 /** 512 * Extracts certain cells from the given rows, in batch. 513 * <p> 514 * Notice that you may not get all the results with this function, which means some of the 515 * returned {@link CompletableFuture}s may succeed while some of the other returned 516 * {@link CompletableFuture}s may fail. 517 * @param gets The objects that specify what data to fetch and from which rows. 518 * @return A list of {@link CompletableFuture}s that represent the result for each get. 519 */ 520 List<CompletableFuture<Result>> get(List<Get> gets); 521 522 /** 523 * A simple version for batch get. It will fail if there are any failures and you will get the 524 * whole result list at once if the operation is succeeded. 525 * @param gets The objects that specify what data to fetch and from which rows. 526 * @return A {@link CompletableFuture} that wrapper the result list. 527 */ 528 default CompletableFuture<List<Result>> getAll(List<Get> gets) { 529 return allOf(get(gets)); 530 } 531 532 /** 533 * Puts some data in the table, in batch. 534 * @param puts The list of mutations to apply. 535 * @return A list of {@link CompletableFuture}s that represent the result for each put. 536 */ 537 List<CompletableFuture<Void>> put(List<Put> puts); 538 539 /** 540 * A simple version of batch put. It will fail if there are any failures. 541 * @param puts The list of mutations to apply. 542 * @return A {@link CompletableFuture} that always returns null when complete normally. 543 */ 544 default CompletableFuture<Void> putAll(List<Put> puts) { 545 return allOf(put(puts)).thenApply(r -> null); 546 } 547 548 /** 549 * Deletes the specified cells/rows in bulk. 550 * @param deletes list of things to delete. 551 * @return A list of {@link CompletableFuture}s that represent the result for each delete. 552 */ 553 List<CompletableFuture<Void>> delete(List<Delete> deletes); 554 555 /** 556 * A simple version of batch delete. It will fail if there are any failures. 557 * @param deletes list of things to delete. 558 * @return A {@link CompletableFuture} that always returns null when complete normally. 559 */ 560 default CompletableFuture<Void> deleteAll(List<Delete> deletes) { 561 return allOf(delete(deletes)).thenApply(r -> null); 562 } 563 564 /** 565 * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The 566 * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the 567 * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the 568 * Put had put. 569 * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects 570 * @return A list of {@link CompletableFuture}s that represent the result for each action. 571 */ 572 <T> List<CompletableFuture<T>> batch(List<? extends Row> actions); 573 574 /** 575 * A simple version of batch. It will fail if there are any failures and you will get the whole 576 * result list at once if the operation is succeeded. 577 * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects 578 * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. 579 */ 580 default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) { 581 return allOf(batch(actions)); 582 } 583 584 /** 585 * Execute the given coprocessor call on the region which contains the given {@code row}. 586 * <p> 587 * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a 588 * one line lambda expression, like: 589 * 590 * <pre> 591 * channel -> xxxService.newStub(channel) 592 * </pre> 593 * 594 * @param stubMaker a delegation to the actual {@code newStub} call. 595 * @param callable a delegation to the actual protobuf rpc call. See the comment of 596 * {@link ServiceCaller} for more details. 597 * @param row The row key used to identify the remote region location 598 * @param <S> the type of the asynchronous stub 599 * @param <R> the type of the return value 600 * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. 601 * @see ServiceCaller 602 */ 603 <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 604 ServiceCaller<S, R> callable, byte[] row); 605 606 /** 607 * The callback when we want to execute a coprocessor call on a range of regions. 608 * <p> 609 * As the locating itself also takes some time, the implementation may want to send rpc calls on 610 * the fly, which means we do not know how many regions we have when we get the return value of 611 * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have 612 * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)} 613 * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no 614 * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)} 615 * calls in the future. 616 * <p> 617 * Here is a pseudo code to describe a typical implementation of a range coprocessor service 618 * method to help you better understand how the {@link CoprocessorCallback} will be called. The 619 * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the 620 * {@code whenComplete} is {@code CompletableFuture.whenComplete}. 621 * 622 * <pre> 623 * locateThenCall(byte[] row) { 624 * locate(row).whenComplete((location, locateError) -> { 625 * if (locateError != null) { 626 * callback.onError(locateError); 627 * return; 628 * } 629 * incPendingCall(); 630 * region = location.getRegion(); 631 * if (region.getEndKey() > endKey) { 632 * locateEnd = true; 633 * } else { 634 * locateThenCall(region.getEndKey()); 635 * } 636 * sendCall().whenComplete((resp, error) -> { 637 * if (error != null) { 638 * callback.onRegionError(region, error); 639 * } else { 640 * callback.onRegionComplete(region, resp); 641 * } 642 * if (locateEnd && decPendingCallAndGet() == 0) { 643 * callback.onComplete(); 644 * } 645 * }); 646 * }); 647 * } 648 * </pre> 649 */ 650 @InterfaceAudience.Public 651 interface CoprocessorCallback<R> { 652 653 /** 654 * Indicate that the respose of a region is available 655 * @param region the region that the response belongs to 656 * @param resp the response of the coprocessor call 657 */ 658 void onRegionComplete(RegionInfo region, R resp); 659 660 /** 661 * Indicate that the error for a region is available 662 * @param region the region that the error belongs to 663 * @param error the response error of the coprocessor call 664 */ 665 void onRegionError(RegionInfo region, Throwable error); 666 667 /** 668 * Indicate that all responses of the regions have been notified by calling 669 * {@link #onRegionComplete(RegionInfo, Object)} or 670 * {@link #onRegionError(RegionInfo, Throwable)}. 671 */ 672 void onComplete(); 673 674 /** 675 * Indicate that we got an error which does not belong to any regions. Usually a locating error. 676 */ 677 void onError(Throwable error); 678 } 679 680 /** 681 * Some coprocessors may support the idea of "partial results." If for some reason a coprocessor 682 * cannot return all results for a given region in a single response, the client side can be 683 * designed to recognize this and continuing requesting more results until they are completely 684 * accumulated in the client. 685 * <p> 686 * It is up to a particular coprocessor implementation and its corresponding clients to agree on 687 * what it means for results to be incomplete, how this state is communicated, and how multiple 688 * incomplete results are accumulated together. 689 * <p> 690 * Use this callback when you want to execute a coprocessor call on a range of regions, and that 691 * coprocessor may return incomplete results for a given region. See also the docs for 692 * {@link CoprocessorCallback}, which all apply here to its child interface too. 693 */ 694 @InterfaceAudience.Public 695 interface PartialResultCoprocessorCallback<S, R> extends CoprocessorCallback<R> { 696 /** 697 * Subclasses should implement this to tell AsyncTable whether the given response is "final" or 698 * whether the AsyncTable should send another request to the coprocessor to fetch more results 699 * from the given region. This method of fetching more results can be used many times until 700 * there are no more results to fetch from the region. 701 * @param response The response received from the coprocessor 702 * @param region The region the response came from 703 * @return A ServiceCaller object if the response was not final and therefore another request is 704 * required to continuing fetching results. null if no more requests need to be sent to 705 * the region. 706 */ 707 ServiceCaller<S, R> getNextCallable(R response, RegionInfo region); 708 709 /** 710 * Subclasses should implement this such that, when the above method returns non-null, this 711 * method returns the duration that AsyncTable should wait before sending the next request to 712 * the given region. You can use this to create a back-off behavior to reduce load on the 713 * RegionServer. If that's not desired, you can always return {@link Duration.ZERO}. 714 * @param response The response received from the coprocessor 715 * @param region The region the response came from 716 * @return The duration to wait. 717 */ 718 Duration getWaitInterval(R response, RegionInfo region); 719 } 720 721 /** 722 * Helper class for sending coprocessorService request that executes a coprocessor call on regions 723 * which are covered by a range. 724 * <p> 725 * If {@code fromRow} is not specified the selection will start with the first table region. If 726 * {@code toRow} is not specified the selection will continue through the last table region. 727 * @param <S> the type of the protobuf Service you want to call. 728 * @param <R> the type of the return value. 729 */ 730 interface CoprocessorServiceBuilder<S, R> { 731 732 /** 733 * Specify a start row 734 * @param startKey start region selection with region containing this row, inclusive. 735 */ 736 default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) { 737 return fromRow(startKey, true); 738 } 739 740 /** 741 * Specify a start row 742 * @param startKey start region selection with region containing this row 743 * @param inclusive whether to include the startKey 744 */ 745 CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive); 746 747 /** 748 * Specify a stop row 749 * @param endKey select regions up to and including the region containing this row, exclusive. 750 */ 751 default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) { 752 return toRow(endKey, false); 753 } 754 755 /** 756 * Specify a stop row 757 * @param endKey select regions up to and including the region containing this row 758 * @param inclusive whether to include the endKey 759 */ 760 CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive); 761 762 /** 763 * Execute the coprocessorService request. You can get the response through the 764 * {@link CoprocessorCallback}. 765 */ 766 void execute(); 767 } 768 769 /** 770 * Execute a coprocessor call on the regions which are covered by a range. 771 * <p> 772 * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it. 773 * <p> 774 * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it 775 * is only a one line lambda expression, like: 776 * 777 * <pre> 778 * channel -> xxxService.newStub(channel) 779 * </pre> 780 * 781 * @param stubMaker a delegation to the actual {@code newStub} call. 782 * @param callable a delegation to the actual protobuf rpc call. See the comment of 783 * {@link ServiceCaller} for more details. 784 * @param callback callback to get the response. See the comment of {@link CoprocessorCallback} 785 * for more details. 786 */ 787 <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, 788 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback); 789 790 /** 791 * Similar to above. Use when your coprocessor client+endpoint supports partial results. If the 792 * server does not offer partial results, it is still safe to use this, assuming you implement 793 * your {@link PartialResultCoprocessorCallback#getNextCallable(Object, RegionInfo)} correctly. 794 */ 795 <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, 796 ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback); 797}