001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Context; 024import io.opentelemetry.context.Scope; 025import java.io.IOException; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.TimeUnit; 031import java.util.function.Function; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.CompareOperator; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.filter.Filter; 036import org.apache.hadoop.hbase.io.TimeRange; 037import org.apache.hadoop.hbase.util.FutureUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039 040import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 041 042/** 043 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a 044 * thread pool when constructing this class, and the callback methods registered to the returned 045 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users 046 * to do anything they want in the callbacks without breaking the rpc framework. 047 */ 048@InterfaceAudience.Private 049class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { 050 051 private final RawAsyncTableImpl rawTable; 052 053 private final ExecutorService pool; 054 055 AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) { 056 this.rawTable = rawTable; 057 this.pool = pool; 058 } 059 060 @Override 061 public TableName getName() { 062 return rawTable.getName(); 063 } 064 065 @Override 066 public Configuration getConfiguration() { 067 return rawTable.getConfiguration(); 068 } 069 070 @Override 071 public CompletableFuture<TableDescriptor> getDescriptor() { 072 return wrap(rawTable.getDescriptor()); 073 } 074 075 @Override 076 public AsyncTableRegionLocator getRegionLocator() { 077 return rawTable.getRegionLocator(); 078 } 079 080 @Override 081 public long getRpcTimeout(TimeUnit unit) { 082 return rawTable.getRpcTimeout(unit); 083 } 084 085 @Override 086 public long getReadRpcTimeout(TimeUnit unit) { 087 return rawTable.getReadRpcTimeout(unit); 088 } 089 090 @Override 091 public long getWriteRpcTimeout(TimeUnit unit) { 092 return rawTable.getWriteRpcTimeout(unit); 093 } 094 095 @Override 096 public long getOperationTimeout(TimeUnit unit) { 097 return rawTable.getOperationTimeout(unit); 098 } 099 100 @Override 101 public long getScanTimeout(TimeUnit unit) { 102 return rawTable.getScanTimeout(unit); 103 } 104 105 @Override 106 public Map<String, byte[]> getRequestAttributes() { 107 return rawTable.getRequestAttributes(); 108 } 109 110 private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { 111 return FutureUtils.wrapFuture(future, pool); 112 } 113 114 @Override 115 public CompletableFuture<Result> get(Get get) { 116 return wrap(rawTable.get(get)); 117 } 118 119 @Override 120 public CompletableFuture<Void> put(Put put) { 121 return wrap(rawTable.put(put)); 122 } 123 124 @Override 125 public CompletableFuture<Void> delete(Delete delete) { 126 return wrap(rawTable.delete(delete)); 127 } 128 129 @Override 130 public CompletableFuture<Result> append(Append append) { 131 return wrap(rawTable.append(append)); 132 } 133 134 @Override 135 public CompletableFuture<Result> increment(Increment increment) { 136 return wrap(rawTable.increment(increment)); 137 } 138 139 @Override 140 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 141 return new CheckAndMutateBuilder() { 142 143 private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family); 144 145 @Override 146 public CompletableFuture<Boolean> thenPut(Put put) { 147 return wrap(builder.thenPut(put)); 148 } 149 150 @Override 151 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 152 return wrap(builder.thenMutate(mutation)); 153 } 154 155 @Override 156 public CompletableFuture<Boolean> thenDelete(Delete delete) { 157 return wrap(builder.thenDelete(delete)); 158 } 159 160 @Override 161 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 162 builder.qualifier(qualifier); 163 return this; 164 } 165 166 @Override 167 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 168 builder.timeRange(timeRange); 169 return this; 170 } 171 172 @Override 173 public CheckAndMutateBuilder ifNotExists() { 174 builder.ifNotExists(); 175 return this; 176 } 177 178 @Override 179 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 180 builder.ifMatches(compareOp, value); 181 return this; 182 } 183 }; 184 } 185 186 @Override 187 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 188 return new CheckAndMutateWithFilterBuilder() { 189 190 private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter); 191 192 @Override 193 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 194 builder.timeRange(timeRange); 195 return this; 196 } 197 198 @Override 199 public CompletableFuture<Boolean> thenPut(Put put) { 200 return wrap(builder.thenPut(put)); 201 } 202 203 @Override 204 public CompletableFuture<Boolean> thenDelete(Delete delete) { 205 return wrap(builder.thenDelete(delete)); 206 } 207 208 @Override 209 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 210 return wrap(builder.thenMutate(mutation)); 211 } 212 }; 213 } 214 215 @Override 216 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 217 return wrap(rawTable.checkAndMutate(checkAndMutate)); 218 } 219 220 @Override 221 public List<CompletableFuture<CheckAndMutateResult>> 222 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 223 return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList()); 224 } 225 226 @Override 227 public CompletableFuture<Result> mutateRow(RowMutations mutation) { 228 return wrap(rawTable.mutateRow(mutation)); 229 } 230 231 @Override 232 public CompletableFuture<List<Result>> scanAll(Scan scan) { 233 return wrap(rawTable.scanAll(scan)); 234 } 235 236 @Override 237 public ResultScanner getScanner(Scan scan) { 238 return rawTable.getScanner(scan); 239 } 240 241 private void scan0(Scan scan, ScanResultConsumer consumer) { 242 Span span = null; 243 try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) { 244 span = scanner.getSpan(); 245 try (Scope ignored = span.makeCurrent()) { 246 if (scan.isScanMetricsEnabled()) { 247 consumer.onScanMetricsCreated(scanner.getScanMetrics()); 248 } 249 for (Result result; (result = scanner.next()) != null;) { 250 if (!consumer.onNext(result)) { 251 break; 252 } 253 } 254 consumer.onComplete(); 255 } 256 } catch (IOException e) { 257 try (Scope ignored = span.makeCurrent()) { 258 consumer.onError(e); 259 } 260 } 261 } 262 263 @Override 264 public void scan(Scan scan, ScanResultConsumer consumer) { 265 final Context context = Context.current(); 266 pool.execute(context.wrap(() -> scan0(scan, consumer))); 267 } 268 269 @Override 270 public List<CompletableFuture<Result>> get(List<Get> gets) { 271 return rawTable.get(gets).stream().map(this::wrap).collect(toList()); 272 } 273 274 @Override 275 public List<CompletableFuture<Void>> put(List<Put> puts) { 276 return rawTable.put(puts).stream().map(this::wrap).collect(toList()); 277 } 278 279 @Override 280 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 281 return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); 282 } 283 284 @Override 285 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 286 return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); 287 } 288 289 @Override 290 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 291 ServiceCaller<S, R> callable, byte[] row) { 292 return wrap(rawTable.coprocessorService(stubMaker, callable, row)); 293 } 294 295 @Override 296 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 297 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 298 CoprocessorCallback<R> callback) { 299 final Context context = Context.current(); 300 CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { 301 302 @Override 303 public void onRegionComplete(RegionInfo region, R resp) { 304 pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp))); 305 } 306 307 @Override 308 public void onRegionError(RegionInfo region, Throwable error) { 309 pool.execute(context.wrap(() -> callback.onRegionError(region, error))); 310 } 311 312 @Override 313 public void onComplete() { 314 pool.execute(context.wrap(callback::onComplete)); 315 } 316 317 @Override 318 public void onError(Throwable error) { 319 pool.execute(context.wrap(() -> callback.onError(error))); 320 } 321 }; 322 CoprocessorServiceBuilder<S, R> builder = 323 rawTable.coprocessorService(stubMaker, callable, wrappedCallback); 324 return new CoprocessorServiceBuilder<S, R>() { 325 326 @Override 327 public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) { 328 builder.fromRow(startKey, inclusive); 329 return this; 330 } 331 332 @Override 333 public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) { 334 builder.toRow(endKey, inclusive); 335 return this; 336 } 337 338 @Override 339 public void execute() { 340 builder.execute(); 341 } 342 }; 343 } 344}