/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * The table implementation based on {@link AsyncTable}.
 * <p>
 * Almost every operation delegates to the wrapped {@link AsyncTable} and obtains the outcome of
 * the returned future through {@link FutureUtils#get}.
 */
@InterfaceAudience.Private
class TableOverAsyncTable implements Table {

  private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);

  /** Connection used for admin access, region location and coprocessor channels. */
  private final AsyncConnectionImpl conn;

  /** The asynchronous table every data operation is delegated to. */
  private final AsyncTable<?> table;

  /** Supplies the executor on which coprocessor calls are submitted. */
  private final IOExceptionSupplier<ExecutorService> poolSupplier;

  /**
   * @param conn         the underlying asynchronous connection
   * @param table        the asynchronous table to delegate to
   * @param poolSupplier supplier of the executor used for coprocessor service calls
   */
  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
    IOExceptionSupplier<ExecutorService> poolSupplier) {
    this.conn = conn;
    this.table = table;
    this.poolSupplier = poolSupplier;
  }

  @Override
  public TableName getName() {
    return table.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return table.getConfiguration();
  }

  /** Fetches the descriptor of this table through the connection's admin. */
  @Override
  public TableDescriptor getDescriptor() throws IOException {
    return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
  }

  @Override
  public boolean exists(Get get) throws IOException {
    return FutureUtils.get(table.exists(get));
  }

  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
  }

  /**
   * Executes the given actions. When {@code results} is null or empty the actions are run through
   * {@code batchAll}; otherwise each slot of {@code results} receives either the result or the
   * {@link IOException} of the action at the same index.
   * @param actions the actions to execute
   * @param results array receiving the per-action outcome, may be null or empty
   * @throws RetriesExhaustedException if at least one action failed; it carries every collected
   *                                   failure
   * @throws IOException               if the whole batch fails when {@code results} is empty
   */
  @Override
  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
    if (ArrayUtils.isEmpty(results)) {
      FutureUtils.get(table.batchAll(actions));
      return;
    }
    List<ThrowableWithExtraContext> errors = new ArrayList<>();
    List<CompletableFuture<Object>> futures = table.batch(actions);
    // NOTE(review): the loop is bounded by results.length, so results is assumed to have exactly
    // one slot per action - confirm against callers.
    for (int i = 0, n = results.length; i < n; i++) {
      try {
        results[i] = FutureUtils.get(futures.get(i));
      } catch (IOException e) {
        results[i] = e;
        errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
          "Error when processing " + actions.get(i)));
      }
    }
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(), errors);
    }
  }

  /**
   * Executes the given actions and, for every action that succeeds, looks up the region of its
   * row and hands the result to {@code callback}. The method returns only after every action has
   * been fully processed.
   * <p>
   * A failure raised by {@code callback} itself is recorded like any other per-action error, so
   * it is reported to the caller and can never leave this method waiting forever.
   * @param actions  the actions to execute
   * @param results  array receiving the per-action result or error, may be null or empty
   * @param callback invoked with region name, row and result of every successful action
   * @throws RetriesExhaustedException if an action, a region lookup or a callback invocation
   *                                   failed
   * @throws InterruptedException      if the calling thread is interrupted while waiting
   */
  @Override
  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
    ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
    CountDownLatch latch = new CountDownLatch(actions.size());
    AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
    List<CompletableFuture<R>> futures = table.<R> batch(actions);
    for (int i = 0, n = futures.size(); i < n; i++) {
      final int index = i;
      FutureUtils.addListener(futures.get(i), (r, e) -> {
        if (e != null) {
          errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
            "Error when processing " + actions.get(index)));
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = e;
          }
          latch.countDown();
        } else {
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = r;
          }
          FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
            (l, le) -> {
              try {
                if (le != null) {
                  errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
                    "Error when finding the region for row "
                      + Bytes.toStringBinary(actions.get(index).getRow())));
                } else {
                  callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
                }
              } catch (RuntimeException ce) {
                // The callback is user code; record its failure instead of losing it.
                errors.add(new ThrowableWithExtraContext(ce, EnvironmentEdgeManager.currentTime(),
                  "Error when invoking callback for row "
                    + Bytes.toStringBinary(actions.get(index).getRow())));
              } finally {
                // Always release the latch, otherwise latch.await() below would block forever
                // when the callback throws.
                latch.countDown();
              }
            });
        }
      });
    }
    latch.await();
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(),
        errors.stream().collect(Collectors.toList()));
    }
  }

  @Override
  public Result get(Get get) throws IOException {
    return FutureUtils.get(table.get(get));
  }

  @Override
  public Result[] get(List<Get> gets) throws IOException {
    return FutureUtils.get(table.getAll(gets)).toArray(new Result[0]);
  }

  @Override
  public ResultScanner getScanner(Scan scan) throws IOException {
    return table.getScanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family) throws IOException {
    return table.getScanner(family);
  }

  @Override
  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
    return table.getScanner(family, qualifier);
  }

  @Override
  public void put(Put put) throws IOException {
    FutureUtils.get(table.put(put));
  }

  @Override
  public void put(List<Put> puts) throws IOException {
    FutureUtils.get(table.putAll(puts));
  }

  @Override
  public void delete(Delete delete) throws IOException {
    FutureUtils.get(table.delete(delete));
  }

  @Override
  public void delete(List<Delete> deletes) throws IOException {
    FutureUtils.get(table.deleteAll(deletes));
  }

  /**
   * Returns a builder that forwards every setting to the asynchronous builder of the wrapped
   * table and waits for the result of the terminal {@code then*} operations.
   */
  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    return new CheckAndMutateBuilder() {

      private final AsyncTable.CheckAndMutateBuilder builder = table.checkAndMutate(row, family);

      @Override
      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
        builder.qualifier(qualifier);
        return this;
      }

      @Override
      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifNotExists() {
        builder.ifNotExists();
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
        builder.ifMatches(compareOp, value);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }

  /**
   * Filter based variant of {@link #checkAndMutate(byte[], byte[])}; it likewise forwards to the
   * asynchronous builder of the wrapped table.
   */
  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    return new CheckAndMutateWithFilterBuilder() {
      private final AsyncTable.CheckAndMutateWithFilterBuilder builder =
        table.checkAndMutate(row, filter);

      @Override
      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }

  @Override
  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
    return FutureUtils.get(table.checkAndMutate(checkAndMutate));
  }

  @Override
  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
    throws IOException {
    return FutureUtils.get(table.checkAndMutateAll(checkAndMutates));
  }

  @Override
  public Result mutateRow(RowMutations rm) throws IOException {
    return FutureUtils.get(table.mutateRow(rm));
  }

  @Override
  public Result append(Append append) throws IOException {
    return FutureUtils.get(table.append(append));
  }

  @Override
  public Result increment(Increment increment) throws IOException {
    return FutureUtils.get(table.increment(increment));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
    throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
    Durability durability) throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount, durability));
  }

  /** Does nothing; this class holds no resources of its own to release. */
  @Override
  public void close() {
  }

  /**
   * Exposes {@link RegionCoprocessorRpcChannelImpl} through the {@link CoprocessorRpcChannel}
   * interface by waiting for the response of every call before returning.
   */
  @SuppressWarnings("deprecation")
  private static final class RegionCoprocessorRpcChannel extends RegionCoprocessorRpcChannelImpl
    implements CoprocessorRpcChannel {

    RegionCoprocessorRpcChannel(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
      byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
      super(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs);
    }

    /**
     * Issues the call through the parent channel with an internal controller and a blocking
     * callback, waits for the response and then runs {@code done} with it. An
     * {@link IOException} while waiting is stored on {@code controller} and {@code done} is not
     * run; a failure reported by the internal controller is copied to {@code controller} before
     * {@code done} runs.
     */
    @Override
    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
      Message responsePrototype, RpcCallback<Message> done) {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> callback = new CoprocessorBlockingRpcCallback<>();
      super.callMethod(method, c, request, responsePrototype, callback);
      Message ret;
      try {
        ret = callback.get();
      } catch (IOException e) {
        setCoprocessorError(controller, e);
        return;
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
      }
      done.run(ret);
    }

    /**
     * Blocking variant built on {@link #callMethod}: returns the response, or throws a
     * {@link ServiceException} when waiting fails or the internal controller reports a failure
     * (in the latter case the failure is also copied to {@code controller}).
     */
    @Override
    public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
      Message request, Message responsePrototype) throws ServiceException {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> done = new CoprocessorBlockingRpcCallback<>();
      callMethod(method, c, request, responsePrototype, done);
      Message ret;
      try {
        ret = done.get();
      } catch (IOException e) {
        throw new ServiceException(e);
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
        throw new ServiceException(c.getFailed());
      }
      return ret;
    }
  }

  /**
   * Creates a coprocessor channel for the given row, using this table's rpc and operation
   * timeouts; no region is passed to the channel.
   */
  @Override
  public RegionCoprocessorRpcChannel coprocessorService(byte[] row) {
    return new RegionCoprocessorRpcChannel(conn, getName(), null, row,
      getRpcTimeout(TimeUnit.NANOSECONDS), getOperationTimeout(TimeUnit.NANOSECONDS));
  }

  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * <p>
   * @param startKey      Starting row in range, inclusive
   * @param endKey        Ending row in range
   * @param includeEndKey true if endRow is inclusive, false if exclusive
   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
   *         range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
    final byte[] endKey, final boolean includeEndKey) throws IOException {
    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
  }

  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * <p>
   * @param startKey      Starting row in range, inclusive
   * @param endKey        Ending row in range
   * @param includeEndKey true if endRow is inclusive, false if exclusive
   * @param reload        true to reload information or false to use cached information
   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
   *         range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
    final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
      throw new IllegalArgumentException(
        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
    }
    List<byte[]> keysInRange = new ArrayList<>();
    List<HRegionLocation> regionsInRange = new ArrayList<>();
    byte[] currentKey = startKey;
    // Walk region by region: the end key of the located region is the next key to look up.
    do {
      HRegionLocation regionLocation =
        FutureUtils.get(conn.getRegionLocator(getName()).getRegionLocation(currentKey, reload));
      keysInRange.add(currentKey);
      regionsInRange.add(regionLocation);
      currentKey = regionLocation.getRegion().getEndKey();
    } while (
      !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
          || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))
    );
    return new Pair<>(keysInRange, regionsInRange);
  }

  /**
   * Returns the keys collected by {@link #getKeysAndRegionsInRange} for the given range, with
   * the end key treated as inclusive. A null {@code start} or {@code end} is replaced by the
   * empty start or end row respectively.
   */
  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
    if (start == null) {
      start = HConstants.EMPTY_START_ROW;
    }
    if (end == null) {
      end = HConstants.EMPTY_END_ROW;
    }
    return getKeysAndRegionsInRange(start, end, true).getFirst();
  }

  /** A coprocessor invocation performed against a single region channel. */
  @FunctionalInterface
  private interface StubCall<R> {
    R call(RegionCoprocessorRpcChannel channel) throws Exception;
  }

  /**
   * Submits {@code call} to the pool once for every key returned by
   * {@link #getStartKeysInRange}, passes each result to {@code callback} when it is not null,
   * and then waits for all submitted tasks.
   * @param serviceName name of the service, used in log and error messages only
   * @param startKey    start of the row range, null meaning the empty start row
   * @param endKey      end of the row range, null meaning the empty end row
   * @param callback    receives region, key and result of every call, may be null
   * @param call        the invocation to run against each channel
   * @throws Throwable the cause of the first failed task, or an {@link IOException} when
   *                   submission is rejected or waiting is interrupted
   */
  private <R> void coprocessorService(String serviceName, byte[] startKey, byte[] endKey,
    Batch.Callback<R> callback, StubCall<R> call) throws Throwable {
    // get regions covered by the row range
    ExecutorService pool = Context.current().wrap(this.poolSupplier.get());
    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
    Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    try {
      for (byte[] r : keys) {
        RegionCoprocessorRpcChannel channel = coprocessorService(r);
        Future<R> future = pool.submit(() -> {
          R result = call.call(channel);
          byte[] region = channel.getLastRegion();
          if (callback != null) {
            callback.update(region, r, result);
          }
          return result;
        });
        futures.put(r, future);
      }
    } catch (RejectedExecutionException e) {
      // maybe the connection has been closed, let's check
      if (conn.isClosed()) {
        throw new DoNotRetryIOException("Connection is closed", e);
      } else {
        throw new HBaseIOException("Coprocessor operation is rejected", e);
      }
    }
    for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
      try {
        e.getValue().get();
      } catch (ExecutionException ee) {
        LOG.warn("Error calling coprocessor service {} for row {}", serviceName,
          Bytes.toStringBinary(e.getKey()), ee);
        throw ee.getCause();
      } catch (InterruptedException ie) {
        throw new InterruptedIOException("Interrupted calling coprocessor service " + serviceName
          + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
      }
    }
  }

  /**
   * Runs {@code callable} against a stub of {@code service} for every key in the given row
   * range, inside a {@code COPROC_EXEC} table operation span.
   */
  @Override
  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
    byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
    throws ServiceException, Throwable {
    final Supplier<Span> supplier = new TableOperationSpanBuilder(conn)
      .setTableName(table.getName()).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
    TraceUtil.trace(() -> {
      // Captured here so the pool threads can make the same context current for the call.
      final Context context = Context.current();
      coprocessorService(service.getName(), startKey, endKey, callback, channel -> {
        try (Scope ignored = context.makeCurrent()) {
          T instance = ProtobufUtil.newServiceStub(service, channel);
          return callable.call(instance);
        }
      });
    }, supplier);
  }

  /**
   * Invokes {@code methodDescriptor} with {@code request} through a blocking channel call for
   * every key in the given row range, inside a {@code COPROC_EXEC} table operation span.
   */
  @SuppressWarnings("unchecked")
  @Override
  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
    Message request, byte[] startKey, byte[] endKey, R responsePrototype,
    Batch.Callback<R> callback) throws ServiceException, Throwable {
    final Supplier<Span> supplier = new TableOperationSpanBuilder(conn)
      .setTableName(table.getName()).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
    TraceUtil.trace(() -> {
      // Captured here so the pool threads can make the same context current for the call.
      final Context context = Context.current();
      coprocessorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
        try (Scope ignored = context.makeCurrent()) {
          return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype);
        }
      });
    }, supplier);
  }

  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return table.getRpcTimeout(unit);
  }

  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return table.getReadRpcTimeout(unit);
  }

  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return table.getWriteRpcTimeout(unit);
  }

  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return table.getOperationTimeout(unit);
  }

  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return table.getRequestAttributes();
  }

  @Override
  public RegionLocator getRegionLocator() throws IOException {
    return conn.toConnection().getRegionLocator(getName());
  }
}