001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; 022import static org.apache.hadoop.hbase.util.FutureUtils.allOf; 023 024import com.google.protobuf.RpcChannel; 025import java.util.List; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.TimeUnit; 028import java.util.function.Function; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CompareOperator; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.filter.Filter; 033import org.apache.hadoop.hbase.io.TimeRange; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.yetus.audience.InterfaceAudience; 036 037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 038 039/** 040 * The interface for asynchronous version of Table. Obtain an instance from a 041 * {@link AsyncConnection}. 042 * <p> 043 * The implementation is required to be thread safe. 044 * <p> 045 * Usually the implementation will not throw any exception directly. You need to get the exception 046 * from the returned {@link CompletableFuture}. 047 * @since 2.0.0 048 */ 049@InterfaceAudience.Public 050public interface AsyncTable<C extends ScanResultConsumerBase> { 051 052 /** 053 * Gets the fully qualified table name instance of this table. 054 */ 055 TableName getName(); 056 057 /** 058 * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. 059 * <p> 060 * The reference returned is not a copy, so any change made to it will affect this instance. 061 */ 062 Configuration getConfiguration(); 063 064 /** 065 * Gets the {@link TableDescriptor} for this table. 066 */ 067 CompletableFuture<TableDescriptor> getDescriptor(); 068 069 /** 070 * Gets the {@link AsyncTableRegionLocator} for this table. 071 */ 072 AsyncTableRegionLocator getRegionLocator(); 073 074 /** 075 * Get timeout of each rpc request in this Table instance. It will be overridden by a more 076 * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. 077 * @see #getReadRpcTimeout(TimeUnit) 078 * @see #getWriteRpcTimeout(TimeUnit) 079 * @param unit the unit of time the timeout to be represented in 080 * @return rpc timeout in the specified time unit 081 */ 082 long getRpcTimeout(TimeUnit unit); 083 084 /** 085 * Get timeout of each rpc read request in this Table instance. 086 * @param unit the unit of time the timeout to be represented in 087 * @return read rpc timeout in the specified time unit 088 */ 089 long getReadRpcTimeout(TimeUnit unit); 090 091 /** 092 * Get timeout of each rpc write request in this Table instance. 093 * @param unit the unit of time the timeout to be represented in 094 * @return write rpc timeout in the specified time unit 095 */ 096 long getWriteRpcTimeout(TimeUnit unit); 097 098 /** 099 * Get timeout of each operation in Table instance. 100 * @param unit the unit of time the timeout to be represented in 101 * @return operation rpc timeout in the specified time unit 102 */ 103 long getOperationTimeout(TimeUnit unit); 104 105 /** 106 * Get the timeout of a single operation in a scan. It works like operation timeout for other 107 * operations. 108 * @param unit the unit of time the timeout to be represented in 109 * @return scan rpc timeout in the specified time unit 110 */ 111 long getScanTimeout(TimeUnit unit); 112 113 /** 114 * Test for the existence of columns in the table, as specified by the Get. 115 * <p> 116 * This will return true if the Get matches one or more keys, false if not. 117 * <p> 118 * This is a server-side call so it prevents any data from being transfered to the client. 119 * @return true if the specified Get matches one or more keys, false if not. The return value will 120 * be wrapped by a {@link CompletableFuture}. 121 */ 122 default CompletableFuture<Boolean> exists(Get get) { 123 return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists()); 124 } 125 126 /** 127 * Extracts certain cells from a given row. 128 * @param get The object that specifies what data to fetch and from which row. 129 * @return The data coming from the specified row, if it exists. If the row specified doesn't 130 * exist, the {@link Result} instance returned won't contain any 131 * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The 132 * return value will be wrapped by a {@link CompletableFuture}. 133 */ 134 CompletableFuture<Result> get(Get get); 135 136 /** 137 * Puts some data to the table. 138 * @param put The data to put. 139 * @return A {@link CompletableFuture} that always returns null when complete normally. 140 */ 141 CompletableFuture<Void> put(Put put); 142 143 /** 144 * Deletes the specified cells/row. 145 * @param delete The object that specifies what to delete. 146 * @return A {@link CompletableFuture} that always returns null when complete normally. 147 */ 148 CompletableFuture<Void> delete(Delete delete); 149 150 /** 151 * Appends values to one or more columns within a single row. 152 * <p> 153 * This operation does not appear atomic to readers. Appends are done under a single row lock, so 154 * write operations to a row are synchronized, but readers do not take row locks so get and scan 155 * operations can see this operation partially completed. 156 * @param append object that specifies the columns and amounts to be used for the increment 157 * operations 158 * @return values of columns after the append operation (maybe null). The return value will be 159 * wrapped by a {@link CompletableFuture}. 160 */ 161 CompletableFuture<Result> append(Append append); 162 163 /** 164 * Increments one or more columns within a single row. 165 * <p> 166 * This operation does not appear atomic to readers. Increments are done under a single row lock, 167 * so write operations to a row are synchronized, but readers do not take row locks so get and 168 * scan operations can see this operation partially completed. 169 * @param increment object that specifies the columns and amounts to be used for the increment 170 * operations 171 * @return values of columns after the increment. The return value will be wrapped by a 172 * {@link CompletableFuture}. 173 */ 174 CompletableFuture<Result> increment(Increment increment); 175 176 /** 177 * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} 178 * <p> 179 * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. 180 * @param row The row that contains the cell to increment. 181 * @param family The column family of the cell to increment. 182 * @param qualifier The column qualifier of the cell to increment. 183 * @param amount The amount to increment the cell with (or decrement, if the amount is 184 * negative). 185 * @return The new value, post increment. The return value will be wrapped by a 186 * {@link CompletableFuture}. 187 */ 188 default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, 189 long amount) { 190 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 191 } 192 193 /** 194 * Atomically increments a column value. If the column value already exists and is not a 195 * big-endian long, this could throw an exception. If the column value does not yet exist it is 196 * initialized to <code>amount</code> and written to the specified column. 197 * <p> 198 * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose 199 * any increments that have not been flushed. 200 * @param row The row that contains the cell to increment. 201 * @param family The column family of the cell to increment. 202 * @param qualifier The column qualifier of the cell to increment. 203 * @param amount The amount to increment the cell with (or decrement, if the amount is 204 * negative). 205 * @param durability The persistence guarantee for this increment. 206 * @return The new value, post increment. The return value will be wrapped by a 207 * {@link CompletableFuture}. 208 */ 209 default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, 210 long amount, Durability durability) { 211 Preconditions.checkNotNull(row, "row is null"); 212 Preconditions.checkNotNull(family, "family is null"); 213 return increment( 214 new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) 215 .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); 216 } 217 218 /** 219 * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it 220 * adds the Put/Delete/RowMutations. 221 * <p> 222 * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it. 223 * This is a fluent style API, the code is like: 224 * 225 * <pre> 226 * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put) 227 * .thenAccept(succ -> { 228 * if (succ) { 229 * System.out.println("Check and put succeeded"); 230 * } else { 231 * System.out.println("Check and put failed"); 232 * } 233 * }); 234 * </pre> 235 * 236 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 237 * any more. 238 */ 239 @Deprecated 240 CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); 241 242 /** 243 * A helper class for sending checkAndMutate request. 244 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 245 * any more. 246 */ 247 @Deprecated 248 interface CheckAndMutateBuilder { 249 250 /** 251 * Match a qualifier. 252 * @param qualifier column qualifier to check. 253 */ 254 CheckAndMutateBuilder qualifier(byte[] qualifier); 255 256 /** 257 * Match a timerange. 258 * @param timeRange time range to check. 259 */ 260 CheckAndMutateBuilder timeRange(TimeRange timeRange); 261 262 /** 263 * Check for lack of column. 264 */ 265 CheckAndMutateBuilder ifNotExists(); 266 267 /** 268 * Check for equality. 269 * @param value the expected value 270 */ 271 default CheckAndMutateBuilder ifEquals(byte[] value) { 272 return ifMatches(CompareOperator.EQUAL, value); 273 } 274 275 /** 276 * Compare a value 277 * @param compareOp comparison operator to use 278 * @param value the expected value 279 */ 280 CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value); 281 282 /** 283 * Specify a Put to commit if the check succeeds. 284 * @param put data to put if check succeeds 285 * @return {@code true} if the new put was executed, {@code false} otherwise. The return value 286 * will be wrapped by a {@link CompletableFuture}. 287 */ 288 CompletableFuture<Boolean> thenPut(Put put); 289 290 /** 291 * Specify a Delete to commit if the check succeeds. 292 * @param delete data to delete if check succeeds 293 * @return {@code true} if the new delete was executed, {@code false} otherwise. The return 294 * value will be wrapped by a {@link CompletableFuture}. 295 */ 296 CompletableFuture<Boolean> thenDelete(Delete delete); 297 298 /** 299 * Specify a RowMutations to commit if the check succeeds. 300 * @param mutation mutations to perform if check succeeds 301 * @return true if the new mutation was executed, false otherwise. The return value will be 302 * wrapped by a {@link CompletableFuture}. 303 */ 304 CompletableFuture<Boolean> thenMutate(RowMutations mutation); 305 } 306 307 /** 308 * Atomically checks if a row matches the specified filter. If it does, it adds the 309 * Put/Delete/RowMutations. 310 * <p> 311 * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then 312 * execute it. This is a fluent style API, the code is like: 313 * 314 * <pre> 315 * table.checkAndMutate(row, filter).thenPut(put).thenAccept(succ -> { 316 * if (succ) { 317 * System.out.println("Check and put succeeded"); 318 * } else { 319 * System.out.println("Check and put failed"); 320 * } 321 * }); 322 * </pre> 323 * 324 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 325 * any more. 326 */ 327 @Deprecated 328 CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); 329 330 /** 331 * A helper class for sending checkAndMutate request with a filter. 332 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 333 * any more. 334 */ 335 @Deprecated 336 interface CheckAndMutateWithFilterBuilder { 337 338 /** 339 * Match a timerange. 340 * @param timeRange time range to check. 341 */ 342 CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange); 343 344 /** 345 * Specify a Put to commit if the check succeeds. 346 * @param put data to put if check succeeds 347 * @return {@code true} if the new put was executed, {@code false} otherwise. The return value 348 * will be wrapped by a {@link CompletableFuture}. 349 */ 350 CompletableFuture<Boolean> thenPut(Put put); 351 352 /** 353 * Specify a Delete to commit if the check succeeds. 354 * @param delete data to delete if check succeeds 355 * @return {@code true} if the new delete was executed, {@code false} otherwise. The return 356 * value will be wrapped by a {@link CompletableFuture}. 357 */ 358 CompletableFuture<Boolean> thenDelete(Delete delete); 359 360 /** 361 * Specify a RowMutations to commit if the check succeeds. 362 * @param mutation mutations to perform if check succeeds 363 * @return true if the new mutation was executed, false otherwise. The return value will be 364 * wrapped by a {@link CompletableFuture}. 365 */ 366 CompletableFuture<Boolean> thenMutate(RowMutations mutation); 367 } 368 369 /** 370 * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it 371 * performs the specified action. 372 * @param checkAndMutate The CheckAndMutate object. 373 * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate. 374 */ 375 CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate); 376 377 /** 378 * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense 379 * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed 380 * atomically (and thus, each may fail independently of others). 381 * @param checkAndMutates The list of CheckAndMutate. 382 * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate. 383 */ 384 List<CompletableFuture<CheckAndMutateResult>> 385 checkAndMutate(List<CheckAndMutate> checkAndMutates); 386 387 /** 388 * A simple version of batch checkAndMutate. It will fail if there are any failures. 389 * @param checkAndMutates The list of rows to apply. 390 * @return A {@link CompletableFuture} that wrapper the result list. 391 */ 392 default CompletableFuture<List<CheckAndMutateResult>> 393 checkAndMutateAll(List<CheckAndMutate> checkAndMutates) { 394 return allOf(checkAndMutate(checkAndMutates)); 395 } 396 397 /** 398 * Performs multiple mutations atomically on a single row. Currently {@link Put} and 399 * {@link Delete} are supported. 400 * @param mutation object that specifies the set of mutations to perform atomically 401 * @return A {@link CompletableFuture} that returns results of Increment/Append operations 402 */ 403 CompletableFuture<Result> mutateRow(RowMutations mutation); 404 405 /** 406 * The scan API uses the observer pattern. 407 * @param scan A configured {@link Scan} object. 408 * @param consumer the consumer used to receive results. 409 * @see ScanResultConsumer 410 * @see AdvancedScanResultConsumer 411 */ 412 void scan(Scan scan, C consumer); 413 414 /** 415 * Gets a scanner on the current table for the given family. 416 * @param family The column family to scan. 417 * @return A scanner. 418 */ 419 default ResultScanner getScanner(byte[] family) { 420 return getScanner(new Scan().addFamily(family)); 421 } 422 423 /** 424 * Gets a scanner on the current table for the given family and qualifier. 425 * @param family The column family to scan. 426 * @param qualifier The column qualifier to scan. 427 * @return A scanner. 428 */ 429 default ResultScanner getScanner(byte[] family, byte[] qualifier) { 430 return getScanner(new Scan().addColumn(family, qualifier)); 431 } 432 433 /** 434 * Returns a scanner on the current table as specified by the {@link Scan} object. 435 * @param scan A configured {@link Scan} object. 436 * @return A scanner. 437 */ 438 ResultScanner getScanner(Scan scan); 439 440 /** 441 * Return all the results that match the given scan object. 442 * <p> 443 * Notice that usually you should use this method with a {@link Scan} object that has limit set. 444 * For example, if you want to get the closest row after a given row, you could do this: 445 * <p> 446 * 447 * <pre> 448 * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> { 449 * if (results.isEmpty()) { 450 * System.out.println("No row after " + Bytes.toStringBinary(row)); 451 * } else { 452 * System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is " 453 * + Bytes.toStringBinary(results.stream().findFirst().get().getRow())); 454 * } 455 * }); 456 * </pre> 457 * <p> 458 * If your result set is very large, you should use other scan method to get a scanner or use 459 * callback to process the results. They will do chunking to prevent OOM. The scanAll method will 460 * fetch all the results and store them in a List and then return the list to you. 461 * <p> 462 * The scan metrics will be collected background if you enable it but you have no way to get it. 463 * Usually you can get scan metrics from {@code ResultScanner}, or through 464 * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results. 465 * So if you really care about scan metrics then you'd better use other scan methods which return 466 * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no 467 * performance difference between these scan methods so do not worry. 468 * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large 469 * result set, it is likely to cause OOM. 470 * @return The results of this small scan operation. The return value will be wrapped by a 471 * {@link CompletableFuture}. 472 */ 473 CompletableFuture<List<Result>> scanAll(Scan scan); 474 475 /** 476 * Test for the existence of columns in the table, as specified by the Gets. 477 * <p> 478 * This will return a list of booleans. Each value will be true if the related Get matches one or 479 * more keys, false if not. 480 * <p> 481 * This is a server-side call so it prevents any data from being transferred to the client. 482 * @param gets the Gets 483 * @return A list of {@link CompletableFuture}s that represent the existence for each get. 484 */ 485 default List<CompletableFuture<Boolean>> exists(List<Get> gets) { 486 return get(toCheckExistenceOnly(gets)).stream() 487 .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); 488 } 489 490 /** 491 * A simple version for batch exists. It will fail if there are any failures and you will get the 492 * whole result boolean list at once if the operation is succeeded. 493 * @param gets the Gets 494 * @return A {@link CompletableFuture} that wrapper the result boolean list. 495 */ 496 default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) { 497 return allOf(exists(gets)); 498 } 499 500 /** 501 * Extracts certain cells from the given rows, in batch. 502 * <p> 503 * Notice that you may not get all the results with this function, which means some of the 504 * returned {@link CompletableFuture}s may succeed while some of the other returned 505 * {@link CompletableFuture}s may fail. 506 * @param gets The objects that specify what data to fetch and from which rows. 507 * @return A list of {@link CompletableFuture}s that represent the result for each get. 508 */ 509 List<CompletableFuture<Result>> get(List<Get> gets); 510 511 /** 512 * A simple version for batch get. It will fail if there are any failures and you will get the 513 * whole result list at once if the operation is succeeded. 514 * @param gets The objects that specify what data to fetch and from which rows. 515 * @return A {@link CompletableFuture} that wrapper the result list. 516 */ 517 default CompletableFuture<List<Result>> getAll(List<Get> gets) { 518 return allOf(get(gets)); 519 } 520 521 /** 522 * Puts some data in the table, in batch. 523 * @param puts The list of mutations to apply. 524 * @return A list of {@link CompletableFuture}s that represent the result for each put. 525 */ 526 List<CompletableFuture<Void>> put(List<Put> puts); 527 528 /** 529 * A simple version of batch put. It will fail if there are any failures. 530 * @param puts The list of mutations to apply. 531 * @return A {@link CompletableFuture} that always returns null when complete normally. 532 */ 533 default CompletableFuture<Void> putAll(List<Put> puts) { 534 return allOf(put(puts)).thenApply(r -> null); 535 } 536 537 /** 538 * Deletes the specified cells/rows in bulk. 539 * @param deletes list of things to delete. 540 * @return A list of {@link CompletableFuture}s that represent the result for each delete. 541 */ 542 List<CompletableFuture<Void>> delete(List<Delete> deletes); 543 544 /** 545 * A simple version of batch delete. It will fail if there are any failures. 546 * @param deletes list of things to delete. 547 * @return A {@link CompletableFuture} that always returns null when complete normally. 548 */ 549 default CompletableFuture<Void> deleteAll(List<Delete> deletes) { 550 return allOf(delete(deletes)).thenApply(r -> null); 551 } 552 553 /** 554 * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The 555 * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the 556 * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the 557 * Put had put. 558 * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects 559 * @return A list of {@link CompletableFuture}s that represent the result for each action. 560 */ 561 <T> List<CompletableFuture<T>> batch(List<? extends Row> actions); 562 563 /** 564 * A simple version of batch. It will fail if there are any failures and you will get the whole 565 * result list at once if the operation is succeeded. 566 * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects 567 * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. 568 */ 569 default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) { 570 return allOf(batch(actions)); 571 } 572 573 /** 574 * Execute the given coprocessor call on the region which contains the given {@code row}. 575 * <p> 576 * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a 577 * one line lambda expression, like: 578 * 579 * <pre> 580 * channel -> xxxService.newStub(channel) 581 * </pre> 582 * 583 * @param stubMaker a delegation to the actual {@code newStub} call. 584 * @param callable a delegation to the actual protobuf rpc call. See the comment of 585 * {@link ServiceCaller} for more details. 586 * @param row The row key used to identify the remote region location 587 * @param <S> the type of the asynchronous stub 588 * @param <R> the type of the return value 589 * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. 590 * @see ServiceCaller 591 */ 592 <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 593 ServiceCaller<S, R> callable, byte[] row); 594 595 /** 596 * The callback when we want to execute a coprocessor call on a range of regions. 597 * <p> 598 * As the locating itself also takes some time, the implementation may want to send rpc calls on 599 * the fly, which means we do not know how many regions we have when we get the return value of 600 * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have 601 * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)} 602 * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no 603 * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)} 604 * calls in the future. 605 * <p> 606 * Here is a pseudo code to describe a typical implementation of a range coprocessor service 607 * method to help you better understand how the {@link CoprocessorCallback} will be called. The 608 * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the 609 * {@code whenComplete} is {@code CompletableFuture.whenComplete}. 610 * 611 * <pre> 612 * locateThenCall(byte[] row) { 613 * locate(row).whenComplete((location, locateError) -> { 614 * if (locateError != null) { 615 * callback.onError(locateError); 616 * return; 617 * } 618 * incPendingCall(); 619 * region = location.getRegion(); 620 * if (region.getEndKey() > endKey) { 621 * locateEnd = true; 622 * } else { 623 * locateThenCall(region.getEndKey()); 624 * } 625 * sendCall().whenComplete((resp, error) -> { 626 * if (error != null) { 627 * callback.onRegionError(region, error); 628 * } else { 629 * callback.onRegionComplete(region, resp); 630 * } 631 * if (locateEnd && decPendingCallAndGet() == 0) { 632 * callback.onComplete(); 633 * } 634 * }); 635 * }); 636 * } 637 * </pre> 638 */ 639 @InterfaceAudience.Public 640 interface CoprocessorCallback<R> { 641 642 /** 643 * Indicate that the respose of a region is available 644 * @param region the region that the response belongs to 645 * @param resp the response of the coprocessor call 646 */ 647 void onRegionComplete(RegionInfo region, R resp); 648 649 /** 650 * Indicate that the error for a region is available 651 * @param region the region that the error belongs to 652 * @param error the response error of the coprocessor call 653 */ 654 void onRegionError(RegionInfo region, Throwable error); 655 656 /** 657 * Indicate that all responses of the regions have been notified by calling 658 * {@link #onRegionComplete(RegionInfo, Object)} or 659 * {@link #onRegionError(RegionInfo, Throwable)}. 660 */ 661 void onComplete(); 662 663 /** 664 * Indicate that we got an error which does not belong to any regions. Usually a locating error. 665 */ 666 void onError(Throwable error); 667 } 668 669 /** 670 * Helper class for sending coprocessorService request that executes a coprocessor call on regions 671 * which are covered by a range. 672 * <p> 673 * If {@code fromRow} is not specified the selection will start with the first table region. If 674 * {@code toRow} is not specified the selection will continue through the last table region. 675 * @param <S> the type of the protobuf Service you want to call. 676 * @param <R> the type of the return value. 677 */ 678 interface CoprocessorServiceBuilder<S, R> { 679 680 /** 681 * Specify a start row 682 * @param startKey start region selection with region containing this row, inclusive. 683 */ 684 default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) { 685 return fromRow(startKey, true); 686 } 687 688 /** 689 * Specify a start row 690 * @param startKey start region selection with region containing this row 691 * @param inclusive whether to include the startKey 692 */ 693 CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive); 694 695 /** 696 * Specify a stop row 697 * @param endKey select regions up to and including the region containing this row, exclusive. 698 */ 699 default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) { 700 return toRow(endKey, false); 701 } 702 703 /** 704 * Specify a stop row 705 * @param endKey select regions up to and including the region containing this row 706 * @param inclusive whether to include the endKey 707 */ 708 CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive); 709 710 /** 711 * Execute the coprocessorService request. You can get the response through the 712 * {@link CoprocessorCallback}. 713 */ 714 void execute(); 715 } 716 717 /** 718 * Execute a coprocessor call on the regions which are covered by a range. 719 * <p> 720 * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it. 721 * <p> 722 * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it 723 * is only a one line lambda expression, like: 724 * 725 * <pre> 726 * channel -> xxxService.newStub(channel) 727 * </pre> 728 * 729 * @param stubMaker a delegation to the actual {@code newStub} call. 730 * @param callable a delegation to the actual protobuf rpc call. See the comment of 731 * {@link ServiceCaller} for more details. 732 * @param callback callback to get the response. See the comment of {@link CoprocessorCallback} 733 * for more details. 734 */ 735 <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, 736 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback); 737}