/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;

import com.google.protobuf.RpcChannel;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * The interface for asynchronous version of Table. Obtain an instance from a
 * {@link AsyncConnection}.
 * <p>
 * The implementation is required to be thread safe.
 * <p>
 * Usually the implementation will not throw any exception directly. You need to get the exception
 * from the returned {@link CompletableFuture}.
 * @since 2.0.0
 */
@InterfaceAudience.Public
public interface AsyncTable<C extends ScanResultConsumerBase> {

  /**
   * Gets the fully qualified table name instance of this table.
   */
  TableName getName();

  /**
   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
   * <p>
   * The reference returned is not a copy, so any change made to it will affect this instance.
   */
  Configuration getConfiguration();

  /**
   * Gets the {@link TableDescriptor} for this table.
   */
  CompletableFuture<TableDescriptor> getDescriptor();

  /**
   * Gets the {@link AsyncTableRegionLocator} for this table.
   */
  AsyncTableRegionLocator getRegionLocator();

  /**
   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
   * @see #getReadRpcTimeout(TimeUnit)
   * @see #getWriteRpcTimeout(TimeUnit)
   * @param unit the unit of time the timeout to be represented in
   * @return rpc timeout in the specified time unit
   */
  long getRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each rpc read request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return read rpc timeout in the specified time unit
   */
  long getReadRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each rpc write request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return write rpc timeout in the specified time unit
   */
  long getWriteRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each operation in Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return operation rpc timeout in the specified time unit
   */
  long getOperationTimeout(TimeUnit unit);

  /**
   * Get the timeout of a single operation in a scan. It works like operation timeout for other
   * operations.
   * @param unit the unit of time the timeout to be represented in
   * @return scan rpc timeout in the specified time unit
   */
  long getScanTimeout(TimeUnit unit);

  /**
   * Test for the existence of columns in the table, as specified by the Get.
   * <p>
   * This will return true if the Get matches one or more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
   * @return true if the specified Get matches one or more keys, false if not. The return value will
   *         be wrapped by a {@link CompletableFuture}.
   */
  default CompletableFuture<Boolean> exists(Get get) {
    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
  }

  /**
   * Extracts certain cells from a given row.
   * @param get The object that specifies what data to fetch and from which row.
   * @return The data coming from the specified row, if it exists. If the row specified doesn't
   *         exist, the {@link Result} instance returned won't contain any
   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
   *         return value will be wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> get(Get get);

  /**
   * Puts some data to the table.
   * @param put The data to put.
   * @return A {@link CompletableFuture} that always returns null when complete normally.
   */
  CompletableFuture<Void> put(Put put);

  /**
   * Deletes the specified cells/row.
   * @param delete The object that specifies what to delete.
   * @return A {@link CompletableFuture} that always returns null when complete normally.
   */
  CompletableFuture<Void> delete(Delete delete);

  /**
   * Appends values to one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
   * write operations to a row are synchronized, but readers do not take row locks so get and scan
   * operations can see this operation partially completed.
   * @param append object that specifies the columns and values to be used for the append
   *          operations
   * @return values of columns after the append operation (maybe null). The return value will be
   *         wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> append(Append append);

  /**
   * Increments one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Increments are done under a single row lock,
   * so write operations to a row are synchronized, but readers do not take row locks so get and
   * scan operations can see this operation partially completed.
   * @param increment object that specifies the columns and amounts to be used for the increment
   *          operations
   * @return values of columns after the increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<Result> increment(Increment increment);

  /**
   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
   * <p>
   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
   * @param row The row that contains the cell to increment.
   * @param family The column family of the cell to increment.
   * @param qualifier The column qualifier of the cell to increment.
   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount) {
    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
  }

  /**
   * Atomically increments a column value. If the column value already exists and is not a
   * big-endian long, this could throw an exception. If the column value does not yet exist it is
   * initialized to <code>amount</code> and written to the specified column.
   * <p>
   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
   * any increments that have not been flushed.
   * @param row The row that contains the cell to increment.
   * @param family The column family of the cell to increment.
   * @param qualifier The column qualifier of the cell to increment.
   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
   * @param durability The persistence guarantee for this increment.
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount, Durability durability) {
    Preconditions.checkNotNull(row, "row is null");
    Preconditions.checkNotNull(family, "family is null");
    return increment(
      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
        .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
  }

  /**
   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
   * adds the Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
   * This is a fluent style API, the code is like:
   *
   * <pre>
   * <code>
   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
   *     .thenAccept(succ -> {
   *       if (succ) {
   *         System.out.println("Check and put succeeded");
   *       } else {
   *         System.out.println("Check and put failed");
   *       }
   *     });
   * </code>
   * </pre>
   *
   * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);

  /**
   * A helper class for sending checkAndMutate request.
   *
   * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  interface CheckAndMutateBuilder {

    /**
     * @param qualifier column qualifier to check.
     */
    CheckAndMutateBuilder qualifier(byte[] qualifier);

