001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.thrift2.client; 020 021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING; 022import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT; 023 024import java.io.IOException; 025import java.nio.ByteBuffer; 026import java.util.ArrayDeque; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Queue; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.commons.lang3.NotImplementedException; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.CompareOperator; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.CheckAndMutate; 040import org.apache.hadoop.hbase.client.CheckAndMutateResult; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Increment; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionLocator; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.ResultScanner; 048import org.apache.hadoop.hbase.client.Row; 049import org.apache.hadoop.hbase.client.RowMutations; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.client.coprocessor.Batch; 054import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 055import org.apache.hadoop.hbase.filter.Filter; 056import org.apache.hadoop.hbase.io.TimeRange; 057import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 058import org.apache.hadoop.hbase.thrift2.ThriftUtilities; 059import org.apache.hadoop.hbase.thrift2.generated.TAppend; 060import org.apache.hadoop.hbase.thrift2.generated.TDelete; 061import org.apache.hadoop.hbase.thrift2.generated.TGet; 062import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 063import org.apache.hadoop.hbase.thrift2.generated.TIncrement; 064import org.apache.hadoop.hbase.thrift2.generated.TPut; 065import org.apache.hadoop.hbase.thrift2.generated.TResult; 066import org.apache.hadoop.hbase.thrift2.generated.TRowMutations; 067import org.apache.hadoop.hbase.thrift2.generated.TScan; 068import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.thrift.TException; 071import org.apache.thrift.transport.TTransport; 072import org.apache.yetus.audience.InterfaceAudience; 073 074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 075import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans; 076 077@InterfaceAudience.Private 078public class ThriftTable implements Table { 079 080 private TableName tableName; 081 private Configuration conf; 082 private TTransport tTransport; 083 private THBaseService.Client client; 084 private ByteBuffer tableNameInBytes; 085 private int operationTimeout; 086 087 private final int scannerCaching; 088 089 public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport, 090 Configuration conf) { 091 this.tableName = tableName; 092 this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes()); 093 this.conf = conf; 094 this.tTransport = tTransport; 095 this.client = client; 096 this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING, 097 HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT); 098 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 099 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 100 101 102 } 103 104 @Override 105 public TableName getName() { 106 return tableName; 107 } 108 109 @Override 110 public Configuration getConfiguration() { 111 return conf; 112 } 113 114 @Override 115 public TableDescriptor getDescriptor() throws IOException { 116 try { 117 TTableDescriptor tableDescriptor = client 118 .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName)); 119 return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor); 120 } catch (TException e) { 121 throw new IOException(e); 122 } 123 } 124 125 @Override 126 public boolean exists(Get get) throws IOException { 127 TGet tGet = ThriftUtilities.getFromHBase(get); 128 try { 129 return client.exists(tableNameInBytes, tGet); 130 } catch (TException e) { 131 throw new IOException(e); 132 } 133 } 134 135 @Override 136 public boolean[] exists(List<Get> gets) throws IOException { 137 List<TGet> tGets = new ArrayList<>(); 138 for (Get get: gets) { 139 tGets.add(ThriftUtilities.getFromHBase(get)); 140 } 141 try { 142 List<Boolean> results = client.existsAll(tableNameInBytes, tGets); 143 return Booleans.toArray(results); 144 } catch (TException e) { 145 throw new IOException(e); 146 } 147 } 148 149 @Override 150 public void batch(List<? extends Row> actions, Object[] results) 151 throws IOException { 152 throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), " 153 + "get(List<Get> gets) or delete(List<Delete> deletes) respectively"); 154 155 156 } 157 158 @Override 159 public <R> void batchCallback(List<? extends Row> actions, Object[] results, 160 Batch.Callback<R> callback) throws IOException { 161 throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), " 162 + "get(List<Get> gets) or delete(List<Delete> deletes) respectively"); 163 } 164 165 @Override 166 public Result get(Get get) throws IOException { 167 TGet tGet = ThriftUtilities.getFromHBase(get); 168 try { 169 TResult tResult = client.get(tableNameInBytes, tGet); 170 return ThriftUtilities.resultFromThrift(tResult); 171 } catch (TException e) { 172 throw new IOException(e); 173 } 174 } 175 176 @Override 177 public Result[] get(List<Get> gets) throws IOException { 178 List<TGet> tGets = ThriftUtilities.getsFromHBase(gets); 179 try { 180 List<TResult> results = client.getMultiple(tableNameInBytes, tGets); 181 return ThriftUtilities.resultsFromThrift(results); 182 } catch (TException e) { 183 throw new IOException(e); 184 } 185 } 186 187 /** 188 * A scanner to perform scan from thrift server 189 * getScannerResults is used in this scanner 190 */ 191 private class Scanner implements ResultScanner { 192 protected TScan scan; 193 protected Result lastResult = null; 194 protected final Queue<Result> cache = new ArrayDeque<>();; 195 196 197 public Scanner(Scan scan) throws IOException { 198 if (scan.getBatch() > 0) { 199 throw new IOException("Batch is not supported in Scanner"); 200 } 201 if (scan.getCaching() <= 0) { 202 scan.setCaching(scannerCaching); 203 } else if (scan.getCaching() == 1 && scan.isReversed()){ 204 // for reverse scan, we need to pass the last row to the next scanner 205 // we need caching number bigger than 1 206 scan.setCaching(scan.getCaching() + 1); 207 } 208 this.scan = ThriftUtilities.scanFromHBase(scan); 209 } 210 211 212 @Override 213 public Result next() throws IOException { 214 if (cache.size() == 0) { 215 setupNextScanner(); 216 try { 217 List<TResult> tResults = client 218 .getScannerResults(tableNameInBytes, scan, scan.getCaching()); 219 Result[] results = ThriftUtilities.resultsFromThrift(tResults); 220 boolean firstKey = true; 221 for (Result result : results) { 222 // If it is a reverse scan, we use the last result's key as the startkey, since there is 223 // no way to construct a closet rowkey smaller than the last result 224 // So when the results return, we must rule out the first result, since it has already 225 // returned to user. 226 if (firstKey) { 227 firstKey = false; 228 if (scan.isReversed() && lastResult != null) { 229 if (Bytes.equals(lastResult.getRow(), result.getRow())) { 230 continue; 231 } 232 } 233 } 234 cache.add(result); 235 lastResult = result; 236 } 237 } catch (TException e) { 238 throw new IOException(e); 239 } 240 } 241 242 if (cache.size() > 0) { 243 return cache.poll(); 244 } else { 245 //scan finished 246 return null; 247 } 248 } 249 250 @Override 251 public void close() { 252 } 253 254 @Override 255 public boolean renewLease() { 256 throw new RuntimeException("renewLease() not supported"); 257 } 258 259 @Override 260 public ScanMetrics getScanMetrics() { 261 throw new RuntimeException("getScanMetrics() not supported"); 262 } 263 264 private void setupNextScanner() { 265 //if lastResult is null null, it means it is not the fist scan 266 if (lastResult!= null) { 267 byte[] lastRow = lastResult.getRow(); 268 if (scan.isReversed()) { 269 //for reverse scan, we can't find the closet row before this row 270 scan.setStartRow(lastRow); 271 } else { 272 scan.setStartRow(createClosestRowAfter(lastRow)); 273 } 274 } 275 } 276 277 278 /** 279 * Create the closest row after the specified row 280 */ 281 protected byte[] createClosestRowAfter(byte[] row) { 282 if (row == null) { 283 throw new RuntimeException("The passed row is null"); 284 } 285 return Arrays.copyOf(row, row.length + 1); 286 } 287 } 288 289 @Override 290 public ResultScanner getScanner(Scan scan) throws IOException { 291 return new Scanner(scan); 292 } 293 294 @Override 295 public ResultScanner getScanner(byte[] family) throws IOException { 296 Scan scan = new Scan(); 297 scan.addFamily(family); 298 return getScanner(scan); 299 } 300 301 @Override 302 public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { 303 Scan scan = new Scan(); 304 scan.addColumn(family, qualifier); 305 return getScanner(scan); 306 } 307 308 @Override 309 public void put(Put put) throws IOException { 310 TPut tPut = ThriftUtilities.putFromHBase(put); 311 try { 312 client.put(tableNameInBytes, tPut); 313 } catch (TException e) { 314 throw new IOException(e); 315 } 316 } 317 318 @Override 319 public void put(List<Put> puts) throws IOException { 320 List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts); 321 try { 322 client.putMultiple(tableNameInBytes, tPuts); 323 } catch (TException e) { 324 throw new IOException(e); 325 } 326 } 327 328 @Override 329 public void delete(Delete delete) throws IOException { 330 TDelete tDelete = ThriftUtilities.deleteFromHBase(delete); 331 try { 332 client.deleteSingle(tableNameInBytes, tDelete); 333 } catch (TException e) { 334 throw new IOException(e); 335 } 336 } 337 338 @Override 339 public void delete(List<Delete> deletes) throws IOException { 340 List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes); 341 try { 342 client.deleteMultiple(tableNameInBytes, tDeletes); 343 } catch (TException e) { 344 throw new IOException(e); 345 } 346 } 347 348 private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 349 350 private final byte[] row; 351 private final byte[] family; 352 private byte[] qualifier; 353 private CompareOperator op; 354 private byte[] value; 355 356 CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 357 this.row = Preconditions.checkNotNull(row, "row is null"); 358 this.family = Preconditions.checkNotNull(family, "family is null"); 359 } 360 361 @Override 362 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 363 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + 364 " an empty byte array, or just do not call this method if you want a null qualifier"); 365 return this; 366 } 367 368 @Override 369 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 370 throw new NotImplementedException("timeRange not supported in ThriftTable"); 371 } 372 373 @Override 374 public CheckAndMutateBuilder ifNotExists() { 375 this.op = CompareOperator.EQUAL; 376 this.value = null; 377 return this; 378 } 379 380 @Override 381 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 382 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 383 this.value = Preconditions.checkNotNull(value, "value is null"); 384 return this; 385 } 386 387 private void preCheck() { 388 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 389 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 390 } 391 392 @Override 393 public boolean thenPut(Put put) throws IOException { 394 preCheck(); 395 RowMutations rowMutations = new RowMutations(put.getRow()); 396 rowMutations.add(put); 397 return checkAndMutate(row, family, qualifier, op, value, rowMutations); 398 } 399 400 @Override 401 public boolean thenDelete(Delete delete) throws IOException { 402 preCheck(); 403 RowMutations rowMutations = new RowMutations(delete.getRow()); 404 rowMutations.add(delete); 405 return checkAndMutate(row, family, qualifier, op, value, rowMutations); 406 } 407 408 @Override 409 public boolean thenMutate(RowMutations mutation) throws IOException { 410 preCheck(); 411 return checkAndMutate(row, family, qualifier, op, value, mutation); 412 } 413 } 414 415 416 @Override 417 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, 418 byte[] value, RowMutations mutation) throws IOException { 419 try { 420 ByteBuffer valueBuffer = value == null? null : ByteBuffer.wrap(value); 421 return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family), 422 ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer, 423 ThriftUtilities.rowMutationsFromHBase(mutation)); 424 } catch (TException e) { 425 throw new IOException(e); 426 } 427 } 428 429 @Override 430 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 431 return new CheckAndMutateBuilderImpl(row, family); 432 } 433 434 @Override 435 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 436 throw new NotImplementedException("Implement later"); 437 } 438 439 @Override 440 public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) { 441 throw new NotImplementedException("Implement later"); 442 } 443 444 @Override 445 public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) { 446 throw new NotImplementedException("Implement later"); 447 } 448 449 @Override 450 public Result mutateRow(RowMutations rm) throws IOException { 451 TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm); 452 try { 453 client.mutateRow(tableNameInBytes, tRowMutations); 454 return Result.EMPTY_RESULT; 455 } catch (TException e) { 456 throw new IOException(e); 457 } 458 } 459 460 @Override 461 public Result append(Append append) throws IOException { 462 TAppend tAppend = ThriftUtilities.appendFromHBase(append); 463 try { 464 TResult tResult = client.append(tableNameInBytes, tAppend); 465 return ThriftUtilities.resultFromThrift(tResult); 466 } catch (TException e) { 467 throw new IOException(e); 468 } 469 } 470 471 @Override 472 public Result increment(Increment increment) throws IOException { 473 TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment); 474 try { 475 TResult tResult = client.increment(tableNameInBytes, tIncrement); 476 return ThriftUtilities.resultFromThrift(tResult); 477 } catch (TException e) { 478 throw new IOException(e); 479 } 480 } 481 482 @Override 483 public void close() throws IOException { 484 tTransport.close(); 485 } 486 487 @Override 488 public long getRpcTimeout(TimeUnit unit) { 489 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 490 } 491 492 @Override 493 public long getReadRpcTimeout(TimeUnit unit) { 494 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 495 } 496 497 @Override 498 public long getWriteRpcTimeout(TimeUnit unit) { 499 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 500 } 501 502 @Override 503 public long getOperationTimeout(TimeUnit unit) { 504 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 505 } 506 507 @Override 508 public CoprocessorRpcChannel coprocessorService(byte[] row) { 509 throw new NotImplementedException("coprocessorService not supported in ThriftTable"); 510 } 511 512 @Override 513 public RegionLocator getRegionLocator() throws IOException { 514 throw new NotImplementedException("getRegionLocator not supported in ThriftTable"); 515 } 516}