001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Map;
027import java.util.TreeMap;
028import java.util.concurrent.Callable;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Future;
035import java.util.concurrent.RejectedExecutionException;
036import java.util.concurrent.TimeUnit;
037import java.util.stream.Collectors;
038import org.apache.commons.lang3.ArrayUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HBaseIOException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HRegionLocation;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
047import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
048import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
049import org.apache.hadoop.hbase.filter.Filter;
050import org.apache.hadoop.hbase.io.TimeRange;
051import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.util.FutureUtils;
056import org.apache.hadoop.hbase.util.Pair;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
062import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
063import org.apache.hbase.thirdparty.com.google.protobuf.Message;
064import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
065import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
066import org.apache.hbase.thirdparty.com.google.protobuf.Service;
067import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
068
069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
070
071/**
072 * The table implementation based on {@link AsyncTable}.
073 */
074@InterfaceAudience.Private
075class TableOverAsyncTable implements Table {
076
077  private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);
078
079  private final AsyncConnectionImpl conn;
080
081  private final AsyncTable<?> table;
082
083  private final IOExceptionSupplier<ExecutorService> poolSupplier;
084
085  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
086      IOExceptionSupplier<ExecutorService> poolSupplier) {
087    this.conn = conn;
088    this.table = table;
089    this.poolSupplier = poolSupplier;
090  }
091
092  @Override
093  public TableName getName() {
094    return table.getName();
095  }
096
097  @Override
098  public Configuration getConfiguration() {
099    return table.getConfiguration();
100  }
101
102  @Override
103  public TableDescriptor getDescriptor() throws IOException {
104    return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
105  }
106
107  @Override
108  public boolean exists(Get get) throws IOException {
109    return FutureUtils.get(table.exists(get));
110  }
111
112  @Override
113  public boolean[] exists(List<Get> gets) throws IOException {
114    return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
115  }
116
117  @Override
118  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
119    if (ArrayUtils.isEmpty(results)) {
120      FutureUtils.get(table.batchAll(actions));
121      return;
122    }
123    List<ThrowableWithExtraContext> errors = new ArrayList<>();
124    List<CompletableFuture<Object>> futures = table.batch(actions);
125    for (int i = 0, n = results.length; i < n; i++) {
126      try {
127        results[i] = FutureUtils.get(futures.get(i));
128      } catch (IOException e) {
129        results[i] = e;
130        errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
131          "Error when processing " + actions.get(i)));
132      }
133    }
134    if (!errors.isEmpty()) {
135      throw new RetriesExhaustedException(errors.size(), errors);
136    }
137  }
138
139  @Override
140  public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback)
141      throws IOException, InterruptedException {
142    ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
143    CountDownLatch latch = new CountDownLatch(actions.size());
144    AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
145    List<CompletableFuture<R>> futures = table.<R> batch(actions);
146    for (int i = 0, n = futures.size(); i < n; i++) {
147      final int index = i;
148      FutureUtils.addListener(futures.get(i), (r, e) -> {
149        if (e != null) {
150          errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
151            "Error when processing " + actions.get(index)));
152          if (!ArrayUtils.isEmpty(results)) {
153            results[index] = e;
154          }
155          latch.countDown();
156        } else {
157          if (!ArrayUtils.isEmpty(results)) {
158            results[index] = r;
159          }
160          FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
161            (l, le) -> {
162              if (le != null) {
163                errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
164                  "Error when finding the region for row " +
165                    Bytes.toStringBinary(actions.get(index).getRow())));
166              } else {
167                callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
168              }
169              latch.countDown();
170            });
171        }
172      });
173    }
174    latch.await();
175    if (!errors.isEmpty()) {
176      throw new RetriesExhaustedException(errors.size(),
177        errors.stream().collect(Collectors.toList()));
178    }
179  }
180
181  @Override
182  public Result get(Get get) throws IOException {
183    return FutureUtils.get(table.get(get));
184  }
185
186  @Override
187  public Result[] get(List<Get> gets) throws IOException {
188    return FutureUtils.get(table.getAll(gets)).toArray(new Result[0]);
189  }
190
191  @Override
192  public ResultScanner getScanner(Scan scan) throws IOException {
193    return table.getScanner(scan);
194  }
195
196  @Override
197  public ResultScanner getScanner(byte[] family) throws IOException {
198    return table.getScanner(family);
199  }
200
201  @Override
202  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
203    return table.getScanner(family, qualifier);
204  }
205
206  @Override
207  public void put(Put put) throws IOException {
208    FutureUtils.get(table.put(put));
209  }
210
211  @Override
212  public void put(List<Put> puts) throws IOException {
213    FutureUtils.get(table.putAll(puts));
214  }
215
216  @Override
217  public void delete(Delete delete) throws IOException {
218    FutureUtils.get(table.delete(delete));
219  }
220
221  @Override
222  public void delete(List<Delete> deletes) throws IOException {
223    FutureUtils.get(table.deleteAll(deletes));
224  }
225
226  @Override
227  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
228    return new CheckAndMutateBuilder() {
229
230      private final AsyncTable.CheckAndMutateBuilder builder = table.checkAndMutate(row, family);
231
232      @Override
233      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
234        builder.qualifier(qualifier);
235        return this;
236      }
237
238      @Override
239      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
240        builder.timeRange(timeRange);
241        return this;
242      }
243
244      @Override
245      public CheckAndMutateBuilder ifNotExists() {
246        builder.ifNotExists();
247        return this;
248      }
249
250      @Override
251      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
252        builder.ifMatches(compareOp, value);
253        return this;
254      }
255
256      @Override
257      public boolean thenPut(Put put) throws IOException {
258        return FutureUtils.get(builder.thenPut(put));
259      }
260
261      @Override
262      public boolean thenDelete(Delete delete) throws IOException {
263        return FutureUtils.get(builder.thenDelete(delete));
264      }
265
266      @Override
267      public boolean thenMutate(RowMutations mutation) throws IOException {
268        return FutureUtils.get(builder.thenMutate(mutation));
269      }
270    };
271  }
272
273  @Override
274  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
275    return new CheckAndMutateWithFilterBuilder() {
276      private final AsyncTable.CheckAndMutateWithFilterBuilder builder =
277        table.checkAndMutate(row, filter);
278
279      @Override
280      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
281        builder.timeRange(timeRange);
282        return this;
283      }
284
285      @Override
286      public boolean thenPut(Put put) throws IOException {
287        return FutureUtils.get(builder.thenPut(put));
288      }
289
290      @Override
291      public boolean thenDelete(Delete delete) throws IOException {
292        return FutureUtils.get(builder.thenDelete(delete));
293      }
294
295      @Override
296      public boolean thenMutate(RowMutations mutation) throws IOException {
297        return FutureUtils.get(builder.thenMutate(mutation));
298      }
299    };
300  }
301
302  @Override
303  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
304    return FutureUtils.get(table.checkAndMutate(checkAndMutate));
305  }
306
307  @Override
308  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
309    throws IOException {
310    return FutureUtils.get(table.checkAndMutateAll(checkAndMutates));
311  }
312
313  @Override
314  public void mutateRow(RowMutations rm) throws IOException {
315    FutureUtils.get(table.mutateRow(rm));
316  }
317
318  @Override
319  public Result append(Append append) throws IOException {
320    return FutureUtils.get(table.append(append));
321  }
322
323  @Override
324  public Result increment(Increment increment) throws IOException {
325    return FutureUtils.get(table.increment(increment));
326  }
327
328  @Override
329  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
330      throws IOException {
331    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount));
332  }
333
334  @Override
335  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
336      Durability durability) throws IOException {
337    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount, durability));
338  }
339
340  @Override
341  public void close() {
342  }
343
344  @SuppressWarnings("deprecation")
345  private static final class RegionCoprocessorRpcChannel extends RegionCoprocessorRpcChannelImpl
346      implements CoprocessorRpcChannel {
347
348    RegionCoprocessorRpcChannel(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
349        byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
350      super(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs);
351    }
352
353    @Override
354    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
355        Message responsePrototype, RpcCallback<Message> done) {
356      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
357      CoprocessorBlockingRpcCallback<Message> callback = new CoprocessorBlockingRpcCallback<>();
358      super.callMethod(method, c, request, responsePrototype, callback);
359      Message ret;
360      try {
361        ret = callback.get();
362      } catch (IOException e) {
363        setCoprocessorError(controller, e);
364        return;
365      }
366      if (c.failed()) {
367        setCoprocessorError(controller, c.getFailed());
368      }
369      done.run(ret);
370    }
371
372    @Override
373    public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
374        Message request, Message responsePrototype) throws ServiceException {
375      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
376      CoprocessorBlockingRpcCallback<Message> done = new CoprocessorBlockingRpcCallback<>();
377      callMethod(method, c, request, responsePrototype, done);
378      Message ret;
379      try {
380        ret = done.get();
381      } catch (IOException e) {
382        throw new ServiceException(e);
383      }
384      if (c.failed()) {
385        setCoprocessorError(controller, c.getFailed());
386        throw new ServiceException(c.getFailed());
387      }
388      return ret;
389    }
390  }
391
392  @Override
393  public RegionCoprocessorRpcChannel coprocessorService(byte[] row) {
394    return new RegionCoprocessorRpcChannel(conn, getName(), null, row,
395      getRpcTimeout(TimeUnit.NANOSECONDS), getOperationTimeout(TimeUnit.NANOSECONDS));
396  }
397
398  /**
399   * Get the corresponding start keys and regions for an arbitrary range of keys.
400   * <p>
401   * @param startKey Starting row in range, inclusive
402   * @param endKey Ending row in range
403   * @param includeEndKey true if endRow is inclusive, false if exclusive
404   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
405   *         range
406   * @throws IOException if a remote or network exception occurs
407   */
408  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
409      final byte[] endKey, final boolean includeEndKey) throws IOException {
410    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
411  }
412
413  /**
414   * Get the corresponding start keys and regions for an arbitrary range of keys.
415   * <p>
416   * @param startKey Starting row in range, inclusive
417   * @param endKey Ending row in range
418   * @param includeEndKey true if endRow is inclusive, false if exclusive
419   * @param reload true to reload information or false to use cached information
420   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
421   *         range
422   * @throws IOException if a remote or network exception occurs
423   */
424  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
425      final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
426    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
427    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
428      throw new IllegalArgumentException(
429        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
430    }
431    List<byte[]> keysInRange = new ArrayList<>();
432    List<HRegionLocation> regionsInRange = new ArrayList<>();
433    byte[] currentKey = startKey;
434    do {
435      HRegionLocation regionLocation =
436        FutureUtils.get(conn.getRegionLocator(getName()).getRegionLocation(currentKey, reload));
437      keysInRange.add(currentKey);
438      regionsInRange.add(regionLocation);
439      currentKey = regionLocation.getRegion().getEndKey();
440    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
441      (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 ||
442        (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
443    return new Pair<>(keysInRange, regionsInRange);
444  }
445
446  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
447    if (start == null) {
448      start = HConstants.EMPTY_START_ROW;
449    }
450    if (end == null) {
451      end = HConstants.EMPTY_END_ROW;
452    }
453    return getKeysAndRegionsInRange(start, end, true).getFirst();
454  }
455
456  @FunctionalInterface
457  private interface StubCall<R> {
458    R call(RegionCoprocessorRpcChannel channel) throws Exception;
459  }
460
461  private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey,
462      Callback<R> callback, StubCall<R> call) throws Throwable {
463    // get regions covered by the row range
464    ExecutorService pool = this.poolSupplier.get();
465    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
466    Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
467    try {
468      for (byte[] r : keys) {
469        RegionCoprocessorRpcChannel channel = coprocessorService(r);
470        Future<R> future = pool.submit(new Callable<R>() {
471          @Override
472          public R call() throws Exception {
473            R result = call.call(channel);
474            byte[] region = channel.getLastRegion();
475            if (callback != null) {
476              callback.update(region, r, result);
477            }
478            return result;
479          }
480        });
481        futures.put(r, future);
482      }
483    } catch (RejectedExecutionException e) {
484      // maybe the connection has been closed, let's check
485      if (conn.isClosed()) {
486        throw new DoNotRetryIOException("Connection is closed", e);
487      } else {
488        throw new HBaseIOException("Coprocessor operation is rejected", e);
489      }
490    }
491    for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
492      try {
493        e.getValue().get();
494      } catch (ExecutionException ee) {
495        LOG.warn("Error calling coprocessor service " + serviceName + " for row " +
496          Bytes.toStringBinary(e.getKey()), ee);
497        throw ee.getCause();
498      } catch (InterruptedException ie) {
499        throw new InterruptedIOException("Interrupted calling coprocessor service " + serviceName +
500          " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
501      }
502    }
503  }
504
505  @Override
506  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
507      byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable {
508    coprocssorService(service.getName(), startKey, endKey, callback, channel -> {
509      T instance = ProtobufUtil.newServiceStub(service, channel);
510      return callable.call(instance);
511    });
512  }
513
514  @SuppressWarnings("unchecked")
515  @Override
516  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
517      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
518      throws ServiceException, Throwable {
519    coprocssorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
520      return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype);
521    });
522  }
523
524  @Override
525  public long getRpcTimeout(TimeUnit unit) {
526    return table.getRpcTimeout(unit);
527  }
528
529  @Override
530  public long getReadRpcTimeout(TimeUnit unit) {
531    return table.getReadRpcTimeout(unit);
532  }
533
534  @Override
535  public long getWriteRpcTimeout(TimeUnit unit) {
536    return table.getWriteRpcTimeout(unit);
537  }
538
539  @Override
540  public long getOperationTimeout(TimeUnit unit) {
541    return table.getOperationTimeout(unit);
542  }
543
544  @Override
545  public RegionLocator getRegionLocator() throws IOException {
546    return conn.toConnection().getRegionLocator(getName());
547  }
548}