001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Context;
024import io.opentelemetry.context.Scope;
025import java.io.IOException;
026import java.time.Duration;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Phaser;
032import java.util.concurrent.TimeUnit;
033import java.util.function.Function;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.filter.Filter;
038import org.apache.hadoop.hbase.io.TimeRange;
039import org.apache.hadoop.hbase.util.FutureUtils;
040import org.apache.yetus.audience.InterfaceAudience;
041
042import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
043
044/**
045 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
046 * thread pool when constructing this class, and the callback methods registered to the returned
047 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
048 * to do anything they want in the callbacks without breaking the rpc framework.
049 */
050@InterfaceAudience.Private
051class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
052
053  private final RawAsyncTableImpl rawTable;
054
055  private final ExecutorService pool;
056
057  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
058    this.rawTable = rawTable;
059    this.pool = pool;
060  }
061
062  @Override
063  public TableName getName() {
064    return rawTable.getName();
065  }
066
067  @Override
068  public Configuration getConfiguration() {
069    return rawTable.getConfiguration();
070  }
071
072  @Override
073  public CompletableFuture<TableDescriptor> getDescriptor() {
074    return wrap(rawTable.getDescriptor());
075  }
076
077  @Override
078  public AsyncTableRegionLocator getRegionLocator() {
079    return rawTable.getRegionLocator();
080  }
081
082  @Override
083  public long getRpcTimeout(TimeUnit unit) {
084    return rawTable.getRpcTimeout(unit);
085  }
086
087  @Override
088  public long getReadRpcTimeout(TimeUnit unit) {
089    return rawTable.getReadRpcTimeout(unit);
090  }
091
092  @Override
093  public long getWriteRpcTimeout(TimeUnit unit) {
094    return rawTable.getWriteRpcTimeout(unit);
095  }
096
097  @Override
098  public long getOperationTimeout(TimeUnit unit) {
099    return rawTable.getOperationTimeout(unit);
100  }
101
102  @Override
103  public long getScanTimeout(TimeUnit unit) {
104    return rawTable.getScanTimeout(unit);
105  }
106
107  @Override
108  public Map<String, byte[]> getRequestAttributes() {
109    return rawTable.getRequestAttributes();
110  }
111
112  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
113    return FutureUtils.wrapFuture(future, pool);
114  }
115
116  @Override
117  public CompletableFuture<Result> get(Get get) {
118    return wrap(rawTable.get(get));
119  }
120
121  @Override
122  public CompletableFuture<Void> put(Put put) {
123    return wrap(rawTable.put(put));
124  }
125
126  @Override
127  public CompletableFuture<Void> delete(Delete delete) {
128    return wrap(rawTable.delete(delete));
129  }
130
131  @Override
132  public CompletableFuture<Result> append(Append append) {
133    return wrap(rawTable.append(append));
134  }
135
136  @Override
137  public CompletableFuture<Result> increment(Increment increment) {
138    return wrap(rawTable.increment(increment));
139  }
140
141  @Override
142  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
143    return new CheckAndMutateBuilder() {
144
145      private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
146
147      @Override
148      public CompletableFuture<Boolean> thenPut(Put put) {
149        return wrap(builder.thenPut(put));
150      }
151
152      @Override
153      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
154        return wrap(builder.thenMutate(mutation));
155      }
156
157      @Override
158      public CompletableFuture<Boolean> thenDelete(Delete delete) {
159        return wrap(builder.thenDelete(delete));
160      }
161
162      @Override
163      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
164        builder.qualifier(qualifier);
165        return this;
166      }
167
168      @Override
169      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
170        builder.timeRange(timeRange);
171        return this;
172      }
173
174      @Override
175      public CheckAndMutateBuilder ifNotExists() {
176        builder.ifNotExists();
177        return this;
178      }
179
180      @Override
181      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
182        builder.ifMatches(compareOp, value);
183        return this;
184      }
185    };
186  }
187
188  @Override
189  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
190    return new CheckAndMutateWithFilterBuilder() {
191
192      private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter);
193
194      @Override
195      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
196        builder.timeRange(timeRange);
197        return this;
198      }
199
200      @Override
201      public CompletableFuture<Boolean> thenPut(Put put) {
202        return wrap(builder.thenPut(put));
203      }
204
205      @Override
206      public CompletableFuture<Boolean> thenDelete(Delete delete) {
207        return wrap(builder.thenDelete(delete));
208      }
209
210      @Override
211      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
212        return wrap(builder.thenMutate(mutation));
213      }
214    };
215  }
216
217  @Override
218  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
219    return wrap(rawTable.checkAndMutate(checkAndMutate));
220  }
221
222  @Override
223  public List<CompletableFuture<CheckAndMutateResult>>
224    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
225    return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList());
226  }
227
228  @Override
229  public CompletableFuture<Result> mutateRow(RowMutations mutation) {
230    return wrap(rawTable.mutateRow(mutation));
231  }
232
233  @Override
234  public CompletableFuture<List<Result>> scanAll(Scan scan) {
235    return wrap(rawTable.scanAll(scan));
236  }
237
238  @Override
239  public ResultScanner getScanner(Scan scan) {
240    return rawTable.getScanner(scan);
241  }
242
243  private void scan0(Scan scan, ScanResultConsumer consumer) {
244    Span span = null;
245    try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
246      span = scanner.getSpan();
247      try (Scope ignored = span.makeCurrent()) {
248        if (scan.isScanMetricsEnabled()) {
249          consumer.onScanMetricsCreated(scanner.getScanMetrics());
250        }
251        for (Result result; (result = scanner.next()) != null;) {
252          if (!consumer.onNext(result)) {
253            break;
254          }
255        }
256        consumer.onComplete();
257      }
258    } catch (IOException e) {
259      try (Scope ignored = span.makeCurrent()) {
260        consumer.onError(e);
261      }
262    }
263  }
264
265  @Override
266  public void scan(Scan scan, ScanResultConsumer consumer) {
267    final Context context = Context.current();
268    pool.execute(context.wrap(() -> scan0(scan, consumer)));
269  }
270
271  @Override
272  public List<CompletableFuture<Result>> get(List<Get> gets) {
273    return rawTable.get(gets).stream().map(this::wrap).collect(toList());
274  }
275
276  @Override
277  public List<CompletableFuture<Void>> put(List<Put> puts) {
278    return rawTable.put(puts).stream().map(this::wrap).collect(toList());
279  }
280
281  @Override
282  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
283    return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
284  }
285
286  @Override
287  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
288    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
289  }
290
291  @Override
292  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
293    ServiceCaller<S, R> callable, byte[] row) {
294    return wrap(rawTable.coprocessorService(stubMaker, callable, row));
295  }
296
297  @Override
298  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
299    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
300    CoprocessorCallback<R> callback) {
301    return coprocessorService(stubMaker, callable,
302      new NoopPartialResultCoprocessorCallback<>(callback));
303  }
304
305  @Override
306  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
307    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
308    PartialResultCoprocessorCallback<S, R> callback) {
309    final Context context = Context.current();
310    PartialResultCoprocessorCallback<S, R> wrappedCallback =
311      new PartialResultCoprocessorCallback<S, R>() {
312
313        private final Phaser regionCompletesInProgress = new Phaser(1);
314
315        @Override
316        public void onRegionComplete(RegionInfo region, R resp) {
317          regionCompletesInProgress.register();
318          pool.execute(context.wrap(() -> {
319            try {
320              callback.onRegionComplete(region, resp);
321            } finally {
322              regionCompletesInProgress.arriveAndDeregister();
323            }
324          }));
325        }
326
327        @Override
328        public void onRegionError(RegionInfo region, Throwable error) {
329          regionCompletesInProgress.register();
330          pool.execute(context.wrap(() -> {
331            try {
332              callback.onRegionError(region, error);
333            } finally {
334              regionCompletesInProgress.arriveAndDeregister();
335            }
336          }));
337        }
338
339        @Override
340        public void onComplete() {
341          pool.execute(context.wrap(() -> {
342            // Guarantee that onComplete() is called after all onRegionComplete()'s are called
343            regionCompletesInProgress.arriveAndAwaitAdvance();
344            callback.onComplete();
345          }));
346        }
347
348        @Override
349        public void onError(Throwable error) {
350          pool.execute(context.wrap(() -> callback.onError(error)));
351        }
352
353        @Override
354        public ServiceCaller<S, R> getNextCallable(R response, RegionInfo region) {
355          return callback.getNextCallable(response, region);
356        }
357
358        @Override
359        public Duration getWaitInterval(R response, RegionInfo region) {
360          return callback.getWaitInterval(response, region);
361        }
362
363      };
364    CoprocessorServiceBuilder<S, R> builder =
365      rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
366    return new CoprocessorServiceBuilder<S, R>() {
367
368      @Override
369      public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
370        builder.fromRow(startKey, inclusive);
371        return this;
372      }
373
374      @Override
375      public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
376        builder.toRow(endKey, inclusive);
377        return this;
378      }
379
380      @Override
381      public void execute() {
382        builder.execute();
383      }
384    };
385  }
386}