/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.thrift2.client;

import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
import org.apache.hadoop.hbase.thrift2.generated.TAppend;
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
import org.apache.hadoop.hbase.thrift2.generated.TGet;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
import org.apache.hadoop.hbase.thrift2.generated.TPut;
import org.apache.hadoop.hbase.thrift2.generated.TResult;
import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
import org.apache.hadoop.hbase.thrift2.generated.TScan;
import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;

/**
 * A {@link Table} implementation that forwards every supported operation to a thrift2 server
 * through a {@link THBaseService.Client}. Any {@link TException} raised by the thrift client is
 * rethrown wrapped in an {@link IOException}. Operations that have no thrift counterpart here
 * throw {@link IOException}, {@link NotImplementedException} or {@link RuntimeException}, as
 * noted on the individual methods.
 */
@InterfaceAudience.Private
public class ThriftTable implements Table {

  private final TableName tableName;
  private final Configuration conf;
  private final TTransport tTransport;
  private final THBaseService.Client client;
  /** The table name as a buffer, which is the form the thrift client calls take. */
  private final ByteBuffer tableNameInBytes;
  /** Operation timeout in milliseconds, read from {@code conf} at construction time. */
  private final int operationTimeout;

  /** Caching applied to scans that do not specify a positive caching value themselves. */
  private final int scannerCaching;

