/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * The {@link Table} implementation based on {@link AsyncTable}.
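 * <p>
 * A minimal usage sketch (the table, family, qualifier and value names are illustrative only,
 * and an open {@code Connection} named {@code connection} is assumed); callers obtain instances
 * through {@link Connection#getTable(TableName)} rather than constructing this class directly:
 *
 * <pre>
 * try (Table table = connection.getTable(TableName.valueOf("test"))) {
 *   table.put(new Put(Bytes.toBytes("row")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
 *     Bytes.toBytes("value")));
 *   Result result = table.get(new Get(Bytes.toBytes("row")));
 * }
 * </pre>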
 */
@InterfaceAudience.Private
class TableOverAsyncTable implements Table {

  private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);

  private final AsyncConnectionImpl conn;

  private final AsyncTable<?> table;

  private final IOExceptionSupplier<ExecutorService> poolSupplier;

  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
    IOExceptionSupplier<ExecutorService> poolSupplier) {
    this.conn = conn;
    this.table = table;
    this.poolSupplier = poolSupplier;
  }

  @Override
  public TableName getName() {
    return table.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return table.getConfiguration();
  }

  @Override
  public TableDescriptor getDescriptor() throws IOException {
    return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
  }

  @Override
  public boolean exists(Get get) throws IOException {
    return FutureUtils.get(table.exists(get));
  }

  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
  }

  @Override
  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
    if (ArrayUtils.isEmpty(results)) {
      FutureUtils.get(table.batchAll(actions));
      return;
    }
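    // Results were requested, so wait on each action's future individually, recording either the
    // result or the exception, and throw a combined exception if any action failed.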
    List<ThrowableWithExtraContext> errors = new ArrayList<>();
    List<CompletableFuture<Object>> futures = table.batch(actions);
    for (int i = 0, n = results.length; i < n; i++) {
      try {
        results[i] = FutureUtils.get(futures.get(i));
      } catch (IOException e) {
        results[i] = e;
        errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
          "Error when processing " + actions.get(i)));
      }
    }
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(), errors);
    }
  }

  @Override
  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
    ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
    CountDownLatch latch = new CountDownLatch(actions.size());
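    // Each action counts the latch down exactly once: immediately on failure, or after the region
    // location lookup that feeds the callback completes.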
    AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
    List<CompletableFuture<R>> futures = table.<R> batch(actions);
    for (int i = 0, n = futures.size(); i < n; i++) {
      final int index = i;
      FutureUtils.addListener(futures.get(i), (r, e) -> {
        if (e != null) {
          errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
            "Error when processing " + actions.get(index)));
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = e;
          }
          latch.countDown();
        } else {
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = r;
          }
          FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
            (l, le) -> {
              if (le != null) {
                errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
                  "Error when finding the region for row "
                    + Bytes.toStringBinary(actions.get(index).getRow())));
              } else {
                callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
              }
              latch.countDown();
            });
        }
      });
    }
    latch.await();
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(),
        errors.stream().collect(Collectors.toList()));
    }
  }

  @Override
  public Result get(Get get) throws IOException {
    return FutureUtils.get(table.get(get));
  }

  @Override
  public Result[] get(List<Get> gets) throws IOException {
    return FutureUtils.get(table.getAll(gets)).toArray(new Result[0]);
  }

  @Override
  public ResultScanner getScanner(Scan scan) throws IOException {
    return table.getScanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family) throws IOException {
    return table.getScanner(family);
  }

  @Override
  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
    return table.getScanner(family, qualifier);
  }

  @Override
  public void put(Put put) throws IOException {
    FutureUtils.get(table.put(put));
  }

  @Override
  public void put(List<Put> puts) throws IOException {
    FutureUtils.get(table.putAll(puts));
  }

  @Override
  public void delete(Delete delete) throws IOException {
    FutureUtils.get(table.delete(delete));
  }

  @Override
  public void delete(List<Delete> deletes) throws IOException {
    FutureUtils.get(table.deleteAll(deletes));
  }

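  // Blocking adapter over AsyncTable.CheckAndMutateBuilder. A minimal usage sketch (row, family,
  // qualifier and value are illustrative):
  //   table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists()
  //     .thenPut(new Put(row).addColumn(family, qualifier, value));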
  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    return new CheckAndMutateBuilder() {

      private final AsyncTable.CheckAndMutateBuilder builder = table.checkAndMutate(row, family);

      @Override
      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
        builder.qualifier(qualifier);
        return this;
      }

      @Override
      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifNotExists() {
        builder.ifNotExists();
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
        builder.ifMatches(compareOp, value);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }

  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    return new CheckAndMutateWithFilterBuilder() {
      private final AsyncTable.CheckAndMutateWithFilterBuilder builder =
        table.checkAndMutate(row, filter);

      @Override
      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }

  @Override
  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
    return FutureUtils.get(table.checkAndMutate(checkAndMutate));
  }

  @Override
  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
    throws IOException {
    return FutureUtils.get(table.checkAndMutateAll(checkAndMutates));
  }

  @Override
  public Result mutateRow(RowMutations rm) throws IOException {
    return FutureUtils.get(table.mutateRow(rm));
  }

  @Override
  public Result append(Append append) throws IOException {
    return FutureUtils.get(table.append(append));
  }

  @Override
  public Result increment(Increment increment) throws IOException {
    return FutureUtils.get(table.increment(increment));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
    throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
    Durability durability) throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount, durability));
  }

  @Override
  public void close() {
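    // Intentionally a no-op: this wrapper owns neither the executor it borrows nor the underlying
    // async connection, so there is nothing table-scoped to release here.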
  }

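  /**
   * Blocking adapter that exposes the asynchronous {@link RegionCoprocessorRpcChannelImpl} through
   * the {@link CoprocessorRpcChannel} interface by waiting on each call with a blocking callback.
   */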
  @SuppressWarnings("deprecation")
  private static final class RegionCoprocessorRpcChannel extends RegionCoprocessorRpcChannelImpl
    implements CoprocessorRpcChannel {

    RegionCoprocessorRpcChannel(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
      byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
      super(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs);
    }

    @Override
    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
      Message responsePrototype, RpcCallback<Message> done) {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> callback = new CoprocessorBlockingRpcCallback<>();
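      // Delegate to the async channel, then block until the RPC either completes or fails.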
      super.callMethod(method, c, request, responsePrototype, callback);
      Message ret;
      try {
        ret = callback.get();
      } catch (IOException e) {
        setCoprocessorError(controller, e);
        return;
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
      }
      done.run(ret);
    }

    @Override
    public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
      Message request, Message responsePrototype) throws ServiceException {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> done = new CoprocessorBlockingRpcCallback<>();
      callMethod(method, c, request, responsePrototype, done);
      Message ret;
      try {
        ret = done.get();
      } catch (IOException e) {
        throw new ServiceException(e);
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
        throw new ServiceException(c.getFailed());
      }
      return ret;
    }
  }

  @Override
  public RegionCoprocessorRpcChannel coprocessorService(byte[] row) {
    return new RegionCoprocessorRpcChannel(conn, getName(), null, row,
      getRpcTimeout(TimeUnit.NANOSECONDS), getOperationTimeout(TimeUnit.NANOSECONDS));
  }

  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * <p>
   * @param startKey      Starting row in range, inclusive
   * @param endKey        Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
   * @return A pair of the list of start keys and the list of HRegionLocations that together cover
   *         the specified range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
    final byte[] endKey, final boolean includeEndKey) throws IOException {
    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
  }

  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * <p>
   * @param startKey      Starting row in range, inclusive
   * @param endKey        Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
   * @param reload        true to reload information or false to use cached information
   * @return A pair of the list of start keys and the list of HRegionLocations that together cover
   *         the specified range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
    final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
      throw new IllegalArgumentException(
        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
    }
    List<byte[]> keysInRange = new ArrayList<>();
    List<HRegionLocation> regionsInRange = new ArrayList<>();
    byte[] currentKey = startKey;
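    // Walk regions starting from startKey: record the current key and its region, then advance to
    // that region's end key, stopping at the end of the table or once endKey has been covered.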
    do {
      HRegionLocation regionLocation =
        FutureUtils.get(conn.getRegionLocator(getName()).getRegionLocation(currentKey, reload));
      keysInRange.add(currentKey);
      regionsInRange.add(regionLocation);
      currentKey = regionLocation.getRegion().getEndKey();
    } while (
      !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
          || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))
    );
    return new Pair<>(keysInRange, regionsInRange);
  }

  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
    if (start == null) {
      start = HConstants.EMPTY_START_ROW;
    }
    if (end == null) {
      end = HConstants.EMPTY_END_ROW;
    }
    return getKeysAndRegionsInRange(start, end, true).getFirst();
  }

  @FunctionalInterface
  private interface StubCall<R> {
    R call(RegionCoprocessorRpcChannel channel) throws Exception;
  }

  private <R> void coprocessorService(String serviceName, byte[] startKey, byte[] endKey,
    Batch.Callback<R> callback, StubCall<R> call) throws Throwable {
    // get regions covered by the row range
    ExecutorService pool = Context.current().wrap(this.poolSupplier.get());
    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
    Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
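    // Fan out one coprocessor call per region start key; the TreeMap keeps the futures in row
    // order so they are waited on in row order below.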
    try {
      for (byte[] r : keys) {
        RegionCoprocessorRpcChannel channel = coprocessorService(r);
        Future<R> future = pool.submit(() -> {
          R result = call.call(channel);
          byte[] region = channel.getLastRegion();
          if (callback != null) {
            callback.update(region, r, result);
          }
          return result;
        });
        futures.put(r, future);
      }
    } catch (RejectedExecutionException e) {
      // maybe the connection has been closed, let's check
      if (conn.isClosed()) {
        throw new DoNotRetryIOException("Connection is closed", e);
      } else {
        throw new HBaseIOException("Coprocessor operation is rejected", e);
      }
    }
    for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
      try {
        e.getValue().get();
      } catch (ExecutionException ee) {
        LOG.warn("Error calling coprocessor service {} for row {}", serviceName,
          Bytes.toStringBinary(e.getKey()), ee);
        throw ee.getCause();
      } catch (InterruptedException ie) {
        throw new InterruptedIOException("Interrupted calling coprocessor service " + serviceName
          + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
      }
    }
  }

  @Override
  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
    byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
    throws ServiceException, Throwable {
    final Supplier<Span> supplier = new TableOperationSpanBuilder(conn)
      .setTableName(table.getName()).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
    TraceUtil.trace(() -> {
      final Context context = Context.current();
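      // Capture the caller's tracing context so the stub calls, which run on pool threads, are
      // executed within the same span.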
      coprocessorService(service.getName(), startKey, endKey, callback, channel -> {
        try (Scope ignored = context.makeCurrent()) {
          T instance = ProtobufUtil.newServiceStub(service, channel);
          return callable.call(instance);
        }
      });
    }, supplier);
  }

  @SuppressWarnings("unchecked")
  @Override
  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
    Message request, byte[] startKey, byte[] endKey, R responsePrototype,
    Batch.Callback<R> callback) throws ServiceException, Throwable {
    final Supplier<Span> supplier = new TableOperationSpanBuilder(conn)
      .setTableName(table.getName()).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
    TraceUtil.trace(() -> {
      final Context context = Context.current();
      coprocessorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
        try (Scope ignored = context.makeCurrent()) {
          return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype);
        }
      });
    }, supplier);
  }

  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return table.getRpcTimeout(unit);
  }

  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return table.getReadRpcTimeout(unit);
  }

  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return table.getWriteRpcTimeout(unit);
  }

  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return table.getOperationTimeout(unit);
  }

  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return table.getRequestAttributes();
  }

  @Override
  public RegionLocator getRegionLocator() throws IOException {
    return conn.toConnection().getRegionLocator(getName());
  }
}