001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021 022import com.google.protobuf.RpcChannel; 023import java.io.IOException; 024import java.util.List; 025import java.util.concurrent.CompletableFuture; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.TimeUnit; 028import java.util.function.Function; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CompareOperator; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.filter.Filter; 033import org.apache.hadoop.hbase.io.TimeRange; 034import org.apache.hadoop.hbase.util.FutureUtils; 035import org.apache.yetus.audience.InterfaceAudience; 036 037/** 038 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a 039 * thread pool when constructing this class, and the callback methods registered to the returned 040 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users 041 * to do anything they want in the callbacks without breaking the rpc framework. 042 */ 043@InterfaceAudience.Private 044class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { 045 046 private final AsyncTable<AdvancedScanResultConsumer> rawTable; 047 048 private final ExecutorService pool; 049 050 AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable, 051 ExecutorService pool) { 052 this.rawTable = rawTable; 053 this.pool = pool; 054 } 055 056 @Override 057 public TableName getName() { 058 return rawTable.getName(); 059 } 060 061 @Override 062 public Configuration getConfiguration() { 063 return rawTable.getConfiguration(); 064 } 065 066 @Override 067 public CompletableFuture<TableDescriptor> getDescriptor() { 068 return wrap(rawTable.getDescriptor()); 069 } 070 071 @Override 072 public AsyncTableRegionLocator getRegionLocator() { 073 return rawTable.getRegionLocator(); 074 } 075 076 @Override 077 public long getRpcTimeout(TimeUnit unit) { 078 return rawTable.getRpcTimeout(unit); 079 } 080 081 @Override 082 public long getReadRpcTimeout(TimeUnit unit) { 083 return rawTable.getReadRpcTimeout(unit); 084 } 085 086 @Override 087 public long getWriteRpcTimeout(TimeUnit unit) { 088 return rawTable.getWriteRpcTimeout(unit); 089 } 090 091 @Override 092 public long getOperationTimeout(TimeUnit unit) { 093 return rawTable.getOperationTimeout(unit); 094 } 095 096 @Override 097 public long getScanTimeout(TimeUnit unit) { 098 return rawTable.getScanTimeout(unit); 099 } 100 101 private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { 102 return FutureUtils.wrapFuture(future, pool); 103 } 104 105 @Override 106 public CompletableFuture<Result> get(Get get) { 107 return wrap(rawTable.get(get)); 108 } 109 110 @Override 111 public CompletableFuture<Void> put(Put put) { 112 return wrap(rawTable.put(put)); 113 } 114 115 @Override 116 public CompletableFuture<Void> delete(Delete delete) { 117 return wrap(rawTable.delete(delete)); 118 } 119 120 @Override 121 public CompletableFuture<Result> append(Append append) { 122 return wrap(rawTable.append(append)); 123 } 124 125 @Override 126 public CompletableFuture<Result> increment(Increment increment) { 127 return wrap(rawTable.increment(increment)); 128 } 129 130 @Override 131 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 132 return new CheckAndMutateBuilder() { 133 134 private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family); 135 136 @Override 137 public CompletableFuture<Boolean> thenPut(Put put) { 138 return wrap(builder.thenPut(put)); 139 } 140 141 @Override 142 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 143 return wrap(builder.thenMutate(mutation)); 144 } 145 146 @Override 147 public CompletableFuture<Boolean> thenDelete(Delete delete) { 148 return wrap(builder.thenDelete(delete)); 149 } 150 151 @Override 152 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 153 builder.qualifier(qualifier); 154 return this; 155 } 156 157 @Override 158 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 159 builder.timeRange(timeRange); 160 return this; 161 } 162 163 @Override 164 public CheckAndMutateBuilder ifNotExists() { 165 builder.ifNotExists(); 166 return this; 167 } 168 169 @Override 170 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 171 builder.ifMatches(compareOp, value); 172 return this; 173 } 174 }; 175 } 176 177 @Override 178 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 179 return new CheckAndMutateWithFilterBuilder() { 180 181 private final CheckAndMutateWithFilterBuilder builder = 182 rawTable.checkAndMutate(row, filter); 183 184 @Override 185 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 186 builder.timeRange(timeRange); 187 return this; 188 } 189 190 @Override 191 public CompletableFuture<Boolean> thenPut(Put put) { 192 return wrap(builder.thenPut(put)); 193 } 194 195 @Override 196 public CompletableFuture<Boolean> thenDelete(Delete delete) { 197 return wrap(builder.thenDelete(delete)); 198 } 199 200 @Override 201 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 202 return wrap(builder.thenMutate(mutation)); 203 } 204 }; 205 } 206 207 @Override 208 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 209 return wrap(rawTable.checkAndMutate(checkAndMutate)); 210 } 211 212 @Override 213 public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate( 214 List<CheckAndMutate> checkAndMutates) { 215 return rawTable.checkAndMutate(checkAndMutates).stream() 216 .map(this::wrap).collect(toList()); 217 } 218 219 @Override 220 public CompletableFuture<Result> mutateRow(RowMutations mutation) { 221 return wrap(rawTable.mutateRow(mutation)); 222 } 223 224 @Override 225 public CompletableFuture<List<Result>> scanAll(Scan scan) { 226 return wrap(rawTable.scanAll(scan)); 227 } 228 229 @Override 230 public ResultScanner getScanner(Scan scan) { 231 return rawTable.getScanner(scan); 232 } 233 234 private void scan0(Scan scan, ScanResultConsumer consumer) { 235 try (ResultScanner scanner = getScanner(scan)) { 236 consumer.onScanMetricsCreated(scanner.getScanMetrics()); 237 for (Result result; (result = scanner.next()) != null;) { 238 if (!consumer.onNext(result)) { 239 break; 240 } 241 } 242 consumer.onComplete(); 243 } catch (IOException e) { 244 consumer.onError(e); 245 } 246 } 247 248 @Override 249 public void scan(Scan scan, ScanResultConsumer consumer) { 250 pool.execute(() -> scan0(scan, consumer)); 251 } 252 253 @Override 254 public List<CompletableFuture<Result>> get(List<Get> gets) { 255 return rawTable.get(gets).stream().map(this::wrap).collect(toList()); 256 } 257 258 @Override 259 public List<CompletableFuture<Void>> put(List<Put> puts) { 260 return rawTable.put(puts).stream().map(this::wrap).collect(toList()); 261 } 262 263 @Override 264 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 265 return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); 266 } 267 268 @Override 269 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 270 return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); 271 } 272 273 @Override 274 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 275 ServiceCaller<S, R> callable, byte[] row) { 276 return wrap(rawTable.coprocessorService(stubMaker, callable, row)); 277 } 278 279 @Override 280 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 281 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 282 CoprocessorCallback<R> callback) { 283 CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { 284 285 @Override 286 public void onRegionComplete(RegionInfo region, R resp) { 287 pool.execute(() -> callback.onRegionComplete(region, resp)); 288 } 289 290 @Override 291 public void onRegionError(RegionInfo region, Throwable error) { 292 pool.execute(() -> callback.onRegionError(region, error)); 293 } 294 295 @Override 296 public void onComplete() { 297 pool.execute(() -> callback.onComplete()); 298 } 299 300 @Override 301 public void onError(Throwable error) { 302 pool.execute(() -> callback.onError(error)); 303 } 304 }; 305 CoprocessorServiceBuilder<S, R> builder = 306 rawTable.coprocessorService(stubMaker, callable, wrappedCallback); 307 return new CoprocessorServiceBuilder<S, R>() { 308 309 @Override 310 public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) { 311 builder.fromRow(startKey, inclusive); 312 return this; 313 } 314 315 @Override 316 public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) { 317 builder.toRow(endKey, inclusive); 318 return this; 319 } 320 321 @Override 322 public void execute() { 323 builder.execute(); 324 } 325 }; 326 } 327}