001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Context;
024import io.opentelemetry.context.Scope;
025import java.io.IOException;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.function.Function;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.filter.Filter;
036import org.apache.hadoop.hbase.io.TimeRange;
037import org.apache.hadoop.hbase.util.FutureUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
041
042/**
043 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
044 * thread pool when constructing this class, and the callback methods registered to the returned
045 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
046 * to do anything they want in the callbacks without breaking the rpc framework.
047 */
048@InterfaceAudience.Private
049class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
050
051  private final RawAsyncTableImpl rawTable;
052
053  private final ExecutorService pool;
054
055  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
056    this.rawTable = rawTable;
057    this.pool = pool;
058  }
059
060  @Override
061  public TableName getName() {
062    return rawTable.getName();
063  }
064
065  @Override
066  public Configuration getConfiguration() {
067    return rawTable.getConfiguration();
068  }
069
070  @Override
071  public CompletableFuture<TableDescriptor> getDescriptor() {
072    return wrap(rawTable.getDescriptor());
073  }
074
075  @Override
076  public AsyncTableRegionLocator getRegionLocator() {
077    return rawTable.getRegionLocator();
078  }
079
080  @Override
081  public long getRpcTimeout(TimeUnit unit) {
082    return rawTable.getRpcTimeout(unit);
083  }
084
085  @Override
086  public long getReadRpcTimeout(TimeUnit unit) {
087    return rawTable.getReadRpcTimeout(unit);
088  }
089
090  @Override
091  public long getWriteRpcTimeout(TimeUnit unit) {
092    return rawTable.getWriteRpcTimeout(unit);
093  }
094
095  @Override
096  public long getOperationTimeout(TimeUnit unit) {
097    return rawTable.getOperationTimeout(unit);
098  }
099
100  @Override
101  public long getScanTimeout(TimeUnit unit) {
102    return rawTable.getScanTimeout(unit);
103  }
104
105  @Override
106  public Map<String, byte[]> getRequestAttributes() {
107    return rawTable.getRequestAttributes();
108  }
109
110  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
111    return FutureUtils.wrapFuture(future, pool);
112  }
113
114  @Override
115  public CompletableFuture<Result> get(Get get) {
116    return wrap(rawTable.get(get));
117  }
118
119  @Override
120  public CompletableFuture<Void> put(Put put) {
121    return wrap(rawTable.put(put));
122  }
123
124  @Override
125  public CompletableFuture<Void> delete(Delete delete) {
126    return wrap(rawTable.delete(delete));
127  }
128
129  @Override
130  public CompletableFuture<Result> append(Append append) {
131    return wrap(rawTable.append(append));
132  }
133
134  @Override
135  public CompletableFuture<Result> increment(Increment increment) {
136    return wrap(rawTable.increment(increment));
137  }
138
139  @Override
140  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
141    return new CheckAndMutateBuilder() {
142
143      private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
144
145      @Override
146      public CompletableFuture<Boolean> thenPut(Put put) {
147        return wrap(builder.thenPut(put));
148      }
149
150      @Override
151      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
152        return wrap(builder.thenMutate(mutation));
153      }
154
155      @Override
156      public CompletableFuture<Boolean> thenDelete(Delete delete) {
157        return wrap(builder.thenDelete(delete));
158      }
159
160      @Override
161      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
162        builder.qualifier(qualifier);
163        return this;
164      }
165
166      @Override
167      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
168        builder.timeRange(timeRange);
169        return this;
170      }
171
172      @Override
173      public CheckAndMutateBuilder ifNotExists() {
174        builder.ifNotExists();
175        return this;
176      }
177
178      @Override
179      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
180        builder.ifMatches(compareOp, value);
181        return this;
182      }
183    };
184  }
185
186  @Override
187  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
188    return new CheckAndMutateWithFilterBuilder() {
189
190      private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter);
191
192      @Override
193      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
194        builder.timeRange(timeRange);
195        return this;
196      }
197
198      @Override
199      public CompletableFuture<Boolean> thenPut(Put put) {
200        return wrap(builder.thenPut(put));
201      }
202
203      @Override
204      public CompletableFuture<Boolean> thenDelete(Delete delete) {
205        return wrap(builder.thenDelete(delete));
206      }
207
208      @Override
209      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
210        return wrap(builder.thenMutate(mutation));
211      }
212    };
213  }
214
215  @Override
216  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
217    return wrap(rawTable.checkAndMutate(checkAndMutate));
218  }
219
220  @Override
221  public List<CompletableFuture<CheckAndMutateResult>>
222    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
223    return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList());
224  }
225
226  @Override
227  public CompletableFuture<Result> mutateRow(RowMutations mutation) {
228    return wrap(rawTable.mutateRow(mutation));
229  }
230
231  @Override
232  public CompletableFuture<List<Result>> scanAll(Scan scan) {
233    return wrap(rawTable.scanAll(scan));
234  }
235
236  @Override
237  public ResultScanner getScanner(Scan scan) {
238    return rawTable.getScanner(scan);
239  }
240
241  private void scan0(Scan scan, ScanResultConsumer consumer) {
242    Span span = null;
243    try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
244      span = scanner.getSpan();
245      try (Scope ignored = span.makeCurrent()) {
246        if (scan.isScanMetricsEnabled()) {
247          consumer.onScanMetricsCreated(scanner.getScanMetrics());
248        }
249        for (Result result; (result = scanner.next()) != null;) {
250          if (!consumer.onNext(result)) {
251            break;
252          }
253        }
254        consumer.onComplete();
255      }
256    } catch (IOException e) {
257      try (Scope ignored = span.makeCurrent()) {
258        consumer.onError(e);
259      }
260    }
261  }
262
263  @Override
264  public void scan(Scan scan, ScanResultConsumer consumer) {
265    final Context context = Context.current();
266    pool.execute(context.wrap(() -> scan0(scan, consumer)));
267  }
268
269  @Override
270  public List<CompletableFuture<Result>> get(List<Get> gets) {
271    return rawTable.get(gets).stream().map(this::wrap).collect(toList());
272  }
273
274  @Override
275  public List<CompletableFuture<Void>> put(List<Put> puts) {
276    return rawTable.put(puts).stream().map(this::wrap).collect(toList());
277  }
278
279  @Override
280  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
281    return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
282  }
283
284  @Override
285  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
286    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
287  }
288
289  @Override
290  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
291    ServiceCaller<S, R> callable, byte[] row) {
292    return wrap(rawTable.coprocessorService(stubMaker, callable, row));
293  }
294
295  @Override
296  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
297    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
298    CoprocessorCallback<R> callback) {
299    final Context context = Context.current();
300    CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
301
302      @Override
303      public void onRegionComplete(RegionInfo region, R resp) {
304        pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp)));
305      }
306
307      @Override
308      public void onRegionError(RegionInfo region, Throwable error) {
309        pool.execute(context.wrap(() -> callback.onRegionError(region, error)));
310      }
311
312      @Override
313      public void onComplete() {
314        pool.execute(context.wrap(callback::onComplete));
315      }
316
317      @Override
318      public void onError(Throwable error) {
319        pool.execute(context.wrap(() -> callback.onError(error)));
320      }
321    };
322    CoprocessorServiceBuilder<S, R> builder =
323      rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
324    return new CoprocessorServiceBuilder<S, R>() {
325
326      @Override
327      public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
328        builder.fromRow(startKey, inclusive);
329        return this;
330      }
331
332      @Override
333      public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
334        builder.toRow(endKey, inclusive);
335        return this;
336      }
337
338      @Override
339      public void execute() {
340        builder.execute();
341      }
342    };
343  }
344}