/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
 * A thin wrapper around {@link RawAsyncTableImpl}. Unlike the raw table, this class is constructed
 * with a user supplied thread pool, and the callback methods registered to the returned
 * {@link CompletableFuture}s will be executed in that thread pool. So usually it is safe for users
 * to do anything they want in the callbacks without breaking the rpc framework.
 * <p>
 * Plain accessors (name, configuration, timeouts, ...) are forwarded to the raw table unchanged;
 * every future handed back to the caller goes through {@link #wrap(CompletableFuture)} first.
 */
@InterfaceAudience.Private
class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {

  /** The table every operation is delegated to. */
  private final RawAsyncTableImpl rawTable;

  /** The pool handed to {@link FutureUtils#wrapFuture} and used to run consumer callbacks. */
  private final ExecutorService pool;

  /**
   * @param rawTable the table that performs the actual work
   * @param pool     the thread pool user callbacks are dispatched to
   */
  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
    this.rawTable = rawTable;
    this.pool = pool;
  }

  // ---------------------------------------------------------------------------------------------
  // Internal helpers
  // ---------------------------------------------------------------------------------------------

  /**
   * Passes {@code future} through {@link FutureUtils#wrapFuture} together with {@link #pool}.
   * @param future a future produced by the raw table
   * @return the future to expose to the caller
   */
  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
    return FutureUtils.wrapFuture(future, pool);
  }

  /**
   * Applies {@link #wrap(CompletableFuture)} to each element of {@code futures}, keeping the
   * encounter order of the given list.
   * @param futures the futures produced by a multi-action call on the raw table
   * @return a new list holding one wrapped future per input future
   */
  private <T> List<CompletableFuture<T>> wrapAll(List<CompletableFuture<T>> futures) {
    return futures.stream().map(this::wrap).collect(toList());
  }

  // ---------------------------------------------------------------------------------------------
  // Accessors forwarded to the raw table as-is
  // ---------------------------------------------------------------------------------------------

  /** @return whatever {@code rawTable.getName()} returns */
  @Override
  public TableName getName() {
    return rawTable.getName();
  }

  /** @return whatever {@code rawTable.getConfiguration()} returns */
  @Override
  public Configuration getConfiguration() {
    return rawTable.getConfiguration();
  }

  /** @return the raw table's descriptor future, wrapped with the pool */
  @Override
  public CompletableFuture<TableDescriptor> getDescriptor() {
    return wrap(rawTable.getDescriptor());
  }

  /** @return whatever {@code rawTable.getRegionLocator()} returns */
  @Override
  public AsyncTableRegionLocator getRegionLocator() {
    return rawTable.getRegionLocator();
  }

  /** @return the raw table's rpc timeout, expressed in {@code unit} */
  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return rawTable.getRpcTimeout(unit);
  }

  /** @return the raw table's read rpc timeout, expressed in {@code unit} */
  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return rawTable.getReadRpcTimeout(unit);
  }

  /** @return the raw table's write rpc timeout, expressed in {@code unit} */
  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return rawTable.getWriteRpcTimeout(unit);
  }

  /** @return the raw table's operation timeout, expressed in {@code unit} */
  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return rawTable.getOperationTimeout(unit);
  }

  /** @return the raw table's scan timeout, expressed in {@code unit} */
  @Override
  public long getScanTimeout(TimeUnit unit) {
    return rawTable.getScanTimeout(unit);
  }

  /** @return whatever {@code rawTable.getRequestAttributes()} returns */
  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return rawTable.getRequestAttributes();
  }

  // ---------------------------------------------------------------------------------------------
  // Single-row operations
  // ---------------------------------------------------------------------------------------------

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<Result> get(Get get) {
    return wrap(rawTable.get(get));
  }

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<Void> put(Put put) {
    return wrap(rawTable.put(put));
  }

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<Void> delete(Delete delete) {
    return wrap(rawTable.delete(delete));
  }

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<Result> append(Append append) {
    return wrap(rawTable.append(append));
  }

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<Result> increment(Increment increment) {
    return wrap(rawTable.increment(increment));
  }

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<Result> mutateRow(RowMutations mutation) {
    return wrap(rawTable.mutateRow(mutation));
  }

  // ---------------------------------------------------------------------------------------------
  // Check-and-mutate
  // ---------------------------------------------------------------------------------------------

  /**
   * Returns a builder that forwards every configuration call to the builder obtained from the raw
   * table and wraps the futures produced by the terminal {@code then*} calls. Configuration
   * methods return the outer builder, so chained calls keep going through this wrapper.
   */
  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    final CheckAndMutateBuilder rawBuilder = rawTable.checkAndMutate(row, family);
    return new CheckAndMutateBuilder() {

      @Override
      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
        rawBuilder.qualifier(qualifier);
        return this;
      }

      @Override
      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
        rawBuilder.timeRange(timeRange);
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifNotExists() {
        rawBuilder.ifNotExists();
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
        rawBuilder.ifMatches(compareOp, value);
        return this;
      }

      @Override
      public CompletableFuture<Boolean> thenPut(Put put) {
        return wrap(rawBuilder.thenPut(put));
      }

      @Override
      public CompletableFuture<Boolean> thenDelete(Delete delete) {
        return wrap(rawBuilder.thenDelete(delete));
      }

      @Override
      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
        return wrap(rawBuilder.thenMutate(mutation));
      }
    };
  }

  /**
   * Filter based variant of {@link #checkAndMutate(byte[], byte[])}: configuration calls are
   * forwarded to the raw table's builder, futures from the terminal calls are wrapped.
   */
  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    final CheckAndMutateWithFilterBuilder rawBuilder = rawTable.checkAndMutate(row, filter);
    return new CheckAndMutateWithFilterBuilder() {

      @Override
      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
        rawBuilder.timeRange(timeRange);
        return this;
      }

      @Override
      public CompletableFuture<Boolean> thenPut(Put put) {
        return wrap(rawBuilder.thenPut(put));
      }

      @Override
      public CompletableFuture<Boolean> thenDelete(Delete delete) {
        return wrap(rawBuilder.thenDelete(delete));
      }

      @Override
      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
        return wrap(rawBuilder.thenMutate(mutation));
      }
    };
  }

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
    return wrap(rawTable.checkAndMutate(checkAndMutate));
  }

  /** Delegates to the raw table and wraps each of the resulting futures. */
  @Override
  public List<CompletableFuture<CheckAndMutateResult>>
    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
    return wrapAll(rawTable.checkAndMutate(checkAndMutates));
  }

  // ---------------------------------------------------------------------------------------------
  // Scans
  // ---------------------------------------------------------------------------------------------

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public CompletableFuture<List<Result>> scanAll(Scan scan) {
    return wrap(rawTable.scanAll(scan));
  }

  /** Returns the raw table's scanner directly; the pool is not involved. */
  @Override
  public ResultScanner getScanner(Scan scan) {
    return rawTable.getScanner(scan);
  }

  /**
   * Drains a scanner obtained from the raw table into {@code consumer} on the calling thread.
   * <p>
   * Results are fed to {@link ScanResultConsumer#onNext} until the scanner is exhausted or the
   * consumer returns {@code false}; in both cases {@code onComplete()} is invoked afterwards. An
   * {@link IOException} is reported through {@code onError(e)} instead. All consumer callbacks run
   * with the scanner's span made current.
   * @param scan     the scan to run
   * @param consumer receiver of the metrics, results and the terminal callback
   */
  private void scan0(Scan scan, ScanResultConsumer consumer) {
    // Declared outside the try so that the catch block can re-activate the same span.
    Span span = null;
    try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
      span = scanner.getSpan();
      try (Scope ignored = span.makeCurrent()) {
        if (scan.isScanMetricsEnabled()) {
          consumer.onScanMetricsCreated(scanner.getScanMetrics());
        }
        Result result = scanner.next();
        // Stops on end-of-scan (null) or as soon as the consumer asks to stop (onNext == false).
        while (result != null && consumer.onNext(result)) {
          result = scanner.next();
        }
        consumer.onComplete();
      }
    } catch (IOException e) {
      // NOTE(review): relies on span having been assigned before any IOException can surface,
      // i.e. on getSpan() returning non-null without throwing -- confirm against the scanner.
      try (Scope ignored = span.makeCurrent()) {
        consumer.onError(e);
      }
    }
  }

  /**
   * Runs the scan on {@link #pool}. The {@link Context} that is current on the calling thread is
   * captured and used to wrap the submitted task.
   */
  @Override
  public void scan(Scan scan, ScanResultConsumer consumer) {
    final Context callerContext = Context.current();
    Runnable scanTask = () -> scan0(scan, consumer);
    pool.execute(callerContext.wrap(scanTask));
  }

  // ---------------------------------------------------------------------------------------------
  // Multi-action operations
  // ---------------------------------------------------------------------------------------------

  /** Delegates to the raw table and wraps each of the resulting futures. */
  @Override
  public List<CompletableFuture<Result>> get(List<Get> gets) {
    return wrapAll(rawTable.get(gets));
  }

  /** Delegates to the raw table and wraps each of the resulting futures. */
  @Override
  public List<CompletableFuture<Void>> put(List<Put> puts) {
    return wrapAll(rawTable.put(puts));
  }

  /** Delegates to the raw table and wraps each of the resulting futures. */
  @Override
  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
    return wrapAll(rawTable.delete(deletes));
  }

  /** Delegates to the raw table and wraps each of the resulting futures. */
  @Override
  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
    return wrapAll(rawTable.<T> batch(actions));
  }

  // ---------------------------------------------------------------------------------------------
  // Coprocessor services
  // ---------------------------------------------------------------------------------------------

  /** Delegates to the raw table and wraps the resulting future. */
  @Override
  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, byte[] row) {
    return wrap(rawTable.coprocessorService(stubMaker, callable, row));
  }

  /**
   * Adapts {@code callback} with a {@link NoopPartialResultCoprocessorCallback} and forwards to the
   * {@link PartialResultCoprocessorCallback} based overload.
   */
  @Override
  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
    CoprocessorCallback<R> callback) {
    return coprocessorService(stubMaker, callable,
      new NoopPartialResultCoprocessorCallback<>(callback));
  }

  /**
   * Builds a coprocessor service call on the raw table whose callback notifications are re-posted
   * to {@link #pool}, wrapped with the {@link Context} that was current when this method was
   * called.
   * <p>
   * {@code getNextCallable} and {@code getWaitInterval} are answered by {@code callback} directly
   * on the invoking thread; only the four notification methods go through the pool.
   */
  @Override
  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
    PartialResultCoprocessorCallback<S, R> callback) {
    final Context context = Context.current();
    PartialResultCoprocessorCallback<S, R> pooledCallback =
      new PartialResultCoprocessorCallback<S, R>() {

        // One party is registered up front (consumed by onComplete()); every in-flight per-region
        // notification registers one more and deregisters when its user callback has returned.
        private final Phaser regionCallbacksInFlight = new Phaser(1);

        /**
         * Registers a party on the invoking thread, then runs {@code regionCallback} on the pool
         * and deregisters the party once it has returned or thrown.
         */
        private void dispatchTracked(Runnable regionCallback) {
          regionCallbacksInFlight.register();
          pool.execute(context.wrap(() -> {
            try {
              regionCallback.run();
            } finally {
              regionCallbacksInFlight.arriveAndDeregister();
            }
          }));
        }

        @Override
        public void onRegionComplete(RegionInfo region, R resp) {
          dispatchTracked(() -> callback.onRegionComplete(region, resp));
        }

        @Override
        public void onRegionError(RegionInfo region, Throwable error) {
          dispatchTracked(() -> callback.onRegionError(region, error));
        }

        @Override
        public void onComplete() {
          Runnable completion = () -> {
            // Guarantee that onComplete() is called after all onRegionComplete()'s are called
            regionCallbacksInFlight.arriveAndAwaitAdvance();
            callback.onComplete();
          };
          pool.execute(context.wrap(completion));
        }

        @Override
        public void onError(Throwable error) {
          Runnable failure = () -> callback.onError(error);
          pool.execute(context.wrap(failure));
        }

        @Override
        public ServiceCaller<S, R> getNextCallable(R response, RegionInfo region) {
          return callback.getNextCallable(response, region);
        }

        @Override
        public Duration getWaitInterval(R response, RegionInfo region) {
          return callback.getWaitInterval(response, region);
        }

      };
    CoprocessorServiceBuilder<S, R> rawBuilder =
      rawTable.coprocessorService(stubMaker, callable, pooledCallback);
    // Re-expose the raw builder so that the fluent calls return this wrapper rather than it.
    return new CoprocessorServiceBuilder<S, R>() {

      @Override
      public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
        rawBuilder.fromRow(startKey, inclusive);
        return this;
      }

      @Override
      public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
        rawBuilder.toRow(endKey, inclusive);
        return this;
      }

      @Override
      public void execute() {
        rawBuilder.execute();
      }
    };
  }
}