001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Context;
024import io.opentelemetry.context.Scope;
025import java.io.IOException;
026import java.util.List;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.TimeUnit;
030import java.util.function.Function;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.CompareOperator;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.filter.Filter;
035import org.apache.hadoop.hbase.io.TimeRange;
036import org.apache.hadoop.hbase.util.FutureUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038
039import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
040
041/**
042 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
043 * thread pool when constructing this class, and the callback methods registered to the returned
044 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
045 * to do anything they want in the callbacks without breaking the rpc framework.
046 */
047@InterfaceAudience.Private
048class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
049
050  private final RawAsyncTableImpl rawTable;
051
052  private final ExecutorService pool;
053
054  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
055    this.rawTable = rawTable;
056    this.pool = pool;
057  }
058
059  @Override
060  public TableName getName() {
061    return rawTable.getName();
062  }
063
064  @Override
065  public Configuration getConfiguration() {
066    return rawTable.getConfiguration();
067  }
068
069  @Override
070  public CompletableFuture<TableDescriptor> getDescriptor() {
071    return wrap(rawTable.getDescriptor());
072  }
073
074  @Override
075  public AsyncTableRegionLocator getRegionLocator() {
076    return rawTable.getRegionLocator();
077  }
078
079  @Override
080  public long getRpcTimeout(TimeUnit unit) {
081    return rawTable.getRpcTimeout(unit);
082  }
083
084  @Override
085  public long getReadRpcTimeout(TimeUnit unit) {
086    return rawTable.getReadRpcTimeout(unit);
087  }
088
089  @Override
090  public long getWriteRpcTimeout(TimeUnit unit) {
091    return rawTable.getWriteRpcTimeout(unit);
092  }
093
094  @Override
095  public long getOperationTimeout(TimeUnit unit) {
096    return rawTable.getOperationTimeout(unit);
097  }
098
099  @Override
100  public long getScanTimeout(TimeUnit unit) {
101    return rawTable.getScanTimeout(unit);
102  }
103
104  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
105    return FutureUtils.wrapFuture(future, pool);
106  }
107
108  @Override
109  public CompletableFuture<Result> get(Get get) {
110    return wrap(rawTable.get(get));
111  }
112
113  @Override
114  public CompletableFuture<Void> put(Put put) {
115    return wrap(rawTable.put(put));
116  }
117
118  @Override
119  public CompletableFuture<Void> delete(Delete delete) {
120    return wrap(rawTable.delete(delete));
121  }
122
123  @Override
124  public CompletableFuture<Result> append(Append append) {
125    return wrap(rawTable.append(append));
126  }
127
128  @Override
129  public CompletableFuture<Result> increment(Increment increment) {
130    return wrap(rawTable.increment(increment));
131  }
132
133  @Override
134  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
135    return new CheckAndMutateBuilder() {
136
137      private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
138
139      @Override
140      public CompletableFuture<Boolean> thenPut(Put put) {
141        return wrap(builder.thenPut(put));
142      }
143
144      @Override
145      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
146        return wrap(builder.thenMutate(mutation));
147      }
148
149      @Override
150      public CompletableFuture<Boolean> thenDelete(Delete delete) {
151        return wrap(builder.thenDelete(delete));
152      }
153
154      @Override
155      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
156        builder.qualifier(qualifier);
157        return this;
158      }
159
160      @Override
161      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
162        builder.timeRange(timeRange);
163        return this;
164      }
165
166      @Override
167      public CheckAndMutateBuilder ifNotExists() {
168        builder.ifNotExists();
169        return this;
170      }
171
172      @Override
173      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
174        builder.ifMatches(compareOp, value);
175        return this;
176      }
177    };
178  }
179
180  @Override
181  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
182    return new CheckAndMutateWithFilterBuilder() {
183
184      private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter);
185
186      @Override
187      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
188        builder.timeRange(timeRange);
189        return this;
190      }
191
192      @Override
193      public CompletableFuture<Boolean> thenPut(Put put) {
194        return wrap(builder.thenPut(put));
195      }
196
197      @Override
198      public CompletableFuture<Boolean> thenDelete(Delete delete) {
199        return wrap(builder.thenDelete(delete));
200      }
201
202      @Override
203      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
204        return wrap(builder.thenMutate(mutation));
205      }
206    };
207  }
208
209  @Override
210  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
211    return wrap(rawTable.checkAndMutate(checkAndMutate));
212  }
213
214  @Override
215  public List<CompletableFuture<CheckAndMutateResult>>
216    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
217    return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList());
218  }
219
220  @Override
221  public CompletableFuture<Result> mutateRow(RowMutations mutation) {
222    return wrap(rawTable.mutateRow(mutation));
223  }
224
225  @Override
226  public CompletableFuture<List<Result>> scanAll(Scan scan) {
227    return wrap(rawTable.scanAll(scan));
228  }
229
230  @Override
231  public ResultScanner getScanner(Scan scan) {
232    return rawTable.getScanner(scan);
233  }
234
235  private void scan0(Scan scan, ScanResultConsumer consumer) {
236    Span span = null;
237    try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
238      span = scanner.getSpan();
239      try (Scope ignored = span.makeCurrent()) {
240        if (scan.isScanMetricsEnabled()) {
241          consumer.onScanMetricsCreated(scanner.getScanMetrics());
242        }
243        for (Result result; (result = scanner.next()) != null;) {
244          if (!consumer.onNext(result)) {
245            break;
246          }
247        }
248        consumer.onComplete();
249      }
250    } catch (IOException e) {
251      try (Scope ignored = span.makeCurrent()) {
252        consumer.onError(e);
253      }
254    }
255  }
256
257  @Override
258  public void scan(Scan scan, ScanResultConsumer consumer) {
259    final Context context = Context.current();
260    pool.execute(context.wrap(() -> scan0(scan, consumer)));
261  }
262
263  @Override
264  public List<CompletableFuture<Result>> get(List<Get> gets) {
265    return rawTable.get(gets).stream().map(this::wrap).collect(toList());
266  }
267
268  @Override
269  public List<CompletableFuture<Void>> put(List<Put> puts) {
270    return rawTable.put(puts).stream().map(this::wrap).collect(toList());
271  }
272
273  @Override
274  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
275    return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
276  }
277
278  @Override
279  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
280    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
281  }
282
283  @Override
284  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
285    ServiceCaller<S, R> callable, byte[] row) {
286    return wrap(rawTable.coprocessorService(stubMaker, callable, row));
287  }
288
289  @Override
290  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
291    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
292    CoprocessorCallback<R> callback) {
293    final Context context = Context.current();
294    CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
295
296      @Override
297      public void onRegionComplete(RegionInfo region, R resp) {
298        pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp)));
299      }
300
301      @Override
302      public void onRegionError(RegionInfo region, Throwable error) {
303        pool.execute(context.wrap(() -> callback.onRegionError(region, error)));
304      }
305
306      @Override
307      public void onComplete() {
308        pool.execute(context.wrap(callback::onComplete));
309      }
310
311      @Override
312      public void onError(Throwable error) {
313        pool.execute(context.wrap(() -> callback.onError(error)));
314      }
315    };
316    CoprocessorServiceBuilder<S, R> builder =
317      rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
318    return new CoprocessorServiceBuilder<S, R>() {
319
320      @Override
321      public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
322        builder.fromRow(startKey, inclusive);
323        return this;
324      }
325
326      @Override
327      public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
328        builder.toRow(endKey, inclusive);
329        return this;
330      }
331
332      @Override
333      public void execute() {
334        builder.execute();
335      }
336    };
337  }
338}