001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021
022import com.google.protobuf.RpcChannel;
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Context;
025import io.opentelemetry.context.Scope;
026import java.io.IOException;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.function.Function;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.filter.Filter;
036import org.apache.hadoop.hbase.io.TimeRange;
037import org.apache.hadoop.hbase.util.FutureUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039
040/**
041 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
042 * thread pool when constructing this class, and the callback methods registered to the returned
043 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
044 * to do anything they want in the callbacks without breaking the rpc framework.
045 */
046@InterfaceAudience.Private
047class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
048
049  private final RawAsyncTableImpl rawTable;
050
051  private final ExecutorService pool;
052
053  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
054    this.rawTable = rawTable;
055    this.pool = pool;
056  }
057
058  @Override
059  public TableName getName() {
060    return rawTable.getName();
061  }
062
063  @Override
064  public Configuration getConfiguration() {
065    return rawTable.getConfiguration();
066  }
067
068  @Override
069  public CompletableFuture<TableDescriptor> getDescriptor() {
070    return wrap(rawTable.getDescriptor());
071  }
072
073  @Override
074  public AsyncTableRegionLocator getRegionLocator() {
075    return rawTable.getRegionLocator();
076  }
077
078  @Override
079  public long getRpcTimeout(TimeUnit unit) {
080    return rawTable.getRpcTimeout(unit);
081  }
082
083  @Override
084  public long getReadRpcTimeout(TimeUnit unit) {
085    return rawTable.getReadRpcTimeout(unit);
086  }
087
088  @Override
089  public long getWriteRpcTimeout(TimeUnit unit) {
090    return rawTable.getWriteRpcTimeout(unit);
091  }
092
093  @Override
094  public long getOperationTimeout(TimeUnit unit) {
095    return rawTable.getOperationTimeout(unit);
096  }
097
098  @Override
099  public long getScanTimeout(TimeUnit unit) {
100    return rawTable.getScanTimeout(unit);
101  }
102
103  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
104    return FutureUtils.wrapFuture(future, pool);
105  }
106
107  @Override
108  public CompletableFuture<Result> get(Get get) {
109    return wrap(rawTable.get(get));
110  }
111
112  @Override
113  public CompletableFuture<Void> put(Put put) {
114    return wrap(rawTable.put(put));
115  }
116
117  @Override
118  public CompletableFuture<Void> delete(Delete delete) {
119    return wrap(rawTable.delete(delete));
120  }
121
122  @Override
123  public CompletableFuture<Result> append(Append append) {
124    return wrap(rawTable.append(append));
125  }
126
127  @Override
128  public CompletableFuture<Result> increment(Increment increment) {
129    return wrap(rawTable.increment(increment));
130  }
131
132  @Override
133  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
134    return new CheckAndMutateBuilder() {
135
136      private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
137
138      @Override
139      public CompletableFuture<Boolean> thenPut(Put put) {
140        return wrap(builder.thenPut(put));
141      }
142
143      @Override
144      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
145        return wrap(builder.thenMutate(mutation));
146      }
147
148      @Override
149      public CompletableFuture<Boolean> thenDelete(Delete delete) {
150        return wrap(builder.thenDelete(delete));
151      }
152
153      @Override
154      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
155        builder.qualifier(qualifier);
156        return this;
157      }
158
159      @Override
160      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
161        builder.timeRange(timeRange);
162        return this;
163      }
164
165      @Override
166      public CheckAndMutateBuilder ifNotExists() {
167        builder.ifNotExists();
168        return this;
169      }
170
171      @Override
172      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
173        builder.ifMatches(compareOp, value);
174        return this;
175      }
176    };
177  }
178
179  @Override
180  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
181    return new CheckAndMutateWithFilterBuilder() {
182
183      private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter);
184
185      @Override
186      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
187        builder.timeRange(timeRange);
188        return this;
189      }
190
191      @Override
192      public CompletableFuture<Boolean> thenPut(Put put) {
193        return wrap(builder.thenPut(put));
194      }
195
196      @Override
197      public CompletableFuture<Boolean> thenDelete(Delete delete) {
198        return wrap(builder.thenDelete(delete));
199      }
200
201      @Override
202      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
203        return wrap(builder.thenMutate(mutation));
204      }
205    };
206  }
207
208  @Override
209  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
210    return wrap(rawTable.checkAndMutate(checkAndMutate));
211  }
212
213  @Override
214  public List<CompletableFuture<CheckAndMutateResult>>
215    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
216    return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList());
217  }
218
219  @Override
220  public CompletableFuture<Result> mutateRow(RowMutations mutation) {
221    return wrap(rawTable.mutateRow(mutation));
222  }
223
224  @Override
225  public CompletableFuture<List<Result>> scanAll(Scan scan) {
226    return wrap(rawTable.scanAll(scan));
227  }
228
229  @Override
230  public ResultScanner getScanner(Scan scan) {
231    return rawTable.getScanner(scan);
232  }
233
234  private void scan0(Scan scan, ScanResultConsumer consumer) {
235    Span span = null;
236    try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
237      span = scanner.getSpan();
238      try (Scope ignored = span.makeCurrent()) {
239        consumer.onScanMetricsCreated(scanner.getScanMetrics());
240        for (Result result; (result = scanner.next()) != null;) {
241          if (!consumer.onNext(result)) {
242            break;
243          }
244        }
245        consumer.onComplete();
246      }
247    } catch (IOException e) {
248      try (Scope ignored = span.makeCurrent()) {
249        consumer.onError(e);
250      }
251    }
252  }
253
254  @Override
255  public void scan(Scan scan, ScanResultConsumer consumer) {
256    final Context context = Context.current();
257    pool.execute(context.wrap(() -> scan0(scan, consumer)));
258  }
259
260  @Override
261  public List<CompletableFuture<Result>> get(List<Get> gets) {
262    return rawTable.get(gets).stream().map(this::wrap).collect(toList());
263  }
264
265  @Override
266  public List<CompletableFuture<Void>> put(List<Put> puts) {
267    return rawTable.put(puts).stream().map(this::wrap).collect(toList());
268  }
269
270  @Override
271  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
272    return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
273  }
274
275  @Override
276  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
277    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
278  }
279
280  @Override
281  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
282    ServiceCaller<S, R> callable, byte[] row) {
283    return wrap(rawTable.coprocessorService(stubMaker, callable, row));
284  }
285
286  @Override
287  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
288    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
289    CoprocessorCallback<R> callback) {
290    final Context context = Context.current();
291    CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
292
293      @Override
294      public void onRegionComplete(RegionInfo region, R resp) {
295        pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp)));
296      }
297
298      @Override
299      public void onRegionError(RegionInfo region, Throwable error) {
300        pool.execute(context.wrap(() -> callback.onRegionError(region, error)));
301      }
302
303      @Override
304      public void onComplete() {
305        pool.execute(context.wrap(callback::onComplete));
306      }
307
308      @Override
309      public void onError(Throwable error) {
310        pool.execute(context.wrap(() -> callback.onError(error)));
311      }
312    };
313    CoprocessorServiceBuilder<S, R> builder =
314      rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
315    return new CoprocessorServiceBuilder<S, R>() {
316
317      @Override
318      public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
319        builder.fromRow(startKey, inclusive);
320        return this;
321      }
322
323      @Override
324      public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
325        builder.toRow(endKey, inclusive);
326        return this;
327      }
328
329      @Override
330      public void execute() {
331        builder.execute();
332      }
333    };
334  }
335}