  /**
   * Creates a table backed by the given thrift client.
   * @param tableName  name of the table all operations are issued against
   * @param client     thrift client used for every remote call
   * @param tTransport transport that is closed by {@link #close()}
   * @param conf       configuration supplying the scanner caching and operation timeout
   */
  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
    Configuration conf) {
    this.tableName = tableName;
    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
    this.conf = conf;
    this.tTransport = tTransport;
    this.client = client;
    this.scannerCaching =
      conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING, HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
  }

  @Override
  public TableName getName() {
    return tableName;
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  /**
   * Fetches the table descriptor from the thrift server and converts it to the HBase type.
   * @throws IOException if the thrift call fails
   */
  @Override
  public TableDescriptor getDescriptor() throws IOException {
    try {
      TTableDescriptor tableDescriptor =
        client.getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Asks the thrift server whether the given {@link Get} matches anything.
   * @throws IOException if the thrift call fails
   */
  @Override
  public boolean exists(Get get) throws IOException {
    TGet tGet = ThriftUtilities.getFromHBase(get);
    try {
      return client.exists(tableNameInBytes, tGet);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Batched variant of {@link #exists(Get)}; the returned array holds the server's answers in
   * the order the thrift client returned them.
   * @throws IOException if the thrift call fails
   */
  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    List<TGet> tGets = new ArrayList<>(gets.size());
    for (Get get : gets) {
      tGets.add(ThriftUtilities.getFromHBase(get));
    }
    try {
      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
      return Booleans.toArray(results);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Not supported.
   * @throws IOException always
   */
  @Override
  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
      + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
  }

  /**
   * Not supported.
   * @throws IOException always
   */
  @Override
  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException {
    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
      + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
  }

  /**
   * Executes a single get through the thrift client.
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result get(Get get) throws IOException {
    TGet tGet = ThriftUtilities.getFromHBase(get);
    try {
      TResult tResult = client.get(tableNameInBytes, tGet);
      return ThriftUtilities.resultFromThrift(tResult);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Executes several gets in one thrift call.
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result[] get(List<Get> gets) throws IOException {
    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
    try {
      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
      return ThriftUtilities.resultsFromThrift(results);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * A scanner that pages through the table using repeated {@code getScannerResults} calls. After
   * each page the start row of the scan is moved based on the last row handed to the cache, so
   * every call is an independent request.
   */
  private class Scanner implements ResultScanner {
    protected TScan scan;
    protected Result lastResult = null;
    protected final Queue<Result> cache = new ArrayDeque<>();

    /**
     * Prepares the thrift scan. Note that the caching of the passed {@link Scan} may be adjusted
     * in place before it is converted.
     * @throws IOException if the scan requests batching, which is not supported
     */
    public Scanner(Scan scan) throws IOException {
      if (scan.getBatch() > 0) {
        throw new IOException("Batch is not supported in Scanner");
      }
      if (scan.getCaching() <= 0) {
        scan.setCaching(scannerCaching);
      }
      // For a reverse scan every follow-up page starts with the last row that was already
      // returned, and that row is dropped in next(). A page size of 1 would therefore never make
      // progress, so we need a caching value bigger than 1. This is checked after the default was
      // applied so that a configured default of 1 (or less) is covered as well.
      if (scan.isReversed() && scan.getCaching() <= 1) {
        scan.setCaching(2);
      }
      this.scan = ThriftUtilities.scanFromHBase(scan);
    }

    /**
     * Returns the next result, fetching another page from the thrift server when the local cache
     * is empty.
     * @return the next result, or {@code null} when the fetched page added nothing to the cache
     * @throws IOException if the thrift call fails
     */
    @Override
    public Result next() throws IOException {
      if (cache.isEmpty()) {
        setupNextScanner();
        try {
          List<TResult> tResults =
            client.getScannerResults(tableNameInBytes, scan, scan.getCaching());
          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
          boolean firstKey = true;
          for (Result result : results) {
            // If it is a reverse scan, we use the last result's key as the start key, since there
            // is no way to construct the closest row key smaller than the last result.
            // So when the results return, we must rule out the first result, since it has already
            // been returned to the user.
            if (firstKey) {
              firstKey = false;
              if (
                scan.isReversed() && lastResult != null
                  && Bytes.equals(lastResult.getRow(), result.getRow())
              ) {
                continue;
              }
            }
            cache.add(result);
            lastResult = result;
          }
        } catch (TException e) {
          throw new IOException(e);
        }
      }

      // poll() yields null on an empty queue, which signals that the scan is finished
      return cache.poll();
    }

    @Override
    public void close() {
      // nothing to release here: this scanner keeps no handle between next() calls
    }

    /**
     * Not supported.
     * @throws RuntimeException always
     */
    @Override
    public boolean renewLease() {
      throw new RuntimeException("renewLease() not supported");
    }

    /**
     * Not supported.
     * @throws RuntimeException always
     */
    @Override
    public ScanMetrics getScanMetrics() {
      throw new RuntimeException("getScanMetrics() not supported");
    }

    /** Moves the start row of the scan so that the next page continues after the last one. */
    private void setupNextScanner() {
      // if lastResult is not null, this is not the first page of the scan
      if (lastResult != null) {
        byte[] lastRow = lastResult.getRow();
        if (scan.isReversed()) {
          // for a reverse scan we can't construct the closest row before this row, so start at
          // the row itself; next() drops the duplicate
          scan.setStartRow(lastRow);
        } else {
          scan.setStartRow(createClosestRowAfter(lastRow));
        }
      }
    }

    /**
     * Create the closest row after the specified row, i.e. the row with a single zero byte
     * appended.
     * @throws RuntimeException if {@code row} is null
     */
    protected byte[] createClosestRowAfter(byte[] row) {
      if (row == null) {
        throw new RuntimeException("The passed row is null");
      }
      // copyOf pads the extra trailing position with a zero byte
      return Arrays.copyOf(row, row.length + 1);
    }
  }

  @Override
  public ResultScanner getScanner(Scan scan) throws IOException {
    return new Scanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family) throws IOException {
    Scan scan = new Scan();
    scan.addFamily(family);
    return getScanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);
    return getScanner(scan);
  }

  /**
   * Sends a single put through the thrift client.
   * @throws IOException if the thrift call fails
   */
  @Override
  public void put(Put put) throws IOException {
    TPut tPut = ThriftUtilities.putFromHBase(put);
    try {
      client.put(tableNameInBytes, tPut);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Sends several puts in one thrift call.
   * @throws IOException if the thrift call fails
   */
  @Override
  public void put(List<Put> puts) throws IOException {
    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
    try {
      client.putMultiple(tableNameInBytes, tPuts);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Sends a single delete through the thrift client.
   * @throws IOException if the thrift call fails
   */
  @Override
  public void delete(Delete delete) throws IOException {
    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
    try {
      client.deleteSingle(tableNameInBytes, tDelete);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Sends several deletes in one thrift call.
   * @throws IOException if the thrift call fails
   */
  @Override
  public void delete(List<Delete> deletes) throws IOException {
    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
    try {
      client.deleteMultiple(tableNameInBytes, tDeletes);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Builder that collects the condition of a check-and-mutate request and then executes it via
   * {@link ThriftTable#checkAndMutate(byte[], byte[], byte[], CompareOperator, byte[],
   * RowMutations)}.
   */
  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {

    private final byte[] row;
    private final byte[] family;
    private byte[] qualifier;
    private CompareOperator op;
    private byte[] value;

    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
      this.row = Preconditions.checkNotNull(row, "row is null");
      this.family = Preconditions.checkNotNull(family, "family is null");
    }

    @Override
    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
        + " an empty byte array, or just do not call this method if you want a null qualifier");
      return this;
    }

    /**
     * Not supported.
     * @throws NotImplementedException always
     */
    @Override
    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
      throw new NotImplementedException("timeRange not supported in ThriftTable");
    }

    @Override
    public CheckAndMutateBuilder ifNotExists() {
      // expressed as an EQUAL comparison against a null value
      this.op = CompareOperator.EQUAL;
      this.value = null;
      return this;
    }

    @Override
    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
      this.value = Preconditions.checkNotNull(value, "value is null");
      return this;
    }

    /** Verifies that a condition was configured before the request is executed. */
    private void preCheck() {
      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
    }

    @Override
    public boolean thenPut(Put put) throws IOException {
      preCheck();
      RowMutations rowMutations = new RowMutations(put.getRow());
      rowMutations.add(put);
      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
    }

    @Override
    public boolean thenDelete(Delete delete) throws IOException {
      preCheck();
      RowMutations rowMutations = new RowMutations(delete.getRow());
      rowMutations.add(delete);
      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
    }

    @Override
    public boolean thenMutate(RowMutations mutation) throws IOException {
      preCheck();
      return checkAndMutate(row, family, qualifier, op, value, mutation);
    }
  }

  /**
   * Executes a check-and-mutate request through the thrift client.
   * @param row       row to check
   * @param family    column family to check
   * @param qualifier column qualifier to check; {@code null} is sent as an empty qualifier
   * @param op        comparison operator
   * @param value     expected value; {@code null} is passed to the thrift client as null
   * @param mutation  mutations to apply when the check passes
   * @return the boolean returned by the thrift client
   * @throws IOException if the thrift call fails
   */
  private boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
    byte[] value, RowMutations mutation) throws IOException {
    try {
      ByteBuffer valueBuffer = value == null ? null : ByteBuffer.wrap(value);
      // The builder leaves the qualifier null unless qualifier(byte[]) was called, and
      // ByteBuffer.wrap(null) would throw a NullPointerException, so fall back to an empty one.
      ByteBuffer qualifierBuffer =
        ByteBuffer.wrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier);
      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
        qualifierBuffer, ThriftUtilities.compareOpFromHBase(op), valueBuffer,
        ThriftUtilities.rowMutationsFromHBase(mutation));
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    return new CheckAndMutateBuilderImpl(row, family);
  }

  /**
   * Not implemented yet.
   * @throws NotImplementedException always
   */
  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    throw new NotImplementedException("Implement later");
  }

  /**
   * Not implemented yet.
   * @throws NotImplementedException always
   */
  @Override
  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) {
    throw new NotImplementedException("Implement later");
  }

  /**
   * Not implemented yet.
   * @throws NotImplementedException always
   */
  @Override
  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
    throw new NotImplementedException("Implement later");
  }

  /**
   * Applies the row mutations through the thrift client.
   * @return always {@link Result#EMPTY_RESULT}; nothing returned by the server is propagated
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result mutateRow(RowMutations rm) throws IOException {
    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
    try {
      client.mutateRow(tableNameInBytes, tRowMutations);
      return Result.EMPTY_RESULT;
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Executes an append through the thrift client.
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result append(Append append) throws IOException {
    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
    try {
      TResult tResult = client.append(tableNameInBytes, tAppend);
      return ThriftUtilities.resultFromThrift(tResult);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Executes an increment through the thrift client.
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result increment(Increment increment) throws IOException {
    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
    try {
      TResult tResult = client.increment(tableNameInBytes, tIncrement);
      return ThriftUtilities.resultFromThrift(tResult);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /** Closes the underlying thrift transport. */
  @Override
  public void close() throws IOException {
    tTransport.close();
  }

  // All four timeout getters report the single configured operation timeout.

  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  /**
   * Not supported.
   * @throws NotImplementedException always
   */
  @Override
  public CoprocessorRpcChannel coprocessorService(byte[] row) {
    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
  }

  /**
   * Not supported.
   * @throws NotImplementedException always
   */
  @Override
  public RegionLocator getRegionLocator() throws IOException {
    throw new NotImplementedException("getRegionLocator not supported in ThriftTable");
  }
}