001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021 022import com.google.protobuf.RpcChannel; 023import java.io.IOException; 024import java.util.List; 025import java.util.concurrent.CompletableFuture; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.TimeUnit; 028import java.util.function.Function; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CompareOperator; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.filter.Filter; 033import org.apache.hadoop.hbase.io.TimeRange; 034import org.apache.hadoop.hbase.util.FutureUtils; 035import org.apache.yetus.audience.InterfaceAudience; 036 037/** 038 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a 039 * thread pool when constructing this class, and the callback methods registered to the returned 040 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users 041 * to do anything they want in the callbacks without breaking the rpc framework. 042 */ 043@InterfaceAudience.Private 044class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { 045 046 private final AsyncTable<AdvancedScanResultConsumer> rawTable; 047 048 private final ExecutorService pool; 049 050 AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable, 051 ExecutorService pool) { 052 this.rawTable = rawTable; 053 this.pool = pool; 054 } 055 056 @Override 057 public TableName getName() { 058 return rawTable.getName(); 059 } 060 061 @Override 062 public Configuration getConfiguration() { 063 return rawTable.getConfiguration(); 064 } 065 066 @Override 067 public CompletableFuture<TableDescriptor> getDescriptor() { 068 return wrap(rawTable.getDescriptor()); 069 } 070 071 @Override 072 public AsyncTableRegionLocator getRegionLocator() { 073 return rawTable.getRegionLocator(); 074 } 075 076 @Override 077 public long getRpcTimeout(TimeUnit unit) { 078 return rawTable.getRpcTimeout(unit); 079 } 080 081 @Override 082 public long getReadRpcTimeout(TimeUnit unit) { 083 return rawTable.getReadRpcTimeout(unit); 084 } 085 086 @Override 087 public long getWriteRpcTimeout(TimeUnit unit) { 088 return rawTable.getWriteRpcTimeout(unit); 089 } 090 091 @Override 092 public long getOperationTimeout(TimeUnit unit) { 093 return rawTable.getOperationTimeout(unit); 094 } 095 096 @Override 097 public long getScanTimeout(TimeUnit unit) { 098 return rawTable.getScanTimeout(unit); 099 } 100 101 private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { 102 return FutureUtils.wrapFuture(future, pool); 103 } 104 105 @Override 106 public CompletableFuture<Result> get(Get get) { 107 return wrap(rawTable.get(get)); 108 } 109 110 @Override 111 public CompletableFuture<Void> put(Put put) { 112 return wrap(rawTable.put(put)); 113 } 114 115 @Override 116 public CompletableFuture<Void> delete(Delete delete) { 117 return wrap(rawTable.delete(delete)); 118 } 119 120 @Override 121 public CompletableFuture<Result> append(Append append) { 122 return wrap(rawTable.append(append)); 123 } 124 125 @Override 126 public CompletableFuture<Result> increment(Increment increment) { 127 return wrap(rawTable.increment(increment)); 128 } 129 130 @Override 131 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 132 return new CheckAndMutateBuilder() { 133 134 private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family); 135 136 @Override 137 public CompletableFuture<Boolean> thenPut(Put put) { 138 return wrap(builder.thenPut(put)); 139 } 140 141 @Override 142 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 143 return wrap(builder.thenMutate(mutation)); 144 } 145 146 @Override 147 public CompletableFuture<Boolean> thenDelete(Delete delete) { 148 return wrap(builder.thenDelete(delete)); 149 } 150 151 @Override 152 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 153 builder.qualifier(qualifier); 154 return this; 155 } 156 157 @Override 158 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 159 builder.timeRange(timeRange); 160 return this; 161 } 162 163 @Override 164 public CheckAndMutateBuilder ifNotExists() { 165 builder.ifNotExists(); 166 return this; 167 } 168 169 @Override 170 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 171 builder.ifMatches(compareOp, value); 172 return this; 173 } 174 }; 175 } 176 177 @Override 178 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 179 return new CheckAndMutateWithFilterBuilder() { 180 181 private final CheckAndMutateWithFilterBuilder builder = 182 rawTable.checkAndMutate(row, filter); 183 184 @Override 185 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 186 builder.timeRange(timeRange); 187 return this; 188 } 189 190 @Override 191 public CompletableFuture<Boolean> thenPut(Put put) { 192 return wrap(builder.thenPut(put)); 193 } 194 195 @Override 196 public CompletableFuture<Boolean> thenDelete(Delete delete) { 197 return wrap(builder.thenDelete(delete)); 198 } 199 200 @Override 201 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 202 return wrap(builder.thenMutate(mutation)); 203 } 204 }; 205 } 206 207 @Override 208 public CompletableFuture<Void> mutateRow(RowMutations mutation) { 209 return wrap(rawTable.mutateRow(mutation)); 210 } 211 212 @Override 213 public CompletableFuture<List<Result>> scanAll(Scan scan) { 214 return wrap(rawTable.scanAll(scan)); 215 } 216 217 @Override 218 public ResultScanner getScanner(Scan scan) { 219 return rawTable.getScanner(scan); 220 } 221 222 private void scan0(Scan scan, ScanResultConsumer consumer) { 223 try (ResultScanner scanner = getScanner(scan)) { 224 consumer.onScanMetricsCreated(scanner.getScanMetrics()); 225 for (Result result; (result = scanner.next()) != null;) { 226 if (!consumer.onNext(result)) { 227 break; 228 } 229 } 230 consumer.onComplete(); 231 } catch (IOException e) { 232 consumer.onError(e); 233 } 234 } 235 236 @Override 237 public void scan(Scan scan, ScanResultConsumer consumer) { 238 pool.execute(() -> scan0(scan, consumer)); 239 } 240 241 @Override 242 public List<CompletableFuture<Result>> get(List<Get> gets) { 243 return rawTable.get(gets).stream().map(this::wrap).collect(toList()); 244 } 245 246 @Override 247 public List<CompletableFuture<Void>> put(List<Put> puts) { 248 return rawTable.put(puts).stream().map(this::wrap).collect(toList()); 249 } 250 251 @Override 252 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 253 return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); 254 } 255 256 @Override 257 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 258 return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); 259 } 260 261 @Override 262 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 263 ServiceCaller<S, R> callable, byte[] row) { 264 return wrap(rawTable.coprocessorService(stubMaker, callable, row)); 265 } 266 267 @Override 268 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 269 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 270 CoprocessorCallback<R> callback) { 271 CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { 272 273 @Override 274 public void onRegionComplete(RegionInfo region, R resp) { 275 pool.execute(() -> callback.onRegionComplete(region, resp)); 276 } 277 278 @Override 279 public void onRegionError(RegionInfo region, Throwable error) { 280 pool.execute(() -> callback.onRegionError(region, error)); 281 } 282 283 @Override 284 public void onComplete() { 285 pool.execute(() -> callback.onComplete()); 286 } 287 288 @Override 289 public void onError(Throwable error) { 290 pool.execute(() -> callback.onError(error)); 291 } 292 }; 293 CoprocessorServiceBuilder<S, R> builder = 294 rawTable.coprocessorService(stubMaker, callable, wrappedCallback); 295 return new CoprocessorServiceBuilder<S, R>() { 296 297 @Override 298 public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) { 299 builder.fromRow(startKey, inclusive); 300 return this; 301 } 302 303 @Override 304 public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) { 305 builder.toRow(endKey, inclusive); 306 return this; 307 } 308 309 @Override 310 public void execute() { 311 builder.execute(); 312 } 313 }; 314 } 315}