/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
 * The interface for the asynchronous version of Table. Obtain an instance from an
 * {@link AsyncConnection}.
 * <p>
 * The implementation is required to be thread safe.
 * <p>
 * Usually the implementation will not throw any exception directly. You need to get the exception
 * from the returned {@link CompletableFuture}.
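 * <p>
 * A minimal usage sketch; the {@code conf} object, table name and row key below are only
 * illustrative, and in real code the {@link AsyncConnection} should be closed when it is no
 * longer needed:
 *
 * <pre>
 * <code>
 * ConnectionFactory.createAsyncConnection(conf).thenAccept(conn ->
 *   conn.getTable(TableName.valueOf("test")).get(new Get(Bytes.toBytes("row")))
 *     .thenAccept(result -> System.out.println(result)));
 * </code>
 * </pre>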
 * @since 2.0.0
 */
@InterfaceAudience.Public
public interface AsyncTable<C extends ScanResultConsumerBase> {

  /**
   * Gets the fully qualified table name instance of this table.
   */
  TableName getName();

  /**
   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
   * <p>
   * The reference returned is not a copy, so any change made to it will affect this instance.
   */
  Configuration getConfiguration();

  /**
   * Gets the {@link TableDescriptor} for this table.
   */
  CompletableFuture<TableDescriptor> getDescriptor();

  /**
   * Gets the {@link AsyncTableRegionLocator} for this table.
   */
  AsyncTableRegionLocator getRegionLocator();

  /**
   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
   * @see #getReadRpcTimeout(TimeUnit)
   * @see #getWriteRpcTimeout(TimeUnit)
   * @param unit the unit of time the timeout to be represented in
   * @return rpc timeout in the specified time unit
   */
  long getRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each rpc read request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return read rpc timeout in the specified time unit
   */
  long getReadRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each rpc write request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return write rpc timeout in the specified time unit
   */
  long getWriteRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each operation in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return operation rpc timeout in the specified time unit
   */
  long getOperationTimeout(TimeUnit unit);

  /**
   * Get the timeout of a single operation in a scan. It works like operation timeout for other
   * operations.
   * @param unit the unit of time the timeout to be represented in
   * @return scan rpc timeout in the specified time unit
   */
  long getScanTimeout(TimeUnit unit);

  /**
   * Test for the existence of columns in the table, as specified by the Get.
   * <p>
   * This will return true if the Get matches one or more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
   * @return true if the specified Get matches one or more keys, false if not. The return value
   *         will be wrapped by a {@link CompletableFuture}.
   */
  default CompletableFuture<Boolean> exists(Get get) {
    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
  }

  /**
   * Extracts certain cells from a given row.
   * @param get The object that specifies what data to fetch and from which row.
   * @return The data coming from the specified row, if it exists. If the row specified doesn't
   *         exist, the {@link Result} instance returned won't contain any
   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}.
   *         The return value will be wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> get(Get get);

  /**
   * Puts some data to the table.
   * @param put The data to put.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> put(Put put);

  /**
   * Deletes the specified cells/row.
   * @param delete The object that specifies what to delete.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> delete(Delete delete);

  /**
   * Appends values to one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
   * write operations to a row are synchronized, but readers do not take row locks so get and scan
   * operations can see this operation partially completed.
   * @param append object that specifies the columns and values to be appended
   * @return values of columns after the append operation (may be null). The return value will be
   *         wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> append(Append append);

  /**
   * Increments one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Increments are done under a single row lock,
   * so write operations to a row are synchronized, but readers do not take row locks so get and
   * scan operations can see this operation partially completed.
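   * <p>
   * A minimal sketch of a counter update ({@code row}, {@code family} and {@code qualifier} are
   * assumed to be existing byte arrays):
   *
   * <pre>
   * <code>
   * table.increment(new Increment(row).addColumn(family, qualifier, 1L))
   *   .thenAccept(result ->
   *     System.out.println("New value: " + Bytes.toLong(result.getValue(family, qualifier))));
   * </code>
   * </pre>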
   * @param increment object that specifies the columns and amounts to be used for the increment
   *          operations
   * @return values of columns after the increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<Result> increment(Increment increment);

  /**
   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
   * <p>
   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
   * @param row The row that contains the cell to increment.
   * @param family The column family of the cell to increment.
   * @param qualifier The column qualifier of the cell to increment.
   * @param amount The amount to increment the cell with (or decrement, if the amount is
   *          negative).
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
    long amount) {
    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
  }

  /**
   * Atomically increments a column value. If the column value already exists and is not a
   * big-endian long, this could throw an exception. If the column value does not yet exist it is
   * initialized to <code>amount</code> and written to the specified column.
   * <p>
   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
   * any increments that have not been flushed.
   * @param row The row that contains the cell to increment.
   * @param family The column family of the cell to increment.
   * @param qualifier The column qualifier of the cell to increment.
   * @param amount The amount to increment the cell with (or decrement, if the amount is
   *          negative).
   * @param durability The persistence guarantee for this increment.
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
    long amount, Durability durability) {
    Preconditions.checkNotNull(row, "row is null");
    Preconditions.checkNotNull(family, "family is null");
    return increment(
      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
        .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
  }

  /**
   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
   * adds the Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
   * This is a fluent style API; the code is like:
   *
   * <pre>
   * <code>
   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
   *   .thenAccept(succ -> {
   *     if (succ) {
   *       System.out.println("Check and put succeeded");
   *     } else {
   *       System.out.println("Check and put failed");
   *     }
   *   });
   * </code>
   * </pre>
   *
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);

  /**
   * A helper class for sending checkAndMutate request.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  interface CheckAndMutateBuilder {

    /**
     * @param qualifier column qualifier to check.
     */
    CheckAndMutateBuilder qualifier(byte[] qualifier);

    /**
     * @param timeRange time range to check.
     */
    CheckAndMutateBuilder timeRange(TimeRange timeRange);

    /**
     * Check for lack of column.
     */
    CheckAndMutateBuilder ifNotExists();

    /**
     * Check for equality.
     * @param value the expected value
     */
    default CheckAndMutateBuilder ifEquals(byte[] value) {
      return ifMatches(CompareOperator.EQUAL, value);
    }

    /**
     * @param compareOp comparison operator to use
     * @param value the expected value
     */
    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);

    /**
     * @param put data to put if check succeeds
     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
     *         will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenPut(Put put);

    /**
     * @param delete data to delete if check succeeds
     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenDelete(Delete delete);

    /**
     * @param mutation mutations to perform if check succeeds
     * @return true if the new mutation was executed, false otherwise. The return value will be
     *         wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
  }

  /**
   * Atomically checks if a row matches the specified filter. If it does, it adds the
   * Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
   * execute it. This is a fluent style API; the code is like:
   *
   * <pre>
   * <code>
   * table.checkAndMutate(row, filter).thenPut(put)
   *   .thenAccept(succ -> {
   *     if (succ) {
   *       System.out.println("Check and put succeeded");
   *     } else {
   *       System.out.println("Check and put failed");
   *     }
   *   });
   * </code>
   * </pre>
   *
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);

  /**
   * A helper class for sending checkAndMutate request with a filter.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  interface CheckAndMutateWithFilterBuilder {

    /**
     * @param timeRange time range to check.
     */
    CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);

    /**
     * @param put data to put if check succeeds
     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
     *         will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenPut(Put put);

    /**
     * @param delete data to delete if check succeeds
     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenDelete(Delete delete);

    /**
     * @param mutation mutations to perform if check succeeds
     * @return true if the new mutation was executed, false otherwise. The return value will be
     *         wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
  }

  /**
   * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it
   * performs the specified action.
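   * <p>
   * A minimal sketch using the {@link CheckAndMutate} builder ({@code row}, {@code family},
   * {@code qualifier}, {@code expectedValue} and {@code put} are placeholders):
   *
   * <pre>
   * <code>
   * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
   *   .ifEquals(family, qualifier, expectedValue)
   *   .build(put);
   * table.checkAndMutate(checkAndMutate).thenAccept(result -> {
   *   if (result.isSuccess()) {
   *     System.out.println("Check and put succeeded");
   *   } else {
   *     System.out.println("Check and put failed");
   *   }
   * });
   * </code>
   * </pre>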
   * @param checkAndMutate The CheckAndMutate object.
   * @return A {@link CompletableFuture} that represents the result of the CheckAndMutate.
   */
  CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate);

  /**
   * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
   * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
   * atomically (and thus, each may fail independently of others).
   * @param checkAndMutates The list of CheckAndMutate.
   * @return A list of {@link CompletableFuture}s that represent the result for each
   *         CheckAndMutate.
   */
  List<CompletableFuture<CheckAndMutateResult>>
    checkAndMutate(List<CheckAndMutate> checkAndMutates);

  /**
   * A simple version of batch checkAndMutate. It will fail if there are any failures.
   * @param checkAndMutates The list of rows to apply.
   * @return A {@link CompletableFuture} that wraps the result list.
   */
  default CompletableFuture<List<CheckAndMutateResult>>
    checkAndMutateAll(List<CheckAndMutate> checkAndMutates) {
    return allOf(checkAndMutate(checkAndMutates));
  }

  /**
   * Performs multiple mutations atomically on a single row. Currently {@link Put}, {@link Delete},
   * {@link Increment} and {@link Append} are supported.
   * @param mutation object that specifies the set of mutations to perform atomically
   * @return A {@link CompletableFuture} that returns results of Increment/Append operations
   */
  CompletableFuture<Result> mutateRow(RowMutations mutation);

  /**
   * The scan API uses the observer pattern.
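   * <p>
   * A minimal sketch, assuming this table was obtained with {@link ScanResultConsumer} as the
   * consumer type {@code C} (i.e. with an extra thread pool for executing the callbacks); the
   * limit of 100 is only illustrative:
   *
   * <pre>
   * <code>
   * table.scan(new Scan().setLimit(100), new ScanResultConsumer() {
   *   public boolean onNext(Result result) {
   *     System.out.println(result);
   *     return true; // return false to stop the scan early
   *   }
   *
   *   public void onError(Throwable error) {
   *     error.printStackTrace();
   *   }
   *
   *   public void onComplete() {
   *     System.out.println("Scan finished");
   *   }
   * });
   * </code>
   * </pre>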
   * @param scan A configured {@link Scan} object.
   * @param consumer the consumer used to receive results.
   * @see ScanResultConsumer
   * @see AdvancedScanResultConsumer
   */
  void scan(Scan scan, C consumer);

  /**
   * Gets a scanner on the current table for the given family.
   * @param family The column family to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family) {
    return getScanner(new Scan().addFamily(family));
  }

  /**
   * Gets a scanner on the current table for the given family and qualifier.
   * @param family The column family to scan.
   * @param qualifier The column qualifier to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family, byte[] qualifier) {
    return getScanner(new Scan().addColumn(family, qualifier));
  }

  /**
   * Returns a scanner on the current table as specified by the {@link Scan} object.
   * @param scan A configured {@link Scan} object.
   * @return A scanner.
   */
  ResultScanner getScanner(Scan scan);

  /**
   * Return all the results that match the given scan object.
   * <p>
   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
   * For example, if you want to get the closest row after a given row, you could do this:
   * <p>
   *
   * <pre>
   * <code>
   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
   *   if (results.isEmpty()) {
   *     System.out.println("No row after " + Bytes.toStringBinary(row));
   *   } else {
   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
   *       + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
   *   }
   * });
   * </code>
   * </pre>
   * <p>
   * If your result set is very large, you should use another scan method to get a scanner, or use
   * a callback to process the results. Those methods do chunking to prevent OOM. The scanAll
   * method will fetch all the results, store them in a List, and then return the list to you.
   * <p>
   * The scan metrics will be collected in the background if you enable them, but you have no way
   * to get them through this method. Usually you can get scan metrics from a
   * {@code ResultScanner}, or through {@code ScanResultConsumer.onScanMetricsCreated}, but this
   * method only returns a list of results. So if you really care about scan metrics then you'd
   * better use the other scan methods which return a {@code ResultScanner} or let you pass in a
   * {@code ScanResultConsumer}. There is no performance difference between these scan methods so
   * do not worry.
   * @param scan A configured {@link Scan} object. If you use this method to fetch a really large
   *          result set, it is likely to cause OOM.
   * @return The results of this scan operation. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<List<Result>> scanAll(Scan scan);

  /**
   * Test for the existence of columns in the table, as specified by the Gets.
   * <p>
   * This will return a list of booleans. Each value will be true if the related Get matches one or
   * more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
   * @param gets the Gets
   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
   */
  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
    return get(toCheckExistenceOnly(gets)).stream()
      .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
  }

  /**
   * A simple version for batch exists. It will fail if there are any failures and you will get the
   * whole result boolean list at once if the operation succeeds.
   * @param gets the Gets
   * @return A {@link CompletableFuture} that wraps the result boolean list.
   */
  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
    return allOf(exists(gets));
  }

  /**
   * Extracts certain cells from the given rows, in batch.
   * <p>
   * Notice that you may not get all the results with this function, which means some of the
   * returned {@link CompletableFuture}s may succeed while some of the other returned
   * {@link CompletableFuture}s may fail.
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A list of {@link CompletableFuture}s that represent the result for each get.
   */
  List<CompletableFuture<Result>> get(List<Get> gets);

  /**
   * A simple version for batch get. It will fail if there are any failures and you will get the
   * whole result list at once if the operation succeeds.
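   * <p>
   * A minimal sketch ({@code row1} and {@code row2} are placeholder row keys):
   *
   * <pre>
   * <code>
   * table.getAll(Arrays.asList(new Get(row1), new Get(row2)))
   *   .thenAccept(results -> results.forEach(System.out::println));
   * </code>
   * </pre>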
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A {@link CompletableFuture} that wraps the result list.
   */
  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
    return allOf(get(gets));
  }

  /**
   * Puts some data in the table, in batch.
   * @param puts The list of mutations to apply.
   * @return A list of {@link CompletableFuture}s that represent the result for each put.
   */
  List<CompletableFuture<Void>> put(List<Put> puts);

  /**
   * A simple version of batch put. It will fail if there are any failures.
   * @param puts The list of mutations to apply.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  default CompletableFuture<Void> putAll(List<Put> puts) {
    return allOf(put(puts)).thenApply(r -> null);
  }

  /**
   * Deletes the specified cells/rows in bulk.
   * @param deletes list of things to delete.
   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
   */
  List<CompletableFuture<Void>> delete(List<Delete> deletes);

  /**
   * A simple version of batch delete. It will fail if there are any failures.
   * @param deletes list of things to delete.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
    return allOf(delete(deletes)).thenApply(r -> null);
  }

  /**
   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations.
   * The ordering of execution of the actions is not defined. This means that if you do a Put and a
   * Get in the same {@link #batch} call, you will not necessarily be guaranteed that the Get
   * returns what the Put had put.
   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
   * @return A list of {@link CompletableFuture}s that represent the result for each action.
   */
  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);

  /**
   * A simple version of batch. It will fail if there are any failures and you will get the whole
   * result list at once if the operation succeeds.
   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
   */
  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
    return allOf(batch(actions));
  }

  /**
   * Execute the given coprocessor call on the region which contains the given {@code row}.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
   * one line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
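   * A rough sketch of a full call ({@code XxxService}, {@code xxxCall}, {@code request} and
   * {@code row} are placeholders for your coprocessor's generated service, method, request
   * message and the target row key):
   *
   * <pre>
   * <code>
   * table.coprocessorService(XxxService::newStub,
   *   (stub, controller, rpcCallback) -> stub.xxxCall(controller, request, rpcCallback), row)
   *   .thenAccept(response -> System.out.println(response));
   * </code>
   * </pre>
   *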
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param row The row key used to identify the remote region location
   * @param <S> the type of the asynchronous stub
   * @param <R> the type of the return value
   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
   * @see ServiceCaller
   */
  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, byte[] row);

  /**
   * The callback when we want to execute a coprocessor call on a range of regions.
   * <p>
   * As the locating itself also takes some time, the implementation may want to send rpc calls on
   * the fly, which means we do not know how many regions we have when we get the return value of
   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
   * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)}
   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no
   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
   * calls in the future.
   * <p>
   * Here is a piece of pseudo code to describe a typical implementation of a range coprocessor
   * service method, to help you better understand how the {@link CoprocessorCallback} will be
   * called. The {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice
   * that the {@code whenComplete} is {@code CompletableFuture.whenComplete}.
   *
   * <pre>
   * locateThenCall(byte[] row) {
   *   locate(row).whenComplete((location, locateError) -> {
   *     if (locateError != null) {
   *       callback.onError(locateError);
   *       return;
   *     }
   *     incPendingCall();
   *     region = location.getRegion();
   *     if (region.getEndKey() > endKey) {
   *       locateEnd = true;
   *     } else {
   *       locateThenCall(region.getEndKey());
   *     }
   *     sendCall().whenComplete((resp, error) -> {
   *       if (error != null) {
   *         callback.onRegionError(region, error);
   *       } else {
   *         callback.onRegionComplete(region, resp);
   *       }
   *       if (locateEnd && decPendingCallAndGet() == 0) {
   *         callback.onComplete();
   *       }
   *     });
   *   });
   * }
   * </pre>
   */
  @InterfaceAudience.Public
  interface CoprocessorCallback<R> {

    /**
     * @param region the region that the response belongs to
     * @param resp the response of the coprocessor call
     */
    void onRegionComplete(RegionInfo region, R resp);

    /**
     * @param region the region that the error belongs to
     * @param error the response error of the coprocessor call
     */
    void onRegionError(RegionInfo region, Throwable error);

    /**
     * Indicate that all responses of the regions have been notified by calling
     * {@link #onRegionComplete(RegionInfo, Object)} or
     * {@link #onRegionError(RegionInfo, Throwable)}.
     */
    void onComplete();

    /**
     * Indicate that we got an error which does not belong to any regions. Usually a locating
     * error.
     */
    void onError(Throwable error);
  }

  /**
   * Helper class for sending coprocessorService request that executes a coprocessor call on
   * regions which are covered by a range.
   * <p>
   * If {@code fromRow} is not specified the selection will start with the first table region. If
   * {@code toRow} is not specified the selection will continue through the last table region.
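   * <p>
   * A rough usage sketch ({@code XxxService}, {@code xxxCall}, {@code request}, {@code startKey},
   * {@code endKey} and {@code callback} are placeholders; {@code callback} is a
   * {@link CoprocessorCallback} implementation):
   *
   * <pre>
   * <code>
   * table.coprocessorService(XxxService::newStub,
   *   (stub, controller, rpcCallback) -> stub.xxxCall(controller, request, rpcCallback), callback)
   *   .fromRow(startKey)
   *   .toRow(endKey)
   *   .execute();
   * </code>
   * </pre>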
   * @param <S> the type of the protobuf Service you want to call.
   * @param <R> the type of the return value.
   */
  interface CoprocessorServiceBuilder<S, R> {

    /**
     * @param startKey start region selection with region containing this row, inclusive.
     */
    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
      return fromRow(startKey, true);
    }

    /**
     * @param startKey start region selection with region containing this row
     * @param inclusive whether to include the startKey
     */
    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);

    /**
     * @param endKey select regions up to and including the region containing this row, exclusive.
     */
    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
      return toRow(endKey, false);
    }

    /**
     * @param endKey select regions up to and including the region containing this row
     * @param inclusive whether to include the endKey
     */
    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);

    /**
     * Execute the coprocessorService request. You can get the response through the
     * {@link CoprocessorCallback}.
     */
    void execute();
  }

  /**
   * Execute a coprocessor call on the regions which are covered by a range.
   * <p>
   * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute
   * it.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
   * is only a one line lambda expression, like:
   *
   * <pre>
   * <code>
   * channel -> xxxService.newStub(channel)
   * </code>
   * </pre>
   *
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable a delegation to the actual protobuf rpc call. See the comment of
   *          {@link ServiceCaller} for more details.
   * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
   *          for more details.
   */
  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
}