    /**
     * @param timeRange time range to check.
     */
    CheckAndMutateBuilder timeRange(TimeRange timeRange);

    /**
     * Check for lack of column.
     */
    CheckAndMutateBuilder ifNotExists();

    /**
     * Check for equality.
     * @param value the expected value
     */
    default CheckAndMutateBuilder ifEquals(byte[] value) {
      return ifMatches(CompareOperator.EQUAL, value);
    }

    /**
     * @param compareOp comparison operator to use
     * @param value the expected value
     */
    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);

    /**
     * @param put data to put if check succeeds
     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
     *         will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenPut(Put put);

    /**
     * @param delete data to delete if check succeeds
     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenDelete(Delete delete);

    /**
     * @param mutation mutations to perform if check succeeds
     * @return true if the new mutation was executed, false otherwise. The return value will be
     *         wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
  }

  /**
   * Atomically checks if a row matches the specified filter. If it does, it adds the
   * Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
   * execute it. This is a fluent style API, the code is like:
   *
   * <pre>
   * <code>
   * table.checkAndMutate(row, filter).thenPut(put)
   *     .thenAccept(succ -> {
   *       if (succ) {
   *         System.out.println("Check and put succeeded");
   *       } else {
   *         System.out.println("Check and put failed");
   *       }
   *     });
   * </code>
   * </pre>
   *
   * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);

  /**
   * A helper class for sending checkAndMutate request with a filter.
   *
   * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  interface CheckAndMutateWithFilterBuilder {

    /**
     * @param timeRange time range to check.
     */
    CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);

    /**
     * @param put data to put if check succeeds
     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
     *         will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenPut(Put put);

    /**
     * @param delete data to delete if check succeeds
     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenDelete(Delete delete);

    /**
     * @param mutation mutations to perform if check succeeds
     * @return true if the new mutation was executed, false otherwise. The return value will be
     *         wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
  }

  /**
   * checkAndMutate that atomically checks if a row matches the specified condition. If it does,
   * it performs the specified action.
   *
   * @param checkAndMutate The CheckAndMutate object.
   * @return A {@link CompletableFuture} that represents the result for the CheckAndMutate.
   */
  CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate);

  /**
   * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
   * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
   * atomically (and thus, each may fail independently of others).
   *
   * @param checkAndMutates The list of CheckAndMutate.
   * @return A list of {@link CompletableFuture}s that represent the result for each
   *         CheckAndMutate.
   */
  List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
      List<CheckAndMutate> checkAndMutates);

  /**
   * A simple version of batch checkAndMutate. It will fail if there are any failures.
   *
   * @param checkAndMutates The list of rows to apply.
   * @return A {@link CompletableFuture} that wraps the result list.
   */
  default CompletableFuture<List<CheckAndMutateResult>> checkAndMutateAll(
      List<CheckAndMutate> checkAndMutates) {
    return allOf(checkAndMutate(checkAndMutates));
  }

  /**
   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
   * {@link Delete} are supported.
   * @param mutation object that specifies the set of mutations to perform atomically
   * @return A {@link CompletableFuture} that returns results of Increment/Append operations
   */
  CompletableFuture<Result> mutateRow(RowMutations mutation);

  /**
   * The scan API uses the observer pattern.
   * @param scan A configured {@link Scan} object.
   * @param consumer the consumer used to receive results.
   * @see ScanResultConsumer
   * @see AdvancedScanResultConsumer
   */
  void scan(Scan scan, C consumer);

  /**
   * Gets a scanner on the current table for the given family.
   * @param family The column family to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family) {
    return getScanner(new Scan().addFamily(family));
  }

  /**
   * Gets a scanner on the current table for the given family and qualifier.
   * @param family The column family to scan.
   * @param qualifier The column qualifier to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family, byte[] qualifier) {
    return getScanner(new Scan().addColumn(family, qualifier));
  }

  /**
   * Returns a scanner on the current table as specified by the {@link Scan} object.
   * @param scan A configured {@link Scan} object.
   * @return A scanner.
   */
  ResultScanner getScanner(Scan scan);

  /**
   * Return all the results that match the given scan object.
   * <p>
   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
   * For example, if you want to get the closest row after a given row, you could do this:
   * <p>
   *
   * <pre>
   * <code>
   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
   *   if (results.isEmpty()) {
   *     System.out.println("No row after " + Bytes.toStringBinary(row));
   *   } else {
   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
   *   }
   * });
   * </code>
   * </pre>
   * <p>
   * If your result set is very large, you should use other scan methods to get a scanner or use
   * callback to process the results. They will do chunking to prevent OOM. The scanAll method will
   * fetch all the results and store them in a List and then return the list to you.
   * <p>
   * The scan metrics will be collected in the background if you enable it but you have no way to
   * get it. Usually you can get scan metrics from {@code ResultScanner}, or through
   * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
   * So if you really care about scan metrics then you'd better use other scan methods which return
   * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
   * performance difference between these scan methods so do not worry.
   * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large
   *          result set, it is likely to cause OOM.
   * @return The results of this small scan operation. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<List<Result>> scanAll(Scan scan);

  /**
   * Test for the existence of columns in the table, as specified by the Gets.
   * <p>
   * This will return a list of booleans. Each value will be true if the related Get matches one or
   * more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
   * @param gets the Gets
   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
   */
  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
    return get(toCheckExistenceOnly(gets)).stream()
        .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
  }

  /**
   * A simple version for batch exists. It will fail if there are any failures and you will get the
   * whole result boolean list at once if the operation succeeds.
   * @param gets the Gets
   * @return A {@link CompletableFuture} that wraps the result boolean list.
   */
  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
    return allOf(exists(gets));
  }

  /**
   * Extracts certain cells from the given rows, in batch.
   * <p>
   * Notice that you may not get all the results with this function, which means some of the
   * returned {@link CompletableFuture}s may succeed while some of the other returned
   * {@link CompletableFuture}s may fail.
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A list of {@link CompletableFuture}s that represent the result for each get.
   */
  List<CompletableFuture<Result>> get(List<Get> gets);

  /**
   * A simple version for batch get. It will fail if there are any failures and you will get the
   * whole result list at once if the operation succeeds.
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A {@link CompletableFuture} that wraps the result list.
   */
  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
    return allOf(get(gets));
  }

  /**
   * Puts some data in the table, in batch.
   * @param puts The list of mutations to apply.
   * @return A list of {@link CompletableFuture}s that represent the result for each put.
   */
  List<CompletableFuture<Void>> put(List<Put> puts);

  /**
   * A simple version of batch put. It will fail if there are any failures.
   * @param puts The list of mutations to apply.
   * @return A {@link CompletableFuture} that always returns null when complete normally.
   */
  default CompletableFuture<Void> putAll(List<Put> puts) {
    return allOf(put(puts)).thenApply(r -> null);
  }

  /**
   * Deletes the specified cells/rows in bulk.
   * @param deletes list of things to delete.
   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
   */
  List<CompletableFuture<Void>> delete(List<Delete> deletes);

  /**
   * A simple version of batch delete. It will fail if there are any failures.
   * @param deletes list of things to delete.
   * @return A {@link CompletableFuture} that always returns null when complete normally.
   */
  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
    return allOf(delete(deletes)).thenApply(r -> null);
  }

  /**
   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
   * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
   * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
   * Put had put.
   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
   * @return A list of {@link CompletableFuture}s that represent the result for each action.
   */
  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);

  /**
   * A simple version of batch. It will fail if there are any failures and you will get the whole
   * result list at once if the operation succeeds.
   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
   */
  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
    return allOf(batch(actions));
  }

  /**
   * Execute the given coprocessor call on the region which contains the given {@code row}.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
   * one line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param row The row key used to identify the remote region location
   * @param <S> the type of the asynchronous stub
   * @param <R> the type of the return value
   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
   * @see ServiceCaller
   */
  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, byte[] row);

  /**
   * The callback when we want to execute a coprocessor call on a range of regions.
   * <p>
   * As the locating itself also takes some time, the implementation may want to send rpc calls on
   * the fly, which means we do not know how many regions we have when we get the return value of
   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
   * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)}
   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no
   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
   * calls in the future.
   * <p>
   * Here is a pseudo code to describe a typical implementation of a range coprocessor service
   * method to help you better understand how the {@link CoprocessorCallback} will be called. The
   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
   *
   * <pre>
   * locateThenCall(byte[] row) {
   *   locate(row).whenComplete((location, locateError) -> {
   *     if (locateError != null) {
   *       callback.onError(locateError);
   *       return;
   *     }
   *     incPendingCall();
   *     region = location.getRegion();
   *     if (region.getEndKey() > endKey) {
   *       locateEnd = true;
   *     } else {
   *       locateThenCall(region.getEndKey());
   *     }
   *     sendCall().whenComplete((resp, error) -> {
   *       if (error != null) {
   *         callback.onRegionError(region, error);
   *       } else {
   *         callback.onRegionComplete(region, resp);
   *       }
   *       if (locateEnd && decPendingCallAndGet() == 0) {
   *         callback.onComplete();
   *       }
   *     });
   *   });
   * }
   * </pre>
   */
  @InterfaceAudience.Public
  interface CoprocessorCallback<R> {

    /**
     * @param region the region that the response belongs to
     * @param resp the response of the coprocessor call
     */
    void onRegionComplete(RegionInfo region, R resp);

    /**
     * @param region the region that the error belongs to
     * @param error the response error of the coprocessor call
     */
    void onRegionError(RegionInfo region, Throwable error);

    /**
     * Indicate that all responses of the regions have been notified by calling
     * {@link #onRegionComplete(RegionInfo, Object)} or
     * {@link #onRegionError(RegionInfo, Throwable)}.
     */
    void onComplete();

    /**
     * Indicate that we got an error which does not belong to any regions. Usually a locating error.
     */
    void onError(Throwable error);
  }

  /**
   * Helper class for sending coprocessorService request that executes a coprocessor call on regions
   * which are covered by a range.
   * <p>
   * If {@code fromRow} is not specified the selection will start with the first table region. If
   * {@code toRow} is not specified the selection will continue through the last table region.
   * @param <S> the type of the protobuf Service you want to call.
   * @param <R> the type of the return value.
   */
  interface CoprocessorServiceBuilder<S, R> {

    /**
     * @param startKey start region selection with region containing this row, inclusive.
     */
    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
      return fromRow(startKey, true);
    }

    /**
     * @param startKey start region selection with region containing this row
     * @param inclusive whether to include the startKey
     */
    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);

    /**
     * @param endKey select regions up to and including the region containing this row, exclusive.
     */
    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
      return toRow(endKey, false);
    }

    /**
     * @param endKey select regions up to and including the region containing this row
     * @param inclusive whether to include the endKey
     */
    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);

    /**
     * Execute the coprocessorService request. You can get the response through the
     * {@link CoprocessorCallback}.
     */
    void execute();
  }

  /**
   * Execute a coprocessor call on the regions which are covered by a range.
   * <p>
   * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute
   * it.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
   * is only a one line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
   *          for more details.
   */
  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
}