001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021 022import com.google.protobuf.RpcChannel; 023import io.opentelemetry.api.trace.Span; 024import io.opentelemetry.context.Context; 025import io.opentelemetry.context.Scope; 026import java.io.IOException; 027import java.util.List; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.TimeUnit; 031import java.util.function.Function; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.CompareOperator; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.filter.Filter; 036import org.apache.hadoop.hbase.io.TimeRange; 037import org.apache.hadoop.hbase.util.FutureUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039 040/** 041 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a 042 * thread pool when constructing this class, and the callback methods registered to the returned 043 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users 044 * to do anything they want in the callbacks without breaking the rpc framework. 045 */ 046@InterfaceAudience.Private 047class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { 048 049 private final RawAsyncTableImpl rawTable; 050 051 private final ExecutorService pool; 052 053 AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) { 054 this.rawTable = rawTable; 055 this.pool = pool; 056 } 057 058 @Override 059 public TableName getName() { 060 return rawTable.getName(); 061 } 062 063 @Override 064 public Configuration getConfiguration() { 065 return rawTable.getConfiguration(); 066 } 067 068 @Override 069 public CompletableFuture<TableDescriptor> getDescriptor() { 070 return wrap(rawTable.getDescriptor()); 071 } 072 073 @Override 074 public AsyncTableRegionLocator getRegionLocator() { 075 return rawTable.getRegionLocator(); 076 } 077 078 @Override 079 public long getRpcTimeout(TimeUnit unit) { 080 return rawTable.getRpcTimeout(unit); 081 } 082 083 @Override 084 public long getReadRpcTimeout(TimeUnit unit) { 085 return rawTable.getReadRpcTimeout(unit); 086 } 087 088 @Override 089 public long getWriteRpcTimeout(TimeUnit unit) { 090 return rawTable.getWriteRpcTimeout(unit); 091 } 092 093 @Override 094 public long getOperationTimeout(TimeUnit unit) { 095 return rawTable.getOperationTimeout(unit); 096 } 097 098 @Override 099 public long getScanTimeout(TimeUnit unit) { 100 return rawTable.getScanTimeout(unit); 101 } 102 103 private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { 104 return FutureUtils.wrapFuture(future, pool); 105 } 106 107 @Override 108 public CompletableFuture<Result> get(Get get) { 109 return wrap(rawTable.get(get)); 110 } 111 112 @Override 113 public CompletableFuture<Void> put(Put put) { 114 return wrap(rawTable.put(put)); 115 } 116 117 @Override 118 public CompletableFuture<Void> delete(Delete delete) { 119 return wrap(rawTable.delete(delete)); 120 } 121 122 @Override 123 public CompletableFuture<Result> append(Append append) { 124 return wrap(rawTable.append(append)); 125 } 126 127 @Override 128 public CompletableFuture<Result> increment(Increment increment) { 129 return wrap(rawTable.increment(increment)); 130 } 131 132 @Override 133 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 134 return new CheckAndMutateBuilder() { 135 136 private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family); 137 138 @Override 139 public CompletableFuture<Boolean> thenPut(Put put) { 140 return wrap(builder.thenPut(put)); 141 } 142 143 @Override 144 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 145 return wrap(builder.thenMutate(mutation)); 146 } 147 148 @Override 149 public CompletableFuture<Boolean> thenDelete(Delete delete) { 150 return wrap(builder.thenDelete(delete)); 151 } 152 153 @Override 154 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 155 builder.qualifier(qualifier); 156 return this; 157 } 158 159 @Override 160 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 161 builder.timeRange(timeRange); 162 return this; 163 } 164 165 @Override 166 public CheckAndMutateBuilder ifNotExists() { 167 builder.ifNotExists(); 168 return this; 169 } 170 171 @Override 172 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 173 builder.ifMatches(compareOp, value); 174 return this; 175 } 176 }; 177 } 178 179 @Override 180 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 181 return new CheckAndMutateWithFilterBuilder() { 182 183 private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter); 184 185 @Override 186 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 187 builder.timeRange(timeRange); 188 return this; 189 } 190 191 @Override 192 public CompletableFuture<Boolean> thenPut(Put put) { 193 return wrap(builder.thenPut(put)); 194 } 195 196 @Override 197 public CompletableFuture<Boolean> thenDelete(Delete delete) { 198 return wrap(builder.thenDelete(delete)); 199 } 200 201 @Override 202 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 203 return wrap(builder.thenMutate(mutation)); 204 } 205 }; 206 } 207 208 @Override 209 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 210 return wrap(rawTable.checkAndMutate(checkAndMutate)); 211 } 212 213 @Override 214 public List<CompletableFuture<CheckAndMutateResult>> 215 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 216 return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList()); 217 } 218 219 @Override 220 public CompletableFuture<Result> mutateRow(RowMutations mutation) { 221 return wrap(rawTable.mutateRow(mutation)); 222 } 223 224 @Override 225 public CompletableFuture<List<Result>> scanAll(Scan scan) { 226 return wrap(rawTable.scanAll(scan)); 227 } 228 229 @Override 230 public ResultScanner getScanner(Scan scan) { 231 return rawTable.getScanner(scan); 232 } 233 234 private void scan0(Scan scan, ScanResultConsumer consumer) { 235 Span span = null; 236 try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) { 237 span = scanner.getSpan(); 238 try (Scope ignored = span.makeCurrent()) { 239 consumer.onScanMetricsCreated(scanner.getScanMetrics()); 240 for (Result result; (result = scanner.next()) != null;) { 241 if (!consumer.onNext(result)) { 242 break; 243 } 244 } 245 consumer.onComplete(); 246 } 247 } catch (IOException e) { 248 try (Scope ignored = span.makeCurrent()) { 249 consumer.onError(e); 250 } 251 } 252 } 253 254 @Override 255 public void scan(Scan scan, ScanResultConsumer consumer) { 256 final Context context = Context.current(); 257 pool.execute(context.wrap(() -> scan0(scan, consumer))); 258 } 259 260 @Override 261 public List<CompletableFuture<Result>> get(List<Get> gets) { 262 return rawTable.get(gets).stream().map(this::wrap).collect(toList()); 263 } 264 265 @Override 266 public List<CompletableFuture<Void>> put(List<Put> puts) { 267 return rawTable.put(puts).stream().map(this::wrap).collect(toList()); 268 } 269 270 @Override 271 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 272 return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); 273 } 274 275 @Override 276 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 277 return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); 278 } 279 280 @Override 281 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 282 ServiceCaller<S, R> callable, byte[] row) { 283 return wrap(rawTable.coprocessorService(stubMaker, callable, row)); 284 } 285 286 @Override 287 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 288 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 289 CoprocessorCallback<R> callback) { 290 final Context context = Context.current(); 291 CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { 292 293 @Override 294 public void onRegionComplete(RegionInfo region, R resp) { 295 pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp))); 296 } 297 298 @Override 299 public void onRegionError(RegionInfo region, Throwable error) { 300 pool.execute(context.wrap(() -> callback.onRegionError(region, error))); 301 } 302 303 @Override 304 public void onComplete() { 305 pool.execute(context.wrap(callback::onComplete)); 306 } 307 308 @Override 309 public void onError(Throwable error) { 310 pool.execute(context.wrap(() -> callback.onError(error))); 311 } 312 }; 313 CoprocessorServiceBuilder<S, R> builder = 314 rawTable.coprocessorService(stubMaker, callable, wrappedCallback); 315 return new CoprocessorServiceBuilder<S, R>() { 316 317 @Override 318 public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) { 319 builder.fromRow(startKey, inclusive); 320 return this; 321 } 322 323 @Override 324 public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) { 325 builder.toRow(endKey, inclusive); 326 return this; 327 } 328 329 @Override 330 public void execute() { 331 builder.execute(); 332 } 333 }; 334 } 335}