001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.thrift2.client; 020 021import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING; 022import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT; 023 024import java.io.IOException; 025import java.nio.ByteBuffer; 026import java.util.ArrayDeque; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Queue; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.commons.lang3.NotImplementedException; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.CompareOperator; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Increment; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.ResultScanner; 045import org.apache.hadoop.hbase.client.Row; 046import org.apache.hadoop.hbase.client.RowMutations; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.coprocessor.Batch; 051import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 052import org.apache.hadoop.hbase.io.TimeRange; 053import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 054import org.apache.hadoop.hbase.thrift2.ThriftUtilities; 055import org.apache.hadoop.hbase.thrift2.generated.TAppend; 056import org.apache.hadoop.hbase.thrift2.generated.TDelete; 057import org.apache.hadoop.hbase.thrift2.generated.TGet; 058import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 059import org.apache.hadoop.hbase.thrift2.generated.TIncrement; 060import org.apache.hadoop.hbase.thrift2.generated.TPut; 061import org.apache.hadoop.hbase.thrift2.generated.TResult; 062import org.apache.hadoop.hbase.thrift2.generated.TRowMutations; 063import org.apache.hadoop.hbase.thrift2.generated.TScan; 064import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.thrift.TException; 067import org.apache.thrift.transport.TTransport; 068import org.apache.yetus.audience.InterfaceAudience; 069 070import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 071import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans; 072 073@InterfaceAudience.Private 074public class ThriftTable implements Table { 075 076 private TableName tableName; 077 private Configuration conf; 078 private TTransport tTransport; 079 private THBaseService.Client client; 080 private ByteBuffer tableNameInBytes; 081 private int operationTimeout; 082 083 private final int scannerCaching; 084 085 public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport, 086 Configuration conf) { 087 this.tableName = tableName; 088 this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes()); 089 this.conf = conf; 090 this.tTransport = tTransport; 091 this.client = client; 092 this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING, 093 HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT); 094 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 095 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 096 097 098 } 099 100 @Override 101 public TableName getName() { 102 return tableName; 103 } 104 105 @Override 106 public Configuration getConfiguration() { 107 return conf; 108 } 109 110 @Override 111 public TableDescriptor getDescriptor() throws IOException { 112 try { 113 TTableDescriptor tableDescriptor = client 114 .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName)); 115 return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor); 116 } catch (TException e) { 117 throw new IOException(e); 118 } 119 } 120 121 @Override 122 public boolean exists(Get get) throws IOException { 123 TGet tGet = ThriftUtilities.getFromHBase(get); 124 try { 125 return client.exists(tableNameInBytes, tGet); 126 } catch (TException e) { 127 throw new IOException(e); 128 } 129 } 130 131 @Override 132 public boolean[] exists(List<Get> gets) throws IOException { 133 List<TGet> tGets = new ArrayList<>(); 134 for (Get get: gets) { 135 tGets.add(ThriftUtilities.getFromHBase(get)); 136 } 137 try { 138 List<Boolean> results = client.existsAll(tableNameInBytes, tGets); 139 return Booleans.toArray(results); 140 } catch (TException e) { 141 throw new IOException(e); 142 } 143 } 144 145 @Override 146 public void batch(List<? extends Row> actions, Object[] results) 147 throws IOException { 148 throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), " 149 + "get(List<Get> gets) or delete(List<Delete> deletes) respectively"); 150 151 152 } 153 154 @Override 155 public <R> void batchCallback(List<? extends Row> actions, Object[] results, 156 Batch.Callback<R> callback) throws IOException { 157 throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), " 158 + "get(List<Get> gets) or delete(List<Delete> deletes) respectively"); 159 } 160 161 @Override 162 public Result get(Get get) throws IOException { 163 TGet tGet = ThriftUtilities.getFromHBase(get); 164 try { 165 TResult tResult = client.get(tableNameInBytes, tGet); 166 return ThriftUtilities.resultFromThrift(tResult); 167 } catch (TException e) { 168 throw new IOException(e); 169 } 170 } 171 172 @Override 173 public Result[] get(List<Get> gets) throws IOException { 174 List<TGet> tGets = ThriftUtilities.getsFromHBase(gets); 175 try { 176 List<TResult> results = client.getMultiple(tableNameInBytes, tGets); 177 return ThriftUtilities.resultsFromThrift(results); 178 } catch (TException e) { 179 throw new IOException(e); 180 } 181 } 182 183 /** 184 * A scanner to perform scan from thrift server 185 * getScannerResults is used in this scanner 186 */ 187 private class Scanner implements ResultScanner { 188 protected TScan scan; 189 protected Result lastResult = null; 190 protected final Queue<Result> cache = new ArrayDeque<>();; 191 192 193 public Scanner(Scan scan) throws IOException { 194 if (scan.getBatch() > 0) { 195 throw new IOException("Batch is not supported in Scanner"); 196 } 197 if (scan.getCaching() <= 0) { 198 scan.setCaching(scannerCaching); 199 } else if (scan.getCaching() == 1 && scan.isReversed()){ 200 // for reverse scan, we need to pass the last row to the next scanner 201 // we need caching number bigger than 1 202 scan.setCaching(scan.getCaching() + 1); 203 } 204 this.scan = ThriftUtilities.scanFromHBase(scan); 205 } 206 207 208 @Override 209 public Result next() throws IOException { 210 if (cache.size() == 0) { 211 setupNextScanner(); 212 try { 213 List<TResult> tResults = client 214 .getScannerResults(tableNameInBytes, scan, scan.getCaching()); 215 Result[] results = ThriftUtilities.resultsFromThrift(tResults); 216 boolean firstKey = true; 217 for (Result result : results) { 218 // If it is a reverse scan, we use the last result's key as the startkey, since there is 219 // no way to construct a closet rowkey smaller than the last result 220 // So when the results return, we must rule out the first result, since it has already 221 // returned to user. 222 if (firstKey) { 223 firstKey = false; 224 if (scan.isReversed() && lastResult != null) { 225 if (Bytes.equals(lastResult.getRow(), result.getRow())) { 226 continue; 227 } 228 } 229 } 230 cache.add(result); 231 lastResult = result; 232 } 233 } catch (TException e) { 234 throw new IOException(e); 235 } 236 } 237 238 if (cache.size() > 0) { 239 return cache.poll(); 240 } else { 241 //scan finished 242 return null; 243 } 244 } 245 246 @Override 247 public void close() { 248 } 249 250 @Override 251 public boolean renewLease() { 252 throw new RuntimeException("renewLease() not supported"); 253 } 254 255 @Override 256 public ScanMetrics getScanMetrics() { 257 throw new RuntimeException("getScanMetrics() not supported"); 258 } 259 260 private void setupNextScanner() { 261 //if lastResult is null null, it means it is not the fist scan 262 if (lastResult!= null) { 263 byte[] lastRow = lastResult.getRow(); 264 if (scan.isReversed()) { 265 //for reverse scan, we can't find the closet row before this row 266 scan.setStartRow(lastRow); 267 } else { 268 scan.setStartRow(createClosestRowAfter(lastRow)); 269 } 270 } 271 } 272 273 274 /** 275 * Create the closest row after the specified row 276 */ 277 protected byte[] createClosestRowAfter(byte[] row) { 278 if (row == null) { 279 throw new RuntimeException("The passed row is null"); 280 } 281 return Arrays.copyOf(row, row.length + 1); 282 } 283 } 284 285 @Override 286 public ResultScanner getScanner(Scan scan) throws IOException { 287 return new Scanner(scan); 288 } 289 290 @Override 291 public ResultScanner getScanner(byte[] family) throws IOException { 292 Scan scan = new Scan(); 293 scan.addFamily(family); 294 return getScanner(scan); 295 } 296 297 @Override 298 public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { 299 Scan scan = new Scan(); 300 scan.addColumn(family, qualifier); 301 return getScanner(scan); 302 } 303 304 @Override 305 public void put(Put put) throws IOException { 306 TPut tPut = ThriftUtilities.putFromHBase(put); 307 try { 308 client.put(tableNameInBytes, tPut); 309 } catch (TException e) { 310 throw new IOException(e); 311 } 312 } 313 314 @Override 315 public void put(List<Put> puts) throws IOException { 316 List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts); 317 try { 318 client.putMultiple(tableNameInBytes, tPuts); 319 } catch (TException e) { 320 throw new IOException(e); 321 } 322 } 323 324 @Override 325 public void delete(Delete delete) throws IOException { 326 TDelete tDelete = ThriftUtilities.deleteFromHBase(delete); 327 try { 328 client.deleteSingle(tableNameInBytes, tDelete); 329 } catch (TException e) { 330 throw new IOException(e); 331 } 332 } 333 334 @Override 335 public void delete(List<Delete> deletes) throws IOException { 336 List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes); 337 try { 338 client.deleteMultiple(tableNameInBytes, tDeletes); 339 } catch (TException e) { 340 throw new IOException(e); 341 } 342 } 343 344 private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 345 346 private final byte[] row; 347 private final byte[] family; 348 private byte[] qualifier; 349 private CompareOperator op; 350 private byte[] value; 351 352 CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 353 this.row = Preconditions.checkNotNull(row, "row is null"); 354 this.family = Preconditions.checkNotNull(family, "family is null"); 355 } 356 357 @Override 358 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 359 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + 360 " an empty byte array, or just do not call this method if you want a null qualifier"); 361 return this; 362 } 363 364 @Override 365 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 366 throw new NotImplementedException("timeRange not supported in ThriftTable"); 367 } 368 369 @Override 370 public CheckAndMutateBuilder ifNotExists() { 371 this.op = CompareOperator.EQUAL; 372 this.value = null; 373 return this; 374 } 375 376 @Override 377 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 378 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 379 this.value = Preconditions.checkNotNull(value, "value is null"); 380 return this; 381 } 382 383 private void preCheck() { 384 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 385 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 386 } 387 388 @Override 389 public boolean thenPut(Put put) throws IOException { 390 preCheck(); 391 RowMutations rowMutations = new RowMutations(put.getRow()); 392 rowMutations.add(put); 393 return checkAndMutate(row, family, qualifier, op, value, rowMutations); 394 } 395 396 @Override 397 public boolean thenDelete(Delete delete) throws IOException { 398 preCheck(); 399 RowMutations rowMutations = new RowMutations(delete.getRow()); 400 rowMutations.add(delete); 401 return checkAndMutate(row, family, qualifier, op, value, rowMutations); 402 } 403 404 @Override 405 public boolean thenMutate(RowMutations mutation) throws IOException { 406 preCheck(); 407 return checkAndMutate(row, family, qualifier, op, value, mutation); 408 } 409 } 410 411 412 @Override 413 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, 414 byte[] value, RowMutations mutation) throws IOException { 415 try { 416 ByteBuffer valueBuffer = value == null? null : ByteBuffer.wrap(value); 417 return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family), 418 ByteBuffer.wrap(qualifier), ThriftUtilities.compareOpFromHBase(op), valueBuffer, 419 ThriftUtilities.rowMutationsFromHBase(mutation)); 420 } catch (TException e) { 421 throw new IOException(e); 422 } 423 } 424 425 @Override 426 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 427 return new CheckAndMutateBuilderImpl(row, family); 428 } 429 430 @Override 431 public void mutateRow(RowMutations rm) throws IOException { 432 TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm); 433 try { 434 client.mutateRow(tableNameInBytes, tRowMutations); 435 } catch (TException e) { 436 throw new IOException(e); 437 } 438 } 439 440 @Override 441 public Result append(Append append) throws IOException { 442 TAppend tAppend = ThriftUtilities.appendFromHBase(append); 443 try { 444 TResult tResult = client.append(tableNameInBytes, tAppend); 445 return ThriftUtilities.resultFromThrift(tResult); 446 } catch (TException e) { 447 throw new IOException(e); 448 } 449 } 450 451 @Override 452 public Result increment(Increment increment) throws IOException { 453 TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment); 454 try { 455 TResult tResult = client.increment(tableNameInBytes, tIncrement); 456 return ThriftUtilities.resultFromThrift(tResult); 457 } catch (TException e) { 458 throw new IOException(e); 459 } 460 } 461 462 @Override 463 public void close() throws IOException { 464 tTransport.close(); 465 } 466 467 @Override 468 public long getRpcTimeout(TimeUnit unit) { 469 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 470 } 471 472 @Override 473 public long getReadRpcTimeout(TimeUnit unit) { 474 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 475 } 476 477 @Override 478 public long getWriteRpcTimeout(TimeUnit unit) { 479 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 480 } 481 482 @Override 483 public long getOperationTimeout(TimeUnit unit) { 484 return unit.convert(operationTimeout, TimeUnit.MILLISECONDS); 485 } 486 487 @Override 488 public CoprocessorRpcChannel coprocessorService(byte[] row) { 489 throw new NotImplementedException("coprocessorService not supported in ThriftTable"); 490 } 491 492}