001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.thrift2; 020 021import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift; 022import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.compareOpFromThrift; 023import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift; 024import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift; 025import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift; 026import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift; 027import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift; 028import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift; 029import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift; 030import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase; 031import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase; 032import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift; 033import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift; 034import static org.apache.thrift.TBaseHelper.byteBufferToByteArray; 035 036import java.io.IOException; 037import java.lang.reflect.InvocationHandler; 038import java.lang.reflect.InvocationTargetException; 039import java.lang.reflect.Method; 040import java.lang.reflect.Proxy; 041import java.nio.ByteBuffer; 042import java.util.ArrayList; 043import java.util.Collections; 044import java.util.List; 045import java.util.Map; 046import java.util.concurrent.ConcurrentHashMap; 047import java.util.concurrent.atomic.AtomicInteger; 048 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.DoNotRetryIOException; 051import org.apache.hadoop.hbase.HRegionLocation; 052import org.apache.hadoop.hbase.client.RegionLocator; 053import org.apache.hadoop.hbase.client.ResultScanner; 054import org.apache.hadoop.hbase.client.Table; 055import org.apache.hadoop.hbase.security.UserProvider; 056import org.apache.hadoop.hbase.thrift.ThriftMetrics; 057import org.apache.hadoop.hbase.thrift2.generated.TAppend; 058import org.apache.hadoop.hbase.thrift2.generated.TCompareOp; 059import org.apache.hadoop.hbase.thrift2.generated.TDelete; 060import org.apache.hadoop.hbase.thrift2.generated.TGet; 061import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 062import org.apache.hadoop.hbase.thrift2.generated.THRegionLocation; 063import org.apache.hadoop.hbase.thrift2.generated.TIOError; 064import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument; 065import org.apache.hadoop.hbase.thrift2.generated.TIncrement; 066import org.apache.hadoop.hbase.thrift2.generated.TPut; 067import org.apache.hadoop.hbase.thrift2.generated.TResult; 068import org.apache.hadoop.hbase.thrift2.generated.TRowMutations; 069import org.apache.hadoop.hbase.thrift2.generated.TScan; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.ConnectionCache; 072import org.apache.thrift.TException; 073import org.apache.yetus.audience.InterfaceAudience; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** 078 * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily 079 * defined in the Table interface. 080 */ 081@InterfaceAudience.Private 082@SuppressWarnings("deprecation") 083public class ThriftHBaseServiceHandler implements THBaseService.Iface { 084 085 // TODO: Size of pool configuraple 086 private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class); 087 088 // nextScannerId and scannerMap are used to manage scanner state 089 // TODO: Cleanup thread for Scanners, Scanner id wrap 090 private final AtomicInteger nextScannerId = new AtomicInteger(0); 091 private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>(); 092 093 private final ConnectionCache connectionCache; 094 095 static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; 096 static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; 097 098 private static final IOException ioe 099 = new DoNotRetryIOException("Thrift Server is in Read-only mode."); 100 private boolean isReadOnly; 101 102 public static THBaseService.Iface newInstance( 103 THBaseService.Iface handler, ThriftMetrics metrics) { 104 return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(), 105 new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics)); 106 } 107 108 private static final class THBaseServiceMetricsProxy implements InvocationHandler { 109 private final THBaseService.Iface handler; 110 private final ThriftMetrics metrics; 111 112 private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) { 113 this.handler = handler; 114 this.metrics = metrics; 115 } 116 117 @Override 118 public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { 119 Object result; 120 long start = now(); 121 try { 122 result = m.invoke(handler, args); 123 } catch (InvocationTargetException e) { 124 metrics.exception(e.getCause()); 125 throw e.getTargetException(); 126 } catch (Exception e) { 127 metrics.exception(e); 128 throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); 129 } finally { 130 long processTime = now() - start; 131 metrics.incMethodTime(m.getName(), processTime); 132 } 133 return result; 134 } 135 } 136 137 private static class TIOErrorWithCause extends TIOError { 138 private Throwable cause; 139 140 public TIOErrorWithCause(Throwable cause) { 141 super(); 142 this.cause = cause; 143 } 144 145 @Override 146 public synchronized Throwable getCause() { 147 return cause; 148 } 149 150 @Override 151 public boolean equals(Object other) { 152 if (super.equals(other) && 153 other instanceof TIOErrorWithCause) { 154 Throwable otherCause = ((TIOErrorWithCause) other).getCause(); 155 if (this.getCause() != null) { 156 return otherCause != null && this.getCause().equals(otherCause); 157 } else { 158 return otherCause == null; 159 } 160 } 161 return false; 162 } 163 164 @Override 165 public int hashCode() { 166 int result = super.hashCode(); 167 result = 31 * result + (cause != null ? cause.hashCode() : 0); 168 return result; 169 } 170 } 171 172 private static long now() { 173 return System.nanoTime(); 174 } 175 176 ThriftHBaseServiceHandler(final Configuration conf, 177 final UserProvider userProvider) throws IOException { 178 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); 179 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); 180 connectionCache = new ConnectionCache( 181 conf, userProvider, cleanInterval, maxIdleTime); 182 isReadOnly = conf.getBoolean("hbase.thrift.readonly", false); 183 } 184 185 private Table getTable(ByteBuffer tableName) { 186 try { 187 return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName))); 188 } catch (IOException ie) { 189 throw new RuntimeException(ie); 190 } 191 } 192 193 private RegionLocator getLocator(ByteBuffer tableName) { 194 try { 195 return connectionCache.getRegionLocator(byteBufferToByteArray(tableName)); 196 } catch (IOException ie) { 197 throw new RuntimeException(ie); 198 } 199 } 200 201 private void closeTable(Table table) throws TIOError { 202 try { 203 table.close(); 204 } catch (IOException e) { 205 throw getTIOError(e); 206 } 207 } 208 209 private TIOError getTIOError(IOException e) { 210 TIOError err = new TIOErrorWithCause(e); 211 err.setMessage(e.getMessage()); 212 return err; 213 } 214 215 /** 216 * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap. 217 * @param scanner to add 218 * @return Id for this Scanner 219 */ 220 private int addScanner(ResultScanner scanner) { 221 int id = nextScannerId.getAndIncrement(); 222 scannerMap.put(id, scanner); 223 return id; 224 } 225 226 /** 227 * Returns the Scanner associated with the specified Id. 228 * @param id of the Scanner to get 229 * @return a Scanner, or null if the Id is invalid 230 */ 231 private ResultScanner getScanner(int id) { 232 return scannerMap.get(id); 233 } 234 235 void setEffectiveUser(String effectiveUser) { 236 connectionCache.setEffectiveUser(effectiveUser); 237 } 238 239 /** 240 * Removes the scanner associated with the specified ID from the internal HashMap. 241 * @param id of the Scanner to remove 242 * @return the removed Scanner, or <code>null</code> if the Id is invalid 243 */ 244 protected ResultScanner removeScanner(int id) { 245 return scannerMap.remove(id); 246 } 247 248 @Override 249 public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException { 250 Table htable = getTable(table); 251 try { 252 return htable.exists(getFromThrift(get)); 253 } catch (IOException e) { 254 throw getTIOError(e); 255 } finally { 256 closeTable(htable); 257 } 258 } 259 260 @Override 261 public List<Boolean> existsAll(ByteBuffer table, List<TGet> gets) throws TIOError, TException { 262 Table htable = getTable(table); 263 try { 264 boolean[] exists = htable.existsAll(getsFromThrift(gets)); 265 List<Boolean> result = new ArrayList<>(exists.length); 266 for (boolean exist : exists) { 267 result.add(exist); 268 } 269 return result; 270 } catch (IOException e) { 271 throw getTIOError(e); 272 } finally { 273 closeTable(htable); 274 } 275 } 276 277 @Override 278 public TResult get(ByteBuffer table, TGet get) throws TIOError, TException { 279 Table htable = getTable(table); 280 try { 281 return resultFromHBase(htable.get(getFromThrift(get))); 282 } catch (IOException e) { 283 throw getTIOError(e); 284 } finally { 285 closeTable(htable); 286 } 287 } 288 289 @Override 290 public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException { 291 Table htable = getTable(table); 292 try { 293 return resultsFromHBase(htable.get(getsFromThrift(gets))); 294 } catch (IOException e) { 295 throw getTIOError(e); 296 } finally { 297 closeTable(htable); 298 } 299 } 300 301 @Override 302 public void put(ByteBuffer table, TPut put) throws TIOError, TException { 303 checkReadOnlyMode(); 304 Table htable = getTable(table); 305 try { 306 htable.put(putFromThrift(put)); 307 } catch (IOException e) { 308 throw getTIOError(e); 309 } finally { 310 closeTable(htable); 311 } 312 } 313 314 @Override 315 public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family, 316 ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException { 317 checkReadOnlyMode(); 318 Table htable = getTable(table); 319 try { 320 Table.CheckAndMutateBuilder builder = htable.checkAndMutate(byteBufferToByteArray(row), 321 byteBufferToByteArray(family)).qualifier(byteBufferToByteArray(qualifier)); 322 if (value == null) { 323 return builder.ifNotExists().thenPut(putFromThrift(put)); 324 } else { 325 return builder.ifEquals(byteBufferToByteArray(value)).thenPut(putFromThrift(put)); 326 } 327 } catch (IOException e) { 328 throw getTIOError(e); 329 } finally { 330 closeTable(htable); 331 } 332 } 333 334 @Override 335 public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException { 336 checkReadOnlyMode(); 337 Table htable = getTable(table); 338 try { 339 htable.put(putsFromThrift(puts)); 340 } catch (IOException e) { 341 throw getTIOError(e); 342 } finally { 343 closeTable(htable); 344 } 345 } 346 347 @Override 348 public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException { 349 checkReadOnlyMode(); 350 Table htable = getTable(table); 351 try { 352 htable.delete(deleteFromThrift(deleteSingle)); 353 } catch (IOException e) { 354 throw getTIOError(e); 355 } finally { 356 closeTable(htable); 357 } 358 } 359 360 @Override 361 public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError, 362 TException { 363 checkReadOnlyMode(); 364 Table htable = getTable(table); 365 try { 366 htable.delete(deletesFromThrift(deletes)); 367 } catch (IOException e) { 368 throw getTIOError(e); 369 } finally { 370 closeTable(htable); 371 } 372 return Collections.emptyList(); 373 } 374 375 @Override 376 public boolean checkAndMutate(ByteBuffer table, ByteBuffer row, ByteBuffer family, 377 ByteBuffer qualifier, TCompareOp compareOp, ByteBuffer value, TRowMutations rowMutations) 378 throws TIOError, TException { 379 checkReadOnlyMode(); 380 try (final Table htable = getTable(table)) { 381 return htable.checkAndMutate(byteBufferToByteArray(row), byteBufferToByteArray(family)) 382 .qualifier(byteBufferToByteArray(qualifier)) 383 .ifMatches(compareOpFromThrift(compareOp), byteBufferToByteArray(value)) 384 .thenMutate(rowMutationsFromThrift(rowMutations)); 385 } catch (IOException e) { 386 throw getTIOError(e); 387 } 388 } 389 390 @Override 391 public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family, 392 ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException { 393 checkReadOnlyMode(); 394 Table htable = getTable(table); 395 try { 396 Table.CheckAndMutateBuilder mutateBuilder = 397 htable.checkAndMutate(byteBufferToByteArray(row), byteBufferToByteArray(family)) 398 .qualifier(byteBufferToByteArray(qualifier)); 399 if (value == null) { 400 return mutateBuilder.ifNotExists().thenDelete(deleteFromThrift(deleteSingle)); 401 } else { 402 return mutateBuilder.ifEquals(byteBufferToByteArray(value)) 403 .thenDelete(deleteFromThrift(deleteSingle)); 404 } 405 } catch (IOException e) { 406 throw getTIOError(e); 407 } finally { 408 closeTable(htable); 409 } 410 } 411 412 @Override 413 public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException { 414 checkReadOnlyMode(); 415 Table htable = getTable(table); 416 try { 417 return resultFromHBase(htable.increment(incrementFromThrift(increment))); 418 } catch (IOException e) { 419 throw getTIOError(e); 420 } finally { 421 closeTable(htable); 422 } 423 } 424 425 @Override 426 public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException { 427 checkReadOnlyMode(); 428 Table htable = getTable(table); 429 try { 430 return resultFromHBase(htable.append(appendFromThrift(append))); 431 } catch (IOException e) { 432 throw getTIOError(e); 433 } finally { 434 closeTable(htable); 435 } 436 } 437 438 @Override 439 public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException { 440 Table htable = getTable(table); 441 ResultScanner resultScanner = null; 442 try { 443 resultScanner = htable.getScanner(scanFromThrift(scan)); 444 } catch (IOException e) { 445 throw getTIOError(e); 446 } finally { 447 closeTable(htable); 448 } 449 return addScanner(resultScanner); 450 } 451 452 @Override 453 public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError, 454 TIllegalArgument, TException { 455 ResultScanner scanner = getScanner(scannerId); 456 if (scanner == null) { 457 TIllegalArgument ex = new TIllegalArgument(); 458 ex.setMessage("Invalid scanner Id"); 459 throw ex; 460 } 461 try { 462 connectionCache.updateConnectionAccessTime(); 463 return resultsFromHBase(scanner.next(numRows)); 464 } catch (IOException e) { 465 throw getTIOError(e); 466 } 467 } 468 469 @Override 470 public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows) 471 throws TIOError, TException { 472 Table htable = getTable(table); 473 List<TResult> results = null; 474 ResultScanner scanner = null; 475 try { 476 scanner = htable.getScanner(scanFromThrift(scan)); 477 results = resultsFromHBase(scanner.next(numRows)); 478 } catch (IOException e) { 479 throw getTIOError(e); 480 } finally { 481 if (scanner != null) { 482 scanner.close(); 483 } 484 closeTable(htable); 485 } 486 return results; 487 } 488 489 490 491 @Override 492 public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException { 493 LOG.debug("scannerClose: id=" + scannerId); 494 ResultScanner scanner = getScanner(scannerId); 495 if (scanner == null) { 496 String message = "scanner ID is invalid"; 497 LOG.warn(message); 498 TIllegalArgument ex = new TIllegalArgument(); 499 ex.setMessage("Invalid scanner Id"); 500 throw ex; 501 } 502 scanner.close(); 503 removeScanner(scannerId); 504 } 505 506 @Override 507 public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException { 508 checkReadOnlyMode(); 509 Table htable = getTable(table); 510 try { 511 htable.mutateRow(rowMutationsFromThrift(rowMutations)); 512 } catch (IOException e) { 513 throw getTIOError(e); 514 } finally { 515 closeTable(htable); 516 } 517 } 518 519 @Override 520 public List<THRegionLocation> getAllRegionLocations(ByteBuffer table) 521 throws TIOError, TException { 522 RegionLocator locator = null; 523 try { 524 locator = getLocator(table); 525 return ThriftUtilities.regionLocationsFromHBase(locator.getAllRegionLocations()); 526 527 } catch (IOException e) { 528 throw getTIOError(e); 529 } finally { 530 if (locator != null) { 531 try { 532 locator.close(); 533 } catch (IOException e) { 534 LOG.warn("Couldn't close the locator.", e); 535 } 536 } 537 } 538 } 539 540 @Override 541 public THRegionLocation getRegionLocation(ByteBuffer table, ByteBuffer row, boolean reload) 542 throws TIOError, TException { 543 544 RegionLocator locator = null; 545 try { 546 locator = getLocator(table); 547 byte[] rowBytes = byteBufferToByteArray(row); 548 HRegionLocation hrl = locator.getRegionLocation(rowBytes, reload); 549 return ThriftUtilities.regionLocationFromHBase(hrl); 550 551 } catch (IOException e) { 552 throw getTIOError(e); 553 } finally { 554 if (locator != null) { 555 try { 556 locator.close(); 557 } catch (IOException e) { 558 LOG.warn("Couldn't close the locator.", e); 559 } 560 } 561 } 562 } 563 564 private void checkReadOnlyMode() throws TIOError { 565 if (isReadOnly()) { 566 throw getTIOError(ioe); 567 } 568 } 569 570 private boolean isReadOnly() { 571 return isReadOnly; 572 } 573}