/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.thrift2.client;

import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING;
import static org.apache.hadoop.hbase.thrift.Constants.HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
import org.apache.hadoop.hbase.thrift2.generated.TAppend;
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
import org.apache.hadoop.hbase.thrift2.generated.TGet;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
import org.apache.hadoop.hbase.thrift2.generated.TPut;
import org.apache.hadoop.hbase.thrift2.generated.TResult;
import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
import org.apache.hadoop.hbase.thrift2.generated.TScan;
import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;

/**
 * A {@link Table} implementation that forwards each operation to a thrift2
 * {@link THBaseService.Client}. Every {@link TException} raised by the thrift client is rethrown
 * wrapped in an {@link IOException}. Operations that have no counterpart here (batch, coprocessor
 * services, region locator, filter based checkAndMutate) throw instead of being emulated.
 */
@InterfaceAudience.Private
public class ThriftTable implements Table {

  private final TableName tableName;
  private final Configuration conf;
  private final TTransport tTransport;
  private final THBaseService.Client client;
  // Table name pre-wrapped once, since every thrift call takes it as a ByteBuffer.
  private final ByteBuffer tableNameInBytes;
  private final int operationTimeout;

  private final int scannerCaching;

  /**
   * Creates a table handle bound to the given thrift client and transport.
   *
   * @param tableName the table this instance operates on
   * @param client thrift client used for every request
   * @param tTransport transport that is closed by {@link #close()}
   * @param conf configuration; the scanner caching and operation timeout are read from it
   */
  public ThriftTable(TableName tableName, THBaseService.Client client, TTransport tTransport,
      Configuration conf) {
    this.tableName = tableName;
    this.tableNameInBytes = ByteBuffer.wrap(tableName.toBytes());
    this.conf = conf;
    this.tTransport = tTransport;
    this.client = client;
    this.scannerCaching = conf.getInt(HBASE_THRIFT_CLIENT_SCANNER_CACHING,
        HBASE_THRIFT_CLIENT_SCANNER_CACHING_DEFAULT);
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
  }

  @Override
  public TableName getName() {
    return tableName;
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  /**
   * Fetches the table descriptor through the thrift client and converts it to the HBase type.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public TableDescriptor getDescriptor() throws IOException {
    try {
      TTableDescriptor tableDescriptor = client
          .getTableDescriptor(ThriftUtilities.tableNameFromHBase(tableName));
      return ThriftUtilities.tableDescriptorFromThrift(tableDescriptor);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Asks the thrift server whether the given {@link Get} matches anything.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public boolean exists(Get get) throws IOException {
    TGet tGet = ThriftUtilities.getFromHBase(get);
    try {
      return client.exists(tableNameInBytes, tGet);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Multi-get variant of {@link #exists(Get)}, issued as a single {@code existsAll} call.
   *
   * @return the list returned by the server converted to a primitive array
   * @throws IOException if the thrift call fails
   */
  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    // Same conversion helper as get(List<Get>) so both multi-get paths stay in sync.
    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
    try {
      List<Boolean> results = client.existsAll(tableNameInBytes, tGets);
      return Booleans.toArray(results);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Not supported by this implementation.
   *
   * @throws IOException always
   */
  @Override
  public void batch(List<? extends Row> actions, Object[] results)
      throws IOException {
    throw new IOException("Batch not supported in ThriftTable, use put(List<Put> puts), "
        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
  }

  /**
   * Not supported by this implementation.
   *
   * @throws IOException always
   */
  @Override
  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
      Batch.Callback<R> callback) throws IOException {
    throw new IOException("BatchCallback not supported in ThriftTable, use put(List<Put> puts), "
        + "get(List<Get> gets) or delete(List<Delete> deletes) respectively");
  }

  /**
   * Executes a single get through the thrift client.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result get(Get get) throws IOException {
    TGet tGet = ThriftUtilities.getFromHBase(get);
    try {
      TResult tResult = client.get(tableNameInBytes, tGet);
      return ThriftUtilities.resultFromThrift(tResult);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Executes several gets through a single {@code getMultiple} thrift call.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result[] get(List<Get> gets) throws IOException {
    List<TGet> tGets = ThriftUtilities.getsFromHBase(gets);
    try {
      List<TResult> results = client.getMultiple(tableNameInBytes, tGets);
      return ThriftUtilities.resultsFromThrift(results);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * A scanner to perform scan from thrift server.
   * getScannerResults is used in this scanner: each time the local cache runs dry, a new
   * getScannerResults request is issued whose start row is derived from the last row returned.
   */
  private class Scanner implements ResultScanner {
    protected final TScan scan;
    // Last result placed in the cache; used to compute the start row of the next request.
    protected Result lastResult = null;
    protected final Queue<Result> cache = new ArrayDeque<>();

    /**
     * Note that the caching value of the passed {@link Scan} may be adjusted in place before it
     * is converted to its thrift form.
     *
     * @throws IOException if the scan has a batch size set, which is not supported
     */
    public Scanner(Scan scan) throws IOException {
      if (scan.getBatch() > 0) {
        throw new IOException("Batch is not supported in Scanner");
      }
      if (scan.getCaching() <= 0) {
        scan.setCaching(scannerCaching);
      } else if (scan.getCaching() == 1 && scan.isReversed()) {
        // for reverse scan, we need to pass the last row to the next scanner
        // we need caching number bigger than 1
        scan.setCaching(scan.getCaching() + 1);
      }
      this.scan = ThriftUtilities.scanFromHBase(scan);
    }

    /**
     * Returns the next cached result, first fetching another page from the thrift server when
     * the cache is empty.
     *
     * @return the next result, or null when the fetch produced no new rows (scan finished)
     * @throws IOException if the thrift call fails
     */
    @Override
    public Result next() throws IOException {
      if (cache.isEmpty()) {
        setupNextScanner();
        try {
          List<TResult> tResults = client
              .getScannerResults(tableNameInBytes, scan, scan.getCaching());
          Result[] results = ThriftUtilities.resultsFromThrift(tResults);
          boolean firstKey = true;
          for (Result result : results) {
            // If it is a reverse scan, we use the last result's key as the startkey, since there
            // is no way to construct a closest rowkey smaller than the last result.
            // So when the results return, we must rule out the first result, since it has
            // already been returned to the user.
            if (firstKey) {
              firstKey = false;
              if (scan.isReversed() && lastResult != null
                  && Bytes.equals(lastResult.getRow(), result.getRow())) {
                continue;
              }
            }
            cache.add(result);
            lastResult = result;
          }
        } catch (TException e) {
          throw new IOException(e);
        }
      }

      // poll() yields null on an empty queue, which signals that the scan is finished.
      return cache.poll();
    }

    @Override
    public void close() {
    }

    @Override
    public boolean renewLease() {
      throw new RuntimeException("renewLease() not supported");
    }

    @Override
    public ScanMetrics getScanMetrics() {
      throw new RuntimeException("getScanMetrics() not supported");
    }

    /**
     * Moves the start row of the thrift scan past what has already been handed out, so the next
     * getScannerResults call continues where the previous one stopped.
     */
    private void setupNextScanner() {
      // if lastResult is not null, this is not the first fetch
      if (lastResult != null) {
        byte[] lastRow = lastResult.getRow();
        if (scan.isReversed()) {
          // for reverse scan, we can't find the closest row before this row, so the last row is
          // requested again and next() drops the duplicate
          scan.setStartRow(lastRow);
        } else {
          scan.setStartRow(createClosestRowAfter(lastRow));
        }
      }
    }

    /**
     * Create the closest row after the specified row, i.e. a copy of the row with one extra
     * trailing zero byte.
     *
     * @throws RuntimeException if the passed row is null
     */
    protected byte[] createClosestRowAfter(byte[] row) {
      if (row == null) {
        throw new RuntimeException("The passed row is null");
      }
      return Arrays.copyOf(row, row.length + 1);
    }
  }

  @Override
  public ResultScanner getScanner(Scan scan) throws IOException {
    return new Scanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family) throws IOException {
    Scan scan = new Scan();
    scan.addFamily(family);
    return getScanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);
    return getScanner(scan);
  }

  /**
   * Sends a single put through the thrift client.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public void put(Put put) throws IOException {
    TPut tPut = ThriftUtilities.putFromHBase(put);
    try {
      client.put(tableNameInBytes, tPut);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Sends several puts through a single {@code putMultiple} thrift call.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public void put(List<Put> puts) throws IOException {
    List<TPut> tPuts = ThriftUtilities.putsFromHBase(puts);
    try {
      client.putMultiple(tableNameInBytes, tPuts);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Sends a single delete through the thrift client.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public void delete(Delete delete) throws IOException {
    TDelete tDelete = ThriftUtilities.deleteFromHBase(delete);
    try {
      client.deleteSingle(tableNameInBytes, tDelete);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Sends several deletes through a single {@code deleteMultiple} thrift call.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public void delete(List<Delete> deletes) throws IOException {
    List<TDelete> tDeletes = ThriftUtilities.deletesFromHBase(deletes);
    try {
      client.deleteMultiple(tableNameInBytes, tDeletes);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Builder that collects the condition of a check-and-mutate request and then delegates to
   * {@link ThriftTable#checkAndMutate(byte[], byte[], byte[], CompareOperator, byte[],
   * RowMutations)}.
   */
  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {

    private final byte[] row;
    private final byte[] family;
    // Stays null when qualifier(byte[]) is never called.
    private byte[] qualifier;
    private CompareOperator op;
    private byte[] value;

    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
      this.row = Preconditions.checkNotNull(row, "row is null");
      this.family = Preconditions.checkNotNull(family, "family is null");
    }

    @Override
    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
          " an empty byte array, or just do not call this method if you want a null qualifier");
      return this;
    }

    @Override
    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
      throw new NotImplementedException("timeRange not supported in ThriftTable");
    }

    @Override
    public CheckAndMutateBuilder ifNotExists() {
      // "not exists" is expressed as an EQUAL comparison against a null value
      this.op = CompareOperator.EQUAL;
      this.value = null;
      return this;
    }

    @Override
    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
      this.value = Preconditions.checkNotNull(value, "value is null");
      return this;
    }

    /** Fails fast when no condition has been configured on this builder. */
    private void preCheck() {
      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
          " calling ifNotExists/ifEquals/ifMatches before executing the request");
    }

    @Override
    public boolean thenPut(Put put) throws IOException {
      preCheck();
      RowMutations rowMutations = new RowMutations(put.getRow());
      rowMutations.add(put);
      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
    }

    @Override
    public boolean thenDelete(Delete delete) throws IOException {
      preCheck();
      RowMutations rowMutations = new RowMutations(delete.getRow());
      rowMutations.add(delete);
      return checkAndMutate(row, family, qualifier, op, value, rowMutations);
    }

    @Override
    public boolean thenMutate(RowMutations mutation) throws IOException {
      preCheck();
      return checkAndMutate(row, family, qualifier, op, value, mutation);
    }
  }

  /**
   * Performs a check-and-mutate through the thrift client.
   *
   * @param qualifier column qualifier to check; a null qualifier is sent as an empty one
   * @param value expected value; null is passed to the thrift client as a null buffer
   * @return the boolean returned by the thrift {@code checkAndMutate} call
   * @throws IOException if the thrift call fails
   */
  @Override
  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
      byte[] value, RowMutations mutation) throws IOException {
    try {
      ByteBuffer valueBuffer = value == null ? null : ByteBuffer.wrap(value);
      // The builder allows the qualifier to be left null, and ByteBuffer.wrap(null) would throw
      // a NullPointerException, so fall back to an empty qualifier.
      ByteBuffer qualifierBuffer =
          ByteBuffer.wrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier);
      return client.checkAndMutate(tableNameInBytes, ByteBuffer.wrap(row), ByteBuffer.wrap(family),
          qualifierBuffer, ThriftUtilities.compareOpFromHBase(op), valueBuffer,
          ThriftUtilities.rowMutationsFromHBase(mutation));
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    return new CheckAndMutateBuilderImpl(row, family);
  }

  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    throw new NotImplementedException("Implement later");
  }

  /**
   * Applies the given row mutations through the thrift client.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public void mutateRow(RowMutations rm) throws IOException {
    TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
    try {
      client.mutateRow(tableNameInBytes, tRowMutations);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Executes an append through the thrift client and converts the returned result.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result append(Append append) throws IOException {
    TAppend tAppend = ThriftUtilities.appendFromHBase(append);
    try {
      TResult tResult = client.append(tableNameInBytes, tAppend);
      return ThriftUtilities.resultFromThrift(tResult);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /**
   * Executes an increment through the thrift client and converts the returned result.
   *
   * @throws IOException if the thrift call fails
   */
  @Override
  public Result increment(Increment increment) throws IOException {
    TIncrement tIncrement = ThriftUtilities.incrementFromHBase(increment);
    try {
      TResult tResult = client.increment(tableNameInBytes, tIncrement);
      return ThriftUtilities.resultFromThrift(tResult);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  /** Closes the underlying thrift transport. */
  @Override
  public void close() throws IOException {
    tTransport.close();
  }

  // All four timeout getters below report the single configured operation timeout.

  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return unit.convert(operationTimeout, TimeUnit.MILLISECONDS);
  }

  @Override
  public CoprocessorRpcChannel coprocessorService(byte[] row) {
    throw new NotImplementedException("coprocessorService not supported in ThriftTable");
  }

  @Override
  public RegionLocator getRegionLocator() throws IOException {
    throw new NotImplementedException("getRegionLocator not supported in ThriftTable");
  }
}