001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021 022import com.google.protobuf.RpcChannel; 023import java.io.IOException; 024import java.util.List; 025import java.util.concurrent.CompletableFuture; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.TimeUnit; 028import java.util.function.Function; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CompareOperator; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.io.TimeRange; 033import org.apache.hadoop.hbase.util.FutureUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035 036/** 037 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a 038 * thread pool when constructing this class, and the callback methods registered to the returned 039 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users 040 * to do anything they want in the callbacks without breaking the rpc framework. 041 */ 042@InterfaceAudience.Private 043class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { 044 045 private final AsyncTable<AdvancedScanResultConsumer> rawTable; 046 047 private final ExecutorService pool; 048 049 AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable, 050 ExecutorService pool) { 051 this.rawTable = rawTable; 052 this.pool = pool; 053 } 054 055 @Override 056 public TableName getName() { 057 return rawTable.getName(); 058 } 059 060 @Override 061 public Configuration getConfiguration() { 062 return rawTable.getConfiguration(); 063 } 064 065 @Override 066 public long getRpcTimeout(TimeUnit unit) { 067 return rawTable.getRpcTimeout(unit); 068 } 069 070 @Override 071 public long getReadRpcTimeout(TimeUnit unit) { 072 return rawTable.getReadRpcTimeout(unit); 073 } 074 075 @Override 076 public long getWriteRpcTimeout(TimeUnit unit) { 077 return rawTable.getWriteRpcTimeout(unit); 078 } 079 080 @Override 081 public long getOperationTimeout(TimeUnit unit) { 082 return rawTable.getOperationTimeout(unit); 083 } 084 085 @Override 086 public long getScanTimeout(TimeUnit unit) { 087 return rawTable.getScanTimeout(unit); 088 } 089 090 private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { 091 return FutureUtils.wrapFuture(future, pool); 092 } 093 094 @Override 095 public CompletableFuture<Result> get(Get get) { 096 return wrap(rawTable.get(get)); 097 } 098 099 @Override 100 public CompletableFuture<Void> put(Put put) { 101 return wrap(rawTable.put(put)); 102 } 103 104 @Override 105 public CompletableFuture<Void> delete(Delete delete) { 106 return wrap(rawTable.delete(delete)); 107 } 108 109 @Override 110 public CompletableFuture<Result> append(Append append) { 111 return wrap(rawTable.append(append)); 112 } 113 114 @Override 115 public CompletableFuture<Result> increment(Increment increment) { 116 return wrap(rawTable.increment(increment)); 117 } 118 119 @Override 120 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 121 return new CheckAndMutateBuilder() { 122 123 private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family); 124 125 @Override 126 public CompletableFuture<Boolean> thenPut(Put put) { 127 return wrap(builder.thenPut(put)); 128 } 129 130 @Override 131 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 132 return wrap(builder.thenMutate(mutation)); 133 } 134 135 @Override 136 public CompletableFuture<Boolean> thenDelete(Delete delete) { 137 return wrap(builder.thenDelete(delete)); 138 } 139 140 @Override 141 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 142 builder.qualifier(qualifier); 143 return this; 144 } 145 146 @Override 147 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 148 builder.timeRange(timeRange); 149 return this; 150 } 151 152 @Override 153 public CheckAndMutateBuilder ifNotExists() { 154 builder.ifNotExists(); 155 return this; 156 } 157 158 @Override 159 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 160 builder.ifMatches(compareOp, value); 161 return this; 162 } 163 }; 164 } 165 166 @Override 167 public CompletableFuture<Void> mutateRow(RowMutations mutation) { 168 return wrap(rawTable.mutateRow(mutation)); 169 } 170 171 @Override 172 public CompletableFuture<List<Result>> scanAll(Scan scan) { 173 return wrap(rawTable.scanAll(scan)); 174 } 175 176 @Override 177 public ResultScanner getScanner(Scan scan) { 178 return rawTable.getScanner(scan); 179 } 180 181 private void scan0(Scan scan, ScanResultConsumer consumer) { 182 try (ResultScanner scanner = getScanner(scan)) { 183 consumer.onScanMetricsCreated(scanner.getScanMetrics()); 184 for (Result result; (result = scanner.next()) != null;) { 185 if (!consumer.onNext(result)) { 186 break; 187 } 188 } 189 consumer.onComplete(); 190 } catch (IOException e) { 191 consumer.onError(e); 192 } 193 } 194 195 @Override 196 public void scan(Scan scan, ScanResultConsumer consumer) { 197 pool.execute(() -> scan0(scan, consumer)); 198 } 199 200 @Override 201 public List<CompletableFuture<Result>> get(List<Get> gets) { 202 return rawTable.get(gets).stream().map(this::wrap).collect(toList()); 203 } 204 205 @Override 206 public List<CompletableFuture<Void>> put(List<Put> puts) { 207 return rawTable.put(puts).stream().map(this::wrap).collect(toList()); 208 } 209 210 @Override 211 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 212 return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); 213 } 214 215 @Override 216 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 217 return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); 218 } 219 220 @Override 221 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 222 ServiceCaller<S, R> callable, byte[] row) { 223 return wrap(rawTable.coprocessorService(stubMaker, callable, row)); 224 } 225 226 @Override 227 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 228 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 229 CoprocessorCallback<R> callback) { 230 CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { 231 232 @Override 233 public void onRegionComplete(RegionInfo region, R resp) { 234 pool.execute(() -> callback.onRegionComplete(region, resp)); 235 } 236 237 @Override 238 public void onRegionError(RegionInfo region, Throwable error) { 239 pool.execute(() -> callback.onRegionError(region, error)); 240 } 241 242 @Override 243 public void onComplete() { 244 pool.execute(() -> callback.onComplete()); 245 } 246 247 @Override 248 public void onError(Throwable error) { 249 pool.execute(() -> callback.onError(error)); 250 } 251 }; 252 CoprocessorServiceBuilder<S, R> builder = 253 rawTable.coprocessorService(stubMaker, callable, wrappedCallback); 254 return new CoprocessorServiceBuilder<S, R>() { 255 256 @Override 257 public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) { 258 builder.fromRow(startKey, inclusive); 259 return this; 260 } 261 262 @Override 263 public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) { 264 builder.toRow(endKey, inclusive); 265 return this; 266 } 267 268 @Override 269 public void execute() { 270 builder.execute(); 271 } 272 }; 273 } 274}