001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import static org.apache.hadoop.hbase.util.Bytes.getBytes; 022 023import java.io.IOException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.net.UnknownHostException; 027import java.nio.ByteBuffer; 028import java.security.PrivilegedAction; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.TreeMap; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.ExecutorService; 038import java.util.concurrent.LinkedBlockingQueue; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041 042import javax.security.auth.callback.Callback; 043import javax.security.auth.callback.UnsupportedCallbackException; 044import javax.security.sasl.AuthorizeCallback; 045import javax.security.sasl.SaslServer; 046 047import org.apache.commons.lang3.ArrayUtils; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.Cell.Type; 050import org.apache.hadoop.hbase.CellBuilder; 051import org.apache.hadoop.hbase.CellBuilderFactory; 052import org.apache.hadoop.hbase.CellBuilderType; 053import org.apache.hadoop.hbase.CellUtil; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HColumnDescriptor; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.HTableDescriptor; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.MetaTableAccessor; 061import org.apache.hadoop.hbase.ServerName; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.TableNotFoundException; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.Append; 066import org.apache.hadoop.hbase.client.Delete; 067import org.apache.hadoop.hbase.client.Durability; 068import org.apache.hadoop.hbase.client.Get; 069import org.apache.hadoop.hbase.client.Increment; 070import org.apache.hadoop.hbase.client.OperationWithAttributes; 071import org.apache.hadoop.hbase.client.Put; 072import org.apache.hadoop.hbase.client.RegionInfo; 073import org.apache.hadoop.hbase.client.RegionLocator; 074import org.apache.hadoop.hbase.client.Result; 075import org.apache.hadoop.hbase.client.ResultScanner; 076import org.apache.hadoop.hbase.client.Scan; 077import org.apache.hadoop.hbase.client.Table; 078import org.apache.hadoop.hbase.filter.Filter; 079import org.apache.hadoop.hbase.filter.ParseFilter; 080import org.apache.hadoop.hbase.filter.PrefixFilter; 081import org.apache.hadoop.hbase.filter.WhileMatchFilter; 082import org.apache.hadoop.hbase.log.HBaseMarkers; 083import org.apache.hadoop.hbase.security.SaslUtil; 084import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 085import org.apache.hadoop.hbase.security.SecurityUtil; 086import org.apache.hadoop.hbase.security.UserProvider; 087import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; 088import org.apache.hadoop.hbase.thrift.generated.BatchMutation; 089import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; 090import org.apache.hadoop.hbase.thrift.generated.Hbase; 091import org.apache.hadoop.hbase.thrift.generated.IOError; 092import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; 093import org.apache.hadoop.hbase.thrift.generated.Mutation; 094import org.apache.hadoop.hbase.thrift.generated.TAppend; 095import org.apache.hadoop.hbase.thrift.generated.TCell; 096import org.apache.hadoop.hbase.thrift.generated.TIncrement; 097import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; 098import org.apache.hadoop.hbase.thrift.generated.TRowResult; 099import org.apache.hadoop.hbase.thrift.generated.TScan; 100import org.apache.hadoop.hbase.util.Bytes; 101import org.apache.hadoop.hbase.util.ConnectionCache; 102import org.apache.hadoop.hbase.util.DNS; 103import org.apache.hadoop.hbase.util.JvmPauseMonitor; 104import org.apache.hadoop.hbase.util.Strings; 105import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler; 106import org.apache.hadoop.security.UserGroupInformation; 107import org.apache.hadoop.security.authorize.ProxyUsers; 108import org.apache.thrift.TException; 109import org.apache.thrift.TProcessor; 110import org.apache.thrift.protocol.TBinaryProtocol; 111import org.apache.thrift.protocol.TCompactProtocol; 112import org.apache.thrift.protocol.TProtocol; 113import org.apache.thrift.protocol.TProtocolFactory; 114import org.apache.thrift.server.THsHaServer; 115import org.apache.thrift.server.TNonblockingServer; 116import org.apache.thrift.server.TServer; 117import org.apache.thrift.server.TServlet; 118import org.apache.thrift.server.TThreadedSelectorServer; 119import org.apache.thrift.transport.TFramedTransport; 120import org.apache.thrift.transport.TNonblockingServerSocket; 121import org.apache.thrift.transport.TNonblockingServerTransport; 122import org.apache.thrift.transport.TSaslServerTransport; 123import org.apache.thrift.transport.TServerSocket; 124import org.apache.thrift.transport.TServerTransport; 125import org.apache.thrift.transport.TTransportFactory; 126import org.apache.yetus.audience.InterfaceAudience; 127import org.eclipse.jetty.http.HttpVersion; 128import org.eclipse.jetty.server.HttpConfiguration; 129import org.eclipse.jetty.server.HttpConnectionFactory; 130import org.eclipse.jetty.server.SecureRequestCustomizer; 131import org.eclipse.jetty.server.Server; 132import org.eclipse.jetty.server.ServerConnector; 133import org.eclipse.jetty.server.SslConnectionFactory; 134import org.eclipse.jetty.servlet.ServletContextHandler; 135import org.eclipse.jetty.servlet.ServletHolder; 136import org.eclipse.jetty.util.ssl.SslContextFactory; 137import org.eclipse.jetty.util.thread.QueuedThreadPool; 138import org.slf4j.Logger; 139import org.slf4j.LoggerFactory; 140import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 141import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 142import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 143import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 144import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 145import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 146import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup; 147 148/** 149 * ThriftServerRunner - this class starts up a Thrift server which implements 150 * the Hbase API specified in the Hbase.thrift IDL file. 151 */ 152@InterfaceAudience.Private 153public class ThriftServerRunner implements Runnable { 154 155 private static final Logger LOG = LoggerFactory.getLogger(ThriftServerRunner.class); 156 157 private static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k 158 159 static final String SERVER_TYPE_CONF_KEY = 160 "hbase.regionserver.thrift.server.type"; 161 162 static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress"; 163 static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact"; 164 static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed"; 165 static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb"; 166 static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port"; 167 static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement"; 168 static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http"; 169 static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min"; 170 static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max"; 171 172 static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled"; 173 static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store"; 174 static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password"; 175 static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword"; 176 static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES = "hbase.thrift.ssl.exclude.cipher.suites"; 177 static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES = "hbase.thrift.ssl.include.cipher.suites"; 178 static final String THRIFT_SSL_EXCLUDE_PROTOCOLS = "hbase.thrift.ssl.exclude.protocols"; 179 static final String THRIFT_SSL_INCLUDE_PROTOCOLS = "hbase.thrift.ssl.include.protocols"; 180 181 /** 182 * Amount of time in milliseconds before a server thread will timeout 183 * waiting for client to send data on a connected socket. Currently, 184 * applies only to TBoundedThreadPoolServer 185 */ 186 public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY = 187 "hbase.thrift.server.socket.read.timeout"; 188 public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000; 189 190 191 /** 192 * Thrift quality of protection configuration key. Valid values can be: 193 * auth-conf: authentication, integrity and confidentiality checking 194 * auth-int: authentication and integrity checking 195 * auth: authentication only 196 * 197 * This is used to authenticate the callers and support impersonation. 198 * The thrift server and the HBase cluster must run in secure mode. 199 */ 200 static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop"; 201 static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog"; 202 203 private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; 204 public static final int DEFAULT_LISTEN_PORT = 9090; 205 public static final int HREGION_VERSION = 1; 206 static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser"; 207 private final int listenPort; 208 209 private Configuration conf; 210 volatile TServer tserver; 211 volatile Server httpServer; 212 private final Hbase.Iface handler; 213 private final ThriftMetrics metrics; 214 private final HBaseHandler hbaseHandler; 215 private final UserGroupInformation realUser; 216 217 private SaslUtil.QualityOfProtection qop; 218 private String host; 219 220 private final boolean securityEnabled; 221 private final boolean doAsEnabled; 222 223 private final JvmPauseMonitor pauseMonitor; 224 225 /** An enum of server implementation selections */ 226 public enum ImplType { 227 HS_HA("hsha", true, THsHaServer.class, true), 228 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true), 229 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true), 230 THREADED_SELECTOR( 231 "threadedselector", true, TThreadedSelectorServer.class, true); 232 233 public static final ImplType DEFAULT = THREAD_POOL; 234 235 final String option; 236 final boolean isAlwaysFramed; 237 final Class<? extends TServer> serverClass; 238 final boolean canSpecifyBindIP; 239 240 private ImplType(String option, boolean isAlwaysFramed, 241 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) { 242 this.option = option; 243 this.isAlwaysFramed = isAlwaysFramed; 244 this.serverClass = serverClass; 245 this.canSpecifyBindIP = canSpecifyBindIP; 246 } 247 248 /** 249 * @return <code>-option</code> so we can get the list of options from 250 * {@link #values()} 251 */ 252 @Override 253 public String toString() { 254 return "-" + option; 255 } 256 257 public String getOption() { 258 return option; 259 } 260 261 public boolean isAlwaysFramed() { 262 return isAlwaysFramed; 263 } 264 265 public String getDescription() { 266 StringBuilder sb = new StringBuilder("Use the " + 267 serverClass.getSimpleName()); 268 if (isAlwaysFramed) { 269 sb.append(" This implies the framed transport."); 270 } 271 if (this == DEFAULT) { 272 sb.append("This is the default."); 273 } 274 return sb.toString(); 275 } 276 277 static OptionGroup createOptionGroup() { 278 OptionGroup group = new OptionGroup(); 279 for (ImplType t : values()) { 280 group.addOption(new Option(t.option, t.getDescription())); 281 } 282 return group; 283 } 284 285 public static ImplType getServerImpl(Configuration conf) { 286 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option); 287 for (ImplType t : values()) { 288 if (confType.equals(t.option)) { 289 return t; 290 } 291 } 292 throw new AssertionError("Unknown server ImplType.option:" + confType); 293 } 294 295 static void setServerImpl(CommandLine cmd, Configuration conf) { 296 ImplType chosenType = null; 297 int numChosen = 0; 298 for (ImplType t : values()) { 299 if (cmd.hasOption(t.option)) { 300 chosenType = t; 301 ++numChosen; 302 } 303 } 304 if (numChosen < 1) { 305 LOG.info("Using default thrift server type"); 306 chosenType = DEFAULT; 307 } else if (numChosen > 1) { 308 throw new AssertionError("Exactly one option out of " + 309 Arrays.toString(values()) + " has to be specified"); 310 } 311 LOG.info("Using thrift server type " + chosenType.option); 312 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option); 313 } 314 315 public String simpleClassName() { 316 return serverClass.getSimpleName(); 317 } 318 319 public static List<String> serversThatCannotSpecifyBindIP() { 320 List<String> l = new ArrayList<>(); 321 for (ImplType t : values()) { 322 if (!t.canSpecifyBindIP) { 323 l.add(t.simpleClassName()); 324 } 325 } 326 return l; 327 } 328 329 } 330 331 public ThriftServerRunner(Configuration conf) throws IOException { 332 UserProvider userProvider = UserProvider.instantiate(conf); 333 // login the server principal (if using secure Hadoop) 334 securityEnabled = userProvider.isHadoopSecurityEnabled() 335 && userProvider.isHBaseSecurityEnabled(); 336 if (securityEnabled) { 337 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( 338 conf.get("hbase.thrift.dns.interface", "default"), 339 conf.get("hbase.thrift.dns.nameserver", "default"))); 340 userProvider.login("hbase.thrift.keytab.file", 341 "hbase.thrift.kerberos.principal", host); 342 } 343 this.conf = HBaseConfiguration.create(conf); 344 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); 345 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); 346 this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource()); 347 this.hbaseHandler = new HBaseHandler(conf, userProvider); 348 this.hbaseHandler.initMetrics(metrics); 349 this.handler = HbaseHandlerMetricsProxy.newInstance( 350 hbaseHandler, metrics, conf); 351 this.realUser = userProvider.getCurrent().getUGI(); 352 String strQop = conf.get(THRIFT_QOP_KEY); 353 if (strQop != null) { 354 this.qop = SaslUtil.getQop(strQop); 355 } 356 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false); 357 if (doAsEnabled) { 358 if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) { 359 LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured "); 360 } 361 } 362 if (qop != null) { 363 if (qop != QualityOfProtection.AUTHENTICATION && 364 qop != QualityOfProtection.INTEGRITY && 365 qop != QualityOfProtection.PRIVACY) { 366 throw new IOException(String.format("Invalide %s: It must be one of %s, %s, or %s.", 367 THRIFT_QOP_KEY, 368 QualityOfProtection.AUTHENTICATION.name(), 369 QualityOfProtection.INTEGRITY.name(), 370 QualityOfProtection.PRIVACY.name())); 371 } 372 checkHttpSecurity(qop, conf); 373 if (!securityEnabled) { 374 throw new IOException("Thrift server must" 375 + " run in secure mode to support authentication"); 376 } 377 } 378 } 379 380 private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) { 381 if (qop == QualityOfProtection.PRIVACY && 382 conf.getBoolean(USE_HTTP_CONF_KEY, false) && 383 !conf.getBoolean(THRIFT_SSL_ENABLED, false)) { 384 throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " + 385 THRIFT_SSL_ENABLED + " is false"); 386 } 387 } 388 389 /* 390 * Runs the Thrift server 391 */ 392 @Override 393 public void run() { 394 realUser.doAs(new PrivilegedAction<Object>() { 395 @Override 396 public Object run() { 397 try { 398 pauseMonitor.start(); 399 if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) { 400 setupHTTPServer(); 401 httpServer.start(); 402 httpServer.join(); 403 } else { 404 setupServer(); 405 tserver.serve(); 406 } 407 } catch (Exception e) { 408 LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e); 409 // Crash the process if the ThriftServer is not running 410 System.exit(-1); 411 } 412 return null; 413 } 414 }); 415 416 } 417 418 public void shutdown() { 419 if (pauseMonitor != null) { 420 pauseMonitor.stop(); 421 } 422 if (tserver != null) { 423 tserver.stop(); 424 tserver = null; 425 } 426 if (httpServer != null) { 427 try { 428 httpServer.stop(); 429 httpServer = null; 430 } catch (Exception e) { 431 LOG.error("Problem encountered in shutting down HTTP server " + e.getCause()); 432 } 433 httpServer = null; 434 } 435 } 436 437 private void setupHTTPServer() throws IOException { 438 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); 439 TProcessor processor = new Hbase.Processor<>(handler); 440 TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser, 441 conf, hbaseHandler, securityEnabled, doAsEnabled); 442 443 // Set the default max thread number to 100 to limit 444 // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily. 445 // Jetty set the default max thread number to 250, if we don't set it. 446 // 447 // Our default min thread number 2 is the same as that used by Jetty. 448 int minThreads = conf.getInt(HTTP_MIN_THREADS, 2); 449 int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100); 450 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads); 451 threadPool.setMinThreads(minThreads); 452 httpServer = new Server(threadPool); 453 454 // Context handler 455 ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS); 456 ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*"); 457 458 // set up Jetty and run the embedded server 459 HttpConfiguration httpConfig = new HttpConfiguration(); 460 httpConfig.setSecureScheme("https"); 461 httpConfig.setSecurePort(listenPort); 462 httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 463 httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 464 httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 465 httpConfig.setSendServerVersion(false); 466 httpConfig.setSendDateHeader(false); 467 468 ServerConnector serverConnector; 469 if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) { 470 HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig); 471 httpsConfig.addCustomizer(new SecureRequestCustomizer()); 472 473 SslContextFactory sslCtxFactory = new SslContextFactory(); 474 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE); 475 String password = HBaseConfiguration.getPassword(conf, 476 THRIFT_SSL_KEYSTORE_PASSWORD, null); 477 String keyPassword = HBaseConfiguration.getPassword(conf, 478 THRIFT_SSL_KEYSTORE_KEYPASSWORD, password); 479 sslCtxFactory.setKeyStorePath(keystore); 480 sslCtxFactory.setKeyStorePassword(password); 481 sslCtxFactory.setKeyManagerPassword(keyPassword); 482 483 String[] excludeCiphers = conf.getStrings( 484 THRIFT_SSL_EXCLUDE_CIPHER_SUITES, ArrayUtils.EMPTY_STRING_ARRAY); 485 if (excludeCiphers.length != 0) { 486 sslCtxFactory.setExcludeCipherSuites(excludeCiphers); 487 } 488 String[] includeCiphers = conf.getStrings( 489 THRIFT_SSL_INCLUDE_CIPHER_SUITES, ArrayUtils.EMPTY_STRING_ARRAY); 490 if (includeCiphers.length != 0) { 491 sslCtxFactory.setIncludeCipherSuites(includeCiphers); 492 } 493 494 // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566 495 String[] excludeProtocols = conf.getStrings( 496 THRIFT_SSL_EXCLUDE_PROTOCOLS, "SSLv3"); 497 if (excludeProtocols.length != 0) { 498 sslCtxFactory.setExcludeProtocols(excludeProtocols); 499 } 500 String[] includeProtocols = conf.getStrings( 501 THRIFT_SSL_INCLUDE_PROTOCOLS, ArrayUtils.EMPTY_STRING_ARRAY); 502 if (includeProtocols.length != 0) { 503 sslCtxFactory.setIncludeProtocols(includeProtocols); 504 } 505 506 serverConnector = new ServerConnector(httpServer, 507 new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()), 508 new HttpConnectionFactory(httpsConfig)); 509 } else { 510 serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig)); 511 } 512 serverConnector.setPort(listenPort); 513 String host = getBindAddress(conf).getHostAddress(); 514 serverConnector.setHost(host); 515 httpServer.addConnector(serverConnector); 516 httpServer.setStopAtShutdown(true); 517 518 if (doAsEnabled) { 519 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 520 } 521 522 LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort)); 523 } 524 525 /** 526 * Setting up the thrift TServer 527 */ 528 private void setupServer() throws Exception { 529 // Construct correct ProtocolFactory 530 TProtocolFactory protocolFactory; 531 if (conf.getBoolean(COMPACT_CONF_KEY, false)) { 532 LOG.debug("Using compact protocol"); 533 protocolFactory = new TCompactProtocol.Factory(); 534 } else { 535 LOG.debug("Using binary protocol"); 536 protocolFactory = new TBinaryProtocol.Factory(); 537 } 538 539 final TProcessor p = new Hbase.Processor<>(handler); 540 ImplType implType = ImplType.getServerImpl(conf); 541 TProcessor processor = p; 542 543 // Construct correct TransportFactory 544 TTransportFactory transportFactory; 545 if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) { 546 if (qop != null) { 547 throw new RuntimeException("Thrift server authentication" 548 + " doesn't work with framed transport yet"); 549 } 550 transportFactory = new TFramedTransport.Factory( 551 conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024); 552 LOG.debug("Using framed transport"); 553 } else if (qop == null) { 554 transportFactory = new TTransportFactory(); 555 } else { 556 // Extract the name from the principal 557 String name = SecurityUtil.getUserFromPrincipal( 558 conf.get("hbase.thrift.kerberos.principal")); 559 Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name()); 560 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); 561 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties, 562 new SaslGssCallbackHandler() { 563 @Override 564 public void handle(Callback[] callbacks) 565 throws UnsupportedCallbackException { 566 AuthorizeCallback ac = null; 567 for (Callback callback : callbacks) { 568 if (callback instanceof AuthorizeCallback) { 569 ac = (AuthorizeCallback) callback; 570 } else { 571 throw new UnsupportedCallbackException(callback, 572 "Unrecognized SASL GSSAPI Callback"); 573 } 574 } 575 if (ac != null) { 576 String authid = ac.getAuthenticationID(); 577 String authzid = ac.getAuthorizationID(); 578 if (!authid.equals(authzid)) { 579 ac.setAuthorized(false); 580 } else { 581 ac.setAuthorized(true); 582 String userName = SecurityUtil.getUserFromPrincipal(authzid); 583 LOG.info("Effective user: " + userName); 584 ac.setAuthorizedID(userName); 585 } 586 } 587 } 588 }); 589 transportFactory = saslFactory; 590 591 // Create a processor wrapper, to get the caller 592 processor = new TProcessor() { 593 @Override 594 public boolean process(TProtocol inProt, 595 TProtocol outProt) throws TException { 596 TSaslServerTransport saslServerTransport = 597 (TSaslServerTransport)inProt.getTransport(); 598 SaslServer saslServer = saslServerTransport.getSaslServer(); 599 String principal = saslServer.getAuthorizationID(); 600 hbaseHandler.setEffectiveUser(principal); 601 return p.process(inProt, outProt); 602 } 603 }; 604 } 605 606 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { 607 LOG.error("Server types " + Joiner.on(", ").join( 608 ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " + 609 "address binding at the moment. See " + 610 "https://issues.apache.org/jira/browse/HBASE-2155 for details."); 611 throw new RuntimeException( 612 "-" + BIND_CONF_KEY + " not supported with " + implType); 613 } 614 615 // Thrift's implementation uses '0' as a placeholder for 'use the default.' 616 int backlog = conf.getInt(BACKLOG_CONF_KEY, 0); 617 618 if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING || 619 implType == ImplType.THREADED_SELECTOR) { 620 InetAddress listenAddress = getBindAddress(conf); 621 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket( 622 new InetSocketAddress(listenAddress, listenPort)); 623 624 if (implType == ImplType.NONBLOCKING) { 625 TNonblockingServer.Args serverArgs = 626 new TNonblockingServer.Args(serverTransport); 627 serverArgs.processor(processor) 628 .transportFactory(transportFactory) 629 .protocolFactory(protocolFactory); 630 tserver = new TNonblockingServer(serverArgs); 631 } else if (implType == ImplType.HS_HA) { 632 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); 633 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics); 634 ExecutorService executorService = createExecutor( 635 callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads()); 636 serverArgs.executorService(executorService) 637 .processor(processor) 638 .transportFactory(transportFactory) 639 .protocolFactory(protocolFactory); 640 tserver = new THsHaServer(serverArgs); 641 } else { // THREADED_SELECTOR 642 TThreadedSelectorServer.Args serverArgs = 643 new HThreadedSelectorServerArgs(serverTransport, conf); 644 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics); 645 ExecutorService executorService = createExecutor( 646 callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads()); 647 serverArgs.executorService(executorService) 648 .processor(processor) 649 .transportFactory(transportFactory) 650 .protocolFactory(protocolFactory); 651 tserver = new TThreadedSelectorServer(serverArgs); 652 } 653 LOG.info("starting HBase " + implType.simpleClassName() + 654 " server on " + Integer.toString(listenPort)); 655 } else if (implType == ImplType.THREAD_POOL) { 656 // Thread pool server. Get the IP address to bind to. 657 InetAddress listenAddress = getBindAddress(conf); 658 int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, 659 THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT); 660 TServerTransport serverTransport = new TServerSocket( 661 new TServerSocket.ServerSocketTransportArgs(). 662 bindAddr(new InetSocketAddress(listenAddress, listenPort)). 663 backlog(backlog). 664 clientTimeout(readTimeout)); 665 666 TBoundedThreadPoolServer.Args serverArgs = 667 new TBoundedThreadPoolServer.Args(serverTransport, conf); 668 serverArgs.processor(processor) 669 .transportFactory(transportFactory) 670 .protocolFactory(protocolFactory); 671 LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " 672 + listenAddress + ":" + Integer.toString(listenPort) 673 + " with readTimeout " + readTimeout + "ms; " + serverArgs); 674 TBoundedThreadPoolServer tserver = 675 new TBoundedThreadPoolServer(serverArgs, metrics); 676 this.tserver = tserver; 677 } else { 678 throw new AssertionError("Unsupported Thrift server implementation: " + 679 implType.simpleClassName()); 680 } 681 682 // A sanity check that we instantiated the right type of server. 683 if (tserver.getClass() != implType.serverClass) { 684 throw new AssertionError("Expected to create Thrift server class " + 685 implType.serverClass.getName() + " but got " + 686 tserver.getClass().getName()); 687 } 688 689 690 691 registerFilters(conf); 692 } 693 694 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue, 695 int minWorkers, int maxWorkers) { 696 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 697 tfb.setDaemon(true); 698 tfb.setNameFormat("thrift-worker-%d"); 699 ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers, 700 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); 701 threadPool.allowCoreThreadTimeOut(true); 702 return threadPool; 703 } 704 705 private InetAddress getBindAddress(Configuration conf) 706 throws UnknownHostException { 707 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); 708 return InetAddress.getByName(bindAddressStr); 709 } 710 711 protected static class ResultScannerWrapper { 712 713 private final ResultScanner scanner; 714 private final boolean sortColumns; 715 public ResultScannerWrapper(ResultScanner resultScanner, 716 boolean sortResultColumns) { 717 scanner = resultScanner; 718 sortColumns = sortResultColumns; 719 } 720 721 public ResultScanner getScanner() { 722 return scanner; 723 } 724 725 public boolean isColumnSorted() { 726 return sortColumns; 727 } 728 } 729 730 /** 731 * The HBaseHandler is a glue object that connects Thrift RPC calls to the 732 * HBase client API primarily defined in the Admin and Table objects. 733 */ 734 public static class HBaseHandler implements Hbase.Iface { 735 protected Configuration conf; 736 protected static final Logger LOG = LoggerFactory.getLogger(HBaseHandler.class); 737 738 // nextScannerId and scannerMap are used to manage scanner state 739 protected int nextScannerId = 0; 740 protected HashMap<Integer, ResultScannerWrapper> scannerMap = null; 741 private ThriftMetrics metrics = null; 742 743 private final ConnectionCache connectionCache; 744 IncrementCoalescer coalescer = null; 745 746 static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; 747 static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; 748 749 /** 750 * Returns a list of all the column families for a given Table. 751 * 752 * @param table 753 * @throws IOException 754 */ 755 byte[][] getAllColumns(Table table) throws IOException { 756 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); 757 byte[][] columns = new byte[cds.length][]; 758 for (int i = 0; i < cds.length; i++) { 759 columns[i] = Bytes.add(cds[i].getName(), 760 KeyValue.COLUMN_FAMILY_DELIM_ARRAY); 761 } 762 return columns; 763 } 764 765 /** 766 * Creates and returns a Table instance from a given table name. 767 * 768 * @param tableName 769 * name of table 770 * @return Table object 771 * @throws IOException 772 */ 773 public Table getTable(final byte[] tableName) throws 774 IOException { 775 String table = Bytes.toString(tableName); 776 return connectionCache.getTable(table); 777 } 778 779 public Table getTable(final ByteBuffer tableName) throws IOException { 780 return getTable(getBytes(tableName)); 781 } 782 783 /** 784 * Assigns a unique ID to the scanner and adds the mapping to an internal 785 * hash-map. 786 * 787 * @param scanner 788 * @return integer scanner id 789 */ 790 protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) { 791 int id = nextScannerId++; 792 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns); 793 scannerMap.put(id, resultScannerWrapper); 794 return id; 795 } 796 797 /** 798 * Returns the scanner associated with the specified ID. 799 * 800 * @param id 801 * @return a Scanner, or null if ID was invalid. 802 */ 803 protected synchronized ResultScannerWrapper getScanner(int id) { 804 return scannerMap.get(id); 805 } 806 807 /** 808 * Removes the scanner associated with the specified ID from the internal 809 * id->scanner hash-map. 810 * 811 * @param id 812 * @return a Scanner, or null if ID was invalid. 813 */ 814 protected synchronized ResultScannerWrapper removeScanner(int id) { 815 return scannerMap.remove(id); 816 } 817 818 protected HBaseHandler(final Configuration c, 819 final UserProvider userProvider) throws IOException { 820 this.conf = c; 821 scannerMap = new HashMap<>(); 822 this.coalescer = new IncrementCoalescer(this); 823 824 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); 825 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); 826 connectionCache = new ConnectionCache( 827 conf, userProvider, cleanInterval, maxIdleTime); 828 } 829 830 /** 831 * Obtain HBaseAdmin. Creates the instance if it is not already created. 832 */ 833 private Admin getAdmin() throws IOException { 834 return connectionCache.getAdmin(); 835 } 836 837 void setEffectiveUser(String effectiveUser) { 838 connectionCache.setEffectiveUser(effectiveUser); 839 } 840 841 @Override 842 public void enableTable(ByteBuffer tableName) throws IOError { 843 try{ 844 getAdmin().enableTable(getTableName(tableName)); 845 } catch (IOException e) { 846 LOG.warn(e.getMessage(), e); 847 throw getIOError(e); 848 } 849 } 850 851 @Override 852 public void disableTable(ByteBuffer tableName) throws IOError{ 853 try{ 854 getAdmin().disableTable(getTableName(tableName)); 855 } catch (IOException e) { 856 LOG.warn(e.getMessage(), e); 857 throw getIOError(e); 858 } 859 } 860 861 @Override 862 public boolean isTableEnabled(ByteBuffer tableName) throws IOError { 863 try { 864 return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName)); 865 } catch (IOException e) { 866 LOG.warn(e.getMessage(), e); 867 throw getIOError(e); 868 } 869 } 870 871 // ThriftServerRunner.compact should be deprecated and replaced with methods specific to 872 // table and region. 873 @Override 874 public void compact(ByteBuffer tableNameOrRegionName) throws IOError { 875 try { 876 try { 877 getAdmin().compactRegion(getBytes(tableNameOrRegionName)); 878 } catch (IllegalArgumentException e) { 879 // Invalid region, try table 880 getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); 881 } 882 } catch (IOException e) { 883 LOG.warn(e.getMessage(), e); 884 throw getIOError(e); 885 } 886 } 887 888 // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific 889 // to table and region. 890 @Override 891 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { 892 try { 893 try { 894 getAdmin().compactRegion(getBytes(tableNameOrRegionName)); 895 } catch (IllegalArgumentException e) { 896 // Invalid region, try table 897 getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); 898 } 899 } catch (IOException e) { 900 LOG.warn(e.getMessage(), e); 901 throw getIOError(e); 902 } 903 } 904 905 @Override 906 public List<ByteBuffer> getTableNames() throws IOError { 907 try { 908 TableName[] tableNames = this.getAdmin().listTableNames(); 909 ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length); 910 for (int i = 0; i < tableNames.length; i++) { 911 list.add(ByteBuffer.wrap(tableNames[i].getName())); 912 } 913 return list; 914 } catch (IOException e) { 915 LOG.warn(e.getMessage(), e); 916 throw getIOError(e); 917 } 918 } 919 920 /** 921 * @return the list of regions in the given table, or an empty list if the table does not exist 922 */ 923 @Override 924 public List<TRegionInfo> getTableRegions(ByteBuffer tableName) 925 throws IOError { 926 try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) { 927 List<HRegionLocation> regionLocations = locator.getAllRegionLocations(); 928 List<TRegionInfo> results = new ArrayList<>(regionLocations.size()); 929 for (HRegionLocation regionLocation : regionLocations) { 930 RegionInfo info = regionLocation.getRegionInfo(); 931 ServerName serverName = regionLocation.getServerName(); 932 TRegionInfo region = new TRegionInfo(); 933 region.serverName = ByteBuffer.wrap( 934 Bytes.toBytes(serverName.getHostname())); 935 region.port = serverName.getPort(); 936 region.startKey = ByteBuffer.wrap(info.getStartKey()); 937 region.endKey = ByteBuffer.wrap(info.getEndKey()); 938 region.id = info.getRegionId(); 939 region.name = ByteBuffer.wrap(info.getRegionName()); 940 region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used 941 results.add(region); 942 } 943 return results; 944 } catch (TableNotFoundException e) { 945 // Return empty list for non-existing table 946 return Collections.emptyList(); 947 } catch (IOException e){ 948 LOG.warn(e.getMessage(), e); 949 throw getIOError(e); 950 } 951 } 952 953 @Override 954 public List<TCell> get( 955 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 956 Map<ByteBuffer, ByteBuffer> attributes) 957 throws IOError { 958 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 959 if (famAndQf.length == 1) { 960 return get(tableName, row, famAndQf[0], null, attributes); 961 } 962 if (famAndQf.length == 2) { 963 return get(tableName, row, famAndQf[0], famAndQf[1], attributes); 964 } 965 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 966 } 967 968 /** 969 * Note: this internal interface is slightly different from public APIs in regard to handling 970 * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, 971 * we respect qual == null as a request for the entire column family. The caller ( 972 * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the 973 * column is parse like normal. 974 */ 975 protected List<TCell> get(ByteBuffer tableName, 976 ByteBuffer row, 977 byte[] family, 978 byte[] qualifier, 979 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 980 Table table = null; 981 try { 982 table = getTable(tableName); 983 Get get = new Get(getBytes(row)); 984 addAttributes(get, attributes); 985 if (qualifier == null) { 986 get.addFamily(family); 987 } else { 988 get.addColumn(family, qualifier); 989 } 990 Result result = table.get(get); 991 return ThriftUtilities.cellFromHBase(result.rawCells()); 992 } catch (IOException e) { 993 LOG.warn(e.getMessage(), e); 994 throw getIOError(e); 995 } finally { 996 closeTable(table); 997 } 998 } 999 1000 @Override 1001 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1002 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1003 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1004 if(famAndQf.length == 1) { 1005 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes); 1006 } 1007 if (famAndQf.length == 2) { 1008 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes); 1009 } 1010 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 1011 1012 } 1013 1014 /** 1015 * Note: this public interface is slightly different from public Java APIs in regard to 1016 * handling of the qualifier. Here we differ from the public Java API in that null != byte[0]. 1017 * Rather, we respect qual == null as a request for the entire column family. If you want to 1018 * access the entire column family, use 1019 * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value 1020 * that lacks a {@code ':'}. 1021 */ 1022 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, 1023 byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1024 1025 Table table = null; 1026 try { 1027 table = getTable(tableName); 1028 Get get = new Get(getBytes(row)); 1029 addAttributes(get, attributes); 1030 if (null == qualifier) { 1031 get.addFamily(family); 1032 } else { 1033 get.addColumn(family, qualifier); 1034 } 1035 get.setMaxVersions(numVersions); 1036 Result result = table.get(get); 1037 return ThriftUtilities.cellFromHBase(result.rawCells()); 1038 } catch (IOException e) { 1039 LOG.warn(e.getMessage(), e); 1040 throw getIOError(e); 1041 } finally{ 1042 closeTable(table); 1043 } 1044 } 1045 1046 @Override 1047 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1048 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1049 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1050 if (famAndQf.length == 1) { 1051 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes); 1052 } 1053 if (famAndQf.length == 2) { 1054 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions, 1055 attributes); 1056 } 1057 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 1058 } 1059 1060 /** 1061 * Note: this internal interface is slightly different from public APIs in regard to handling 1062 * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, 1063 * we respect qual == null as a request for the entire column family. The caller ( 1064 * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS 1065 * consistent in that the column is parse like normal. 1066 */ 1067 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family, 1068 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) 1069 throws IOError { 1070 1071 Table table = null; 1072 try { 1073 table = getTable(tableName); 1074 Get get = new Get(getBytes(row)); 1075 addAttributes(get, attributes); 1076 if (null == qualifier) { 1077 get.addFamily(family); 1078 } else { 1079 get.addColumn(family, qualifier); 1080 } 1081 get.setTimeRange(0, timestamp); 1082 get.setMaxVersions(numVersions); 1083 Result result = table.get(get); 1084 return ThriftUtilities.cellFromHBase(result.rawCells()); 1085 } catch (IOException e) { 1086 LOG.warn(e.getMessage(), e); 1087 throw getIOError(e); 1088 } finally{ 1089 closeTable(table); 1090 } 1091 } 1092 1093 @Override 1094 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row, 1095 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1096 return getRowWithColumnsTs(tableName, row, null, 1097 HConstants.LATEST_TIMESTAMP, 1098 attributes); 1099 } 1100 1101 @Override 1102 public List<TRowResult> getRowWithColumns(ByteBuffer tableName, 1103 ByteBuffer row, 1104 List<ByteBuffer> columns, 1105 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1106 return getRowWithColumnsTs(tableName, row, columns, 1107 HConstants.LATEST_TIMESTAMP, 1108 attributes); 1109 } 1110 1111 @Override 1112 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row, 1113 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1114 return getRowWithColumnsTs(tableName, row, null, 1115 timestamp, attributes); 1116 } 1117 1118 @Override 1119 public List<TRowResult> getRowWithColumnsTs( 1120 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns, 1121 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1122 1123 Table table = null; 1124 try { 1125 table = getTable(tableName); 1126 if (columns == null) { 1127 Get get = new Get(getBytes(row)); 1128 addAttributes(get, attributes); 1129 get.setTimeRange(0, timestamp); 1130 Result result = table.get(get); 1131 return ThriftUtilities.rowResultFromHBase(result); 1132 } 1133 Get get = new Get(getBytes(row)); 1134 addAttributes(get, attributes); 1135 for(ByteBuffer column : columns) { 1136 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1137 if (famAndQf.length == 1) { 1138 get.addFamily(famAndQf[0]); 1139 } else { 1140 get.addColumn(famAndQf[0], famAndQf[1]); 1141 } 1142 } 1143 get.setTimeRange(0, timestamp); 1144 Result result = table.get(get); 1145 return ThriftUtilities.rowResultFromHBase(result); 1146 } catch (IOException e) { 1147 LOG.warn(e.getMessage(), e); 1148 throw getIOError(e); 1149 } finally{ 1150 closeTable(table); 1151 } 1152 } 1153 1154 @Override 1155 public List<TRowResult> getRows(ByteBuffer tableName, 1156 List<ByteBuffer> rows, 1157 Map<ByteBuffer, ByteBuffer> attributes) 1158 throws IOError { 1159 return getRowsWithColumnsTs(tableName, rows, null, 1160 HConstants.LATEST_TIMESTAMP, 1161 attributes); 1162 } 1163 1164 @Override 1165 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName, 1166 List<ByteBuffer> rows, 1167 List<ByteBuffer> columns, 1168 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1169 return getRowsWithColumnsTs(tableName, rows, columns, 1170 HConstants.LATEST_TIMESTAMP, 1171 attributes); 1172 } 1173 1174 @Override 1175 public List<TRowResult> getRowsTs(ByteBuffer tableName, 1176 List<ByteBuffer> rows, 1177 long timestamp, 1178 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1179 return getRowsWithColumnsTs(tableName, rows, null, 1180 timestamp, attributes); 1181 } 1182 1183 @Override 1184 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName, 1185 List<ByteBuffer> rows, 1186 List<ByteBuffer> columns, long timestamp, 1187 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1188 1189 Table table= null; 1190 try { 1191 List<Get> gets = new ArrayList<>(rows.size()); 1192 table = getTable(tableName); 1193 if (metrics != null) { 1194 metrics.incNumRowKeysInBatchGet(rows.size()); 1195 } 1196 for (ByteBuffer row : rows) { 1197 Get get = new Get(getBytes(row)); 1198 addAttributes(get, attributes); 1199 if (columns != null) { 1200 1201 for(ByteBuffer column : columns) { 1202 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1203 if (famAndQf.length == 1) { 1204 get.addFamily(famAndQf[0]); 1205 } else { 1206 get.addColumn(famAndQf[0], famAndQf[1]); 1207 } 1208 } 1209 } 1210 get.setTimeRange(0, timestamp); 1211 gets.add(get); 1212 } 1213 Result[] result = table.get(gets); 1214 return ThriftUtilities.rowResultFromHBase(result); 1215 } catch (IOException e) { 1216 LOG.warn(e.getMessage(), e); 1217 throw getIOError(e); 1218 } finally{ 1219 closeTable(table); 1220 } 1221 } 1222 1223 @Override 1224 public void deleteAll( 1225 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1226 Map<ByteBuffer, ByteBuffer> attributes) 1227 throws IOError { 1228 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, 1229 attributes); 1230 } 1231 1232 @Override 1233 public void deleteAllTs(ByteBuffer tableName, 1234 ByteBuffer row, 1235 ByteBuffer column, 1236 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1237 Table table = null; 1238 try { 1239 table = getTable(tableName); 1240 Delete delete = new Delete(getBytes(row)); 1241 addAttributes(delete, attributes); 1242 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1243 if (famAndQf.length == 1) { 1244 delete.addFamily(famAndQf[0], timestamp); 1245 } else { 1246 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 1247 } 1248 table.delete(delete); 1249 1250 } catch (IOException e) { 1251 LOG.warn(e.getMessage(), e); 1252 throw getIOError(e); 1253 } finally { 1254 closeTable(table); 1255 } 1256 } 1257 1258 @Override 1259 public void deleteAllRow( 1260 ByteBuffer tableName, ByteBuffer row, 1261 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1262 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes); 1263 } 1264 1265 @Override 1266 public void deleteAllRowTs( 1267 ByteBuffer tableName, ByteBuffer row, long timestamp, 1268 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1269 Table table = null; 1270 try { 1271 table = getTable(tableName); 1272 Delete delete = new Delete(getBytes(row), timestamp); 1273 addAttributes(delete, attributes); 1274 table.delete(delete); 1275 } catch (IOException e) { 1276 LOG.warn(e.getMessage(), e); 1277 throw getIOError(e); 1278 } finally { 1279 closeTable(table); 1280 } 1281 } 1282 1283 @Override 1284 public void createTable(ByteBuffer in_tableName, 1285 List<ColumnDescriptor> columnFamilies) throws IOError, 1286 IllegalArgument, AlreadyExists { 1287 TableName tableName = getTableName(in_tableName); 1288 try { 1289 if (getAdmin().tableExists(tableName)) { 1290 throw new AlreadyExists("table name already in use"); 1291 } 1292 HTableDescriptor desc = new HTableDescriptor(tableName); 1293 for (ColumnDescriptor col : columnFamilies) { 1294 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); 1295 desc.addFamily(colDesc); 1296 } 1297 getAdmin().createTable(desc); 1298 } catch (IOException e) { 1299 LOG.warn(e.getMessage(), e); 1300 throw getIOError(e); 1301 } catch (IllegalArgumentException e) { 1302 LOG.warn(e.getMessage(), e); 1303 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1304 } 1305 } 1306 1307 private static TableName getTableName(ByteBuffer buffer) { 1308 return TableName.valueOf(getBytes(buffer)); 1309 } 1310 1311 @Override 1312 public void deleteTable(ByteBuffer in_tableName) throws IOError { 1313 TableName tableName = getTableName(in_tableName); 1314 if (LOG.isDebugEnabled()) { 1315 LOG.debug("deleteTable: table=" + tableName); 1316 } 1317 try { 1318 if (!getAdmin().tableExists(tableName)) { 1319 throw new IOException("table does not exist"); 1320 } 1321 getAdmin().deleteTable(tableName); 1322 } catch (IOException e) { 1323 LOG.warn(e.getMessage(), e); 1324 throw getIOError(e); 1325 } 1326 } 1327 1328 @Override 1329 public void mutateRow(ByteBuffer tableName, ByteBuffer row, 1330 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes) 1331 throws IOError, IllegalArgument { 1332 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, 1333 attributes); 1334 } 1335 1336 @Override 1337 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, 1338 List<Mutation> mutations, long timestamp, 1339 Map<ByteBuffer, ByteBuffer> attributes) 1340 throws IOError, IllegalArgument { 1341 Table table = null; 1342 try { 1343 table = getTable(tableName); 1344 Put put = new Put(getBytes(row), timestamp); 1345 addAttributes(put, attributes); 1346 1347 Delete delete = new Delete(getBytes(row)); 1348 addAttributes(delete, attributes); 1349 if (metrics != null) { 1350 metrics.incNumRowKeysInBatchMutate(mutations.size()); 1351 } 1352 1353 // I apologize for all this mess :) 1354 CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); 1355 for (Mutation m : mutations) { 1356 byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); 1357 if (m.isDelete) { 1358 if (famAndQf.length == 1) { 1359 delete.addFamily(famAndQf[0], timestamp); 1360 } else { 1361 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 1362 } 1363 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL 1364 : Durability.SKIP_WAL); 1365 } else { 1366 if(famAndQf.length == 1) { 1367 LOG.warn("No column qualifier specified. Delete is the only mutation supported " 1368 + "over the whole column family."); 1369 } else { 1370 put.add(builder.clear() 1371 .setRow(put.getRow()) 1372 .setFamily(famAndQf[0]) 1373 .setQualifier(famAndQf[1]) 1374 .setTimestamp(put.getTimestamp()) 1375 .setType(Type.Put) 1376 .setValue(m.value != null ? getBytes(m.value) 1377 : HConstants.EMPTY_BYTE_ARRAY) 1378 .build()); 1379 } 1380 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1381 } 1382 } 1383 if (!delete.isEmpty()) 1384 table.delete(delete); 1385 if (!put.isEmpty()) 1386 table.put(put); 1387 } catch (IOException e) { 1388 LOG.warn(e.getMessage(), e); 1389 throw getIOError(e); 1390 } catch (IllegalArgumentException e) { 1391 LOG.warn(e.getMessage(), e); 1392 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1393 } finally{ 1394 closeTable(table); 1395 } 1396 } 1397 1398 @Override 1399 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches, 1400 Map<ByteBuffer, ByteBuffer> attributes) 1401 throws IOError, IllegalArgument, TException { 1402 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes); 1403 } 1404 1405 @Override 1406 public void mutateRowsTs( 1407 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp, 1408 Map<ByteBuffer, ByteBuffer> attributes) 1409 throws IOError, IllegalArgument, TException { 1410 List<Put> puts = new ArrayList<>(); 1411 List<Delete> deletes = new ArrayList<>(); 1412 CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); 1413 for (BatchMutation batch : rowBatches) { 1414 byte[] row = getBytes(batch.row); 1415 List<Mutation> mutations = batch.mutations; 1416 Delete delete = new Delete(row); 1417 addAttributes(delete, attributes); 1418 Put put = new Put(row, timestamp); 1419 addAttributes(put, attributes); 1420 for (Mutation m : mutations) { 1421 byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); 1422 if (m.isDelete) { 1423 // no qualifier, family only. 1424 if (famAndQf.length == 1) { 1425 delete.addFamily(famAndQf[0], timestamp); 1426 } else { 1427 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 1428 } 1429 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL 1430 : Durability.SKIP_WAL); 1431 } else { 1432 if (famAndQf.length == 1) { 1433 LOG.warn("No column qualifier specified. Delete is the only mutation supported " 1434 + "over the whole column family."); 1435 } 1436 if (famAndQf.length == 2) { 1437 try { 1438 put.add(builder.clear() 1439 .setRow(put.getRow()) 1440 .setFamily(famAndQf[0]) 1441 .setQualifier(famAndQf[1]) 1442 .setTimestamp(put.getTimestamp()) 1443 .setType(Type.Put) 1444 .setValue(m.value != null ? getBytes(m.value) 1445 : HConstants.EMPTY_BYTE_ARRAY) 1446 .build()); 1447 } catch (IOException e) { 1448 throw new IllegalArgumentException(e); 1449 } 1450 } else { 1451 throw new IllegalArgumentException("Invalid famAndQf provided."); 1452 } 1453 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1454 } 1455 } 1456 if (!delete.isEmpty()) 1457 deletes.add(delete); 1458 if (!put.isEmpty()) 1459 puts.add(put); 1460 } 1461 1462 Table table = null; 1463 try { 1464 table = getTable(tableName); 1465 if (!puts.isEmpty()) 1466 table.put(puts); 1467 if (!deletes.isEmpty()) 1468 table.delete(deletes); 1469 1470 } catch (IOException e) { 1471 LOG.warn(e.getMessage(), e); 1472 throw getIOError(e); 1473 } catch (IllegalArgumentException e) { 1474 LOG.warn(e.getMessage(), e); 1475 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1476 } finally{ 1477 closeTable(table); 1478 } 1479 } 1480 1481 @Override 1482 public long atomicIncrement( 1483 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount) 1484 throws IOError, IllegalArgument, TException { 1485 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1486 if(famAndQf.length == 1) { 1487 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount); 1488 } 1489 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); 1490 } 1491 1492 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, 1493 byte [] family, byte [] qualifier, long amount) 1494 throws IOError, IllegalArgument, TException { 1495 Table table = null; 1496 try { 1497 table = getTable(tableName); 1498 return table.incrementColumnValue( 1499 getBytes(row), family, qualifier, amount); 1500 } catch (IOException e) { 1501 LOG.warn(e.getMessage(), e); 1502 throw getIOError(e); 1503 } finally { 1504 closeTable(table); 1505 } 1506 } 1507 1508 @Override 1509 public void scannerClose(int id) throws IOError, IllegalArgument { 1510 LOG.debug("scannerClose: id=" + id); 1511 ResultScannerWrapper resultScannerWrapper = getScanner(id); 1512 if (resultScannerWrapper == null) { 1513 String message = "scanner ID is invalid"; 1514 LOG.warn(message); 1515 throw new IllegalArgument("scanner ID is invalid"); 1516 } 1517 resultScannerWrapper.getScanner().close(); 1518 removeScanner(id); 1519 } 1520 1521 @Override 1522 public List<TRowResult> scannerGetList(int id,int nbRows) 1523 throws IllegalArgument, IOError { 1524 LOG.debug("scannerGetList: id=" + id); 1525 ResultScannerWrapper resultScannerWrapper = getScanner(id); 1526 if (null == resultScannerWrapper) { 1527 String message = "scanner ID is invalid"; 1528 LOG.warn(message); 1529 throw new IllegalArgument("scanner ID is invalid"); 1530 } 1531 1532 Result [] results = null; 1533 try { 1534 results = resultScannerWrapper.getScanner().next(nbRows); 1535 if (null == results) { 1536 return new ArrayList<>(); 1537 } 1538 } catch (IOException e) { 1539 LOG.warn(e.getMessage(), e); 1540 throw getIOError(e); 1541 } 1542 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted()); 1543 } 1544 1545 @Override 1546 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError { 1547 return scannerGetList(id,1); 1548 } 1549 1550 @Override 1551 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, 1552 Map<ByteBuffer, ByteBuffer> attributes) 1553 throws IOError { 1554 1555 Table table = null; 1556 try { 1557 table = getTable(tableName); 1558 Scan scan = new Scan(); 1559 addAttributes(scan, attributes); 1560 if (tScan.isSetStartRow()) { 1561 scan.setStartRow(tScan.getStartRow()); 1562 } 1563 if (tScan.isSetStopRow()) { 1564 scan.setStopRow(tScan.getStopRow()); 1565 } 1566 if (tScan.isSetTimestamp()) { 1567 scan.setTimeRange(0, tScan.getTimestamp()); 1568 } 1569 if (tScan.isSetCaching()) { 1570 scan.setCaching(tScan.getCaching()); 1571 } 1572 if (tScan.isSetBatchSize()) { 1573 scan.setBatch(tScan.getBatchSize()); 1574 } 1575 if (tScan.isSetColumns() && tScan.getColumns().size() != 0) { 1576 for(ByteBuffer column : tScan.getColumns()) { 1577 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1578 if(famQf.length == 1) { 1579 scan.addFamily(famQf[0]); 1580 } else { 1581 scan.addColumn(famQf[0], famQf[1]); 1582 } 1583 } 1584 } 1585 if (tScan.isSetFilterString()) { 1586 ParseFilter parseFilter = new ParseFilter(); 1587 scan.setFilter( 1588 parseFilter.parseFilterString(tScan.getFilterString())); 1589 } 1590 if (tScan.isSetReversed()) { 1591 scan.setReversed(tScan.isReversed()); 1592 } 1593 if (tScan.isSetCacheBlocks()) { 1594 scan.setCacheBlocks(tScan.isCacheBlocks()); 1595 } 1596 return addScanner(table.getScanner(scan), tScan.sortColumns); 1597 } catch (IOException e) { 1598 LOG.warn(e.getMessage(), e); 1599 throw getIOError(e); 1600 } finally{ 1601 closeTable(table); 1602 } 1603 } 1604 1605 @Override 1606 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, 1607 List<ByteBuffer> columns, 1608 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1609 1610 Table table = null; 1611 try { 1612 table = getTable(tableName); 1613 Scan scan = new Scan(getBytes(startRow)); 1614 addAttributes(scan, attributes); 1615 if(columns != null && columns.size() != 0) { 1616 for(ByteBuffer column : columns) { 1617 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1618 if(famQf.length == 1) { 1619 scan.addFamily(famQf[0]); 1620 } else { 1621 scan.addColumn(famQf[0], famQf[1]); 1622 } 1623 } 1624 } 1625 return addScanner(table.getScanner(scan), false); 1626 } catch (IOException e) { 1627 LOG.warn(e.getMessage(), e); 1628 throw getIOError(e); 1629 } finally{ 1630 closeTable(table); 1631 } 1632 } 1633 1634 @Override 1635 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, 1636 ByteBuffer stopRow, List<ByteBuffer> columns, 1637 Map<ByteBuffer, ByteBuffer> attributes) 1638 throws IOError, TException { 1639 1640 Table table = null; 1641 try { 1642 table = getTable(tableName); 1643 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); 1644 addAttributes(scan, attributes); 1645 if(columns != null && columns.size() != 0) { 1646 for(ByteBuffer column : columns) { 1647 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1648 if(famQf.length == 1) { 1649 scan.addFamily(famQf[0]); 1650 } else { 1651 scan.addColumn(famQf[0], famQf[1]); 1652 } 1653 } 1654 } 1655 return addScanner(table.getScanner(scan), false); 1656 } catch (IOException e) { 1657 LOG.warn(e.getMessage(), e); 1658 throw getIOError(e); 1659 } finally{ 1660 closeTable(table); 1661 } 1662 } 1663 1664 @Override 1665 public int scannerOpenWithPrefix(ByteBuffer tableName, 1666 ByteBuffer startAndPrefix, 1667 List<ByteBuffer> columns, 1668 Map<ByteBuffer, ByteBuffer> attributes) 1669 throws IOError, TException { 1670 1671 Table table = null; 1672 try { 1673 table = getTable(tableName); 1674 Scan scan = new Scan(getBytes(startAndPrefix)); 1675 addAttributes(scan, attributes); 1676 Filter f = new WhileMatchFilter( 1677 new PrefixFilter(getBytes(startAndPrefix))); 1678 scan.setFilter(f); 1679 if (columns != null && columns.size() != 0) { 1680 for(ByteBuffer column : columns) { 1681 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1682 if(famQf.length == 1) { 1683 scan.addFamily(famQf[0]); 1684 } else { 1685 scan.addColumn(famQf[0], famQf[1]); 1686 } 1687 } 1688 } 1689 return addScanner(table.getScanner(scan), false); 1690 } catch (IOException e) { 1691 LOG.warn(e.getMessage(), e); 1692 throw getIOError(e); 1693 } finally{ 1694 closeTable(table); 1695 } 1696 } 1697 1698 @Override 1699 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, 1700 List<ByteBuffer> columns, long timestamp, 1701 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { 1702 1703 Table table = null; 1704 try { 1705 table = getTable(tableName); 1706 Scan scan = new Scan(getBytes(startRow)); 1707 addAttributes(scan, attributes); 1708 scan.setTimeRange(0, timestamp); 1709 if (columns != null && columns.size() != 0) { 1710 for (ByteBuffer column : columns) { 1711 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1712 if(famQf.length == 1) { 1713 scan.addFamily(famQf[0]); 1714 } else { 1715 scan.addColumn(famQf[0], famQf[1]); 1716 } 1717 } 1718 } 1719 return addScanner(table.getScanner(scan), false); 1720 } catch (IOException e) { 1721 LOG.warn(e.getMessage(), e); 1722 throw getIOError(e); 1723 } finally{ 1724 closeTable(table); 1725 } 1726 } 1727 1728 @Override 1729 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, 1730 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp, 1731 Map<ByteBuffer, ByteBuffer> attributes) 1732 throws IOError, TException { 1733 1734 Table table = null; 1735 try { 1736 table = getTable(tableName); 1737 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); 1738 addAttributes(scan, attributes); 1739 scan.setTimeRange(0, timestamp); 1740 if (columns != null && columns.size() != 0) { 1741 for (ByteBuffer column : columns) { 1742 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1743 if(famQf.length == 1) { 1744 scan.addFamily(famQf[0]); 1745 } else { 1746 scan.addColumn(famQf[0], famQf[1]); 1747 } 1748 } 1749 } 1750 scan.setTimeRange(0, timestamp); 1751 return addScanner(table.getScanner(scan), false); 1752 } catch (IOException e) { 1753 LOG.warn(e.getMessage(), e); 1754 throw getIOError(e); 1755 } finally{ 1756 closeTable(table); 1757 } 1758 } 1759 1760 @Override 1761 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors( 1762 ByteBuffer tableName) throws IOError, TException { 1763 1764 Table table = null; 1765 try { 1766 TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>(); 1767 1768 table = getTable(tableName); 1769 HTableDescriptor desc = table.getTableDescriptor(); 1770 1771 for (HColumnDescriptor e : desc.getFamilies()) { 1772 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); 1773 columns.put(col.name, col); 1774 } 1775 return columns; 1776 } catch (IOException e) { 1777 LOG.warn(e.getMessage(), e); 1778 throw getIOError(e); 1779 } finally { 1780 closeTable(table); 1781 } 1782 } 1783 1784 private void closeTable(Table table) throws IOError 1785 { 1786 try{ 1787 if(table != null){ 1788 table.close(); 1789 } 1790 } catch (IOException e){ 1791 LOG.error(e.getMessage(), e); 1792 throw getIOError(e); 1793 } 1794 } 1795 1796 @Override 1797 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { 1798 try { 1799 byte[] row = getBytes(searchRow); 1800 Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row, 1801 HConstants.CATALOG_FAMILY); 1802 1803 if (startRowResult == null) { 1804 throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row=" 1805 + Bytes.toStringBinary(row)); 1806 } 1807 1808 // find region start and end keys 1809 RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult); 1810 if (regionInfo == null) { 1811 throw new IOException("RegionInfo REGIONINFO was null or " + 1812 " empty in Meta for row=" 1813 + Bytes.toStringBinary(row)); 1814 } 1815 TRegionInfo region = new TRegionInfo(); 1816 region.setStartKey(regionInfo.getStartKey()); 1817 region.setEndKey(regionInfo.getEndKey()); 1818 region.id = regionInfo.getRegionId(); 1819 region.setName(regionInfo.getRegionName()); 1820 region.version = HREGION_VERSION; // version not used anymore, PB encoding used. 1821 1822 // find region assignment to server 1823 ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0); 1824 if (serverName != null) { 1825 region.setServerName(Bytes.toBytes(serverName.getHostname())); 1826 region.port = serverName.getPort(); 1827 } 1828 return region; 1829 } catch (IOException e) { 1830 LOG.warn(e.getMessage(), e); 1831 throw getIOError(e); 1832 } 1833 } 1834 1835 private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family) 1836 throws IOException { 1837 Scan scan = new Scan(row); 1838 scan.setReversed(true); 1839 scan.addFamily(family); 1840 scan.setStartRow(row); 1841 Table table = getTable(tableName); 1842 try (ResultScanner scanner = table.getScanner(scan)) { 1843 return scanner.next(); 1844 } finally{ 1845 if(table != null){ 1846 table.close(); 1847 } 1848 } 1849 } 1850 1851 private void initMetrics(ThriftMetrics metrics) { 1852 this.metrics = metrics; 1853 } 1854 1855 @Override 1856 public void increment(TIncrement tincrement) throws IOError, TException { 1857 1858 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) { 1859 throw new TException("Must supply a table and a row key; can't increment"); 1860 } 1861 1862 if (conf.getBoolean(COALESCE_INC_KEY, false)) { 1863 this.coalescer.queueIncrement(tincrement); 1864 return; 1865 } 1866 1867 Table table = null; 1868 try { 1869 table = getTable(tincrement.getTable()); 1870 Increment inc = ThriftUtilities.incrementFromThrift(tincrement); 1871 table.increment(inc); 1872 } catch (IOException e) { 1873 LOG.warn(e.getMessage(), e); 1874 throw getIOError(e); 1875 } finally{ 1876 closeTable(table); 1877 } 1878 } 1879 1880 @Override 1881 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException { 1882 if (conf.getBoolean(COALESCE_INC_KEY, false)) { 1883 this.coalescer.queueIncrements(tincrements); 1884 return; 1885 } 1886 for (TIncrement tinc : tincrements) { 1887 increment(tinc); 1888 } 1889 } 1890 1891 @Override 1892 public List<TCell> append(TAppend tappend) throws IOError, TException { 1893 if (tappend.getRow().length == 0 || tappend.getTable().length == 0) { 1894 throw new TException("Must supply a table and a row key; can't append"); 1895 } 1896 1897 Table table = null; 1898 try { 1899 table = getTable(tappend.getTable()); 1900 Append append = ThriftUtilities.appendFromThrift(tappend); 1901 Result result = table.append(append); 1902 return ThriftUtilities.cellFromHBase(result.rawCells()); 1903 } catch (IOException e) { 1904 LOG.warn(e.getMessage(), e); 1905 throw getIOError(e); 1906 } finally{ 1907 closeTable(table); 1908 } 1909 } 1910 1911 @Override 1912 public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1913 ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, 1914 IllegalArgument, TException { 1915 Put put; 1916 try { 1917 put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP); 1918 addAttributes(put, attributes); 1919 1920 byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column)); 1921 put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 1922 .setRow(put.getRow()) 1923 .setFamily(famAndQf[0]) 1924 .setQualifier(famAndQf[1]) 1925 .setTimestamp(put.getTimestamp()) 1926 .setType(Type.Put) 1927 .setValue(mput.value != null ? getBytes(mput.value) 1928 : HConstants.EMPTY_BYTE_ARRAY) 1929 .build()); 1930 put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1931 } catch (IOException | IllegalArgumentException e) { 1932 LOG.warn(e.getMessage(), e); 1933 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1934 } 1935 1936 Table table = null; 1937 try { 1938 table = getTable(tableName); 1939 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1940 Table.CheckAndMutateBuilder mutateBuilder = 1941 table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]); 1942 if (value != null) { 1943 return mutateBuilder.ifEquals(getBytes(value)).thenPut(put); 1944 } else { 1945 return mutateBuilder.ifNotExists().thenPut(put); 1946 } 1947 } catch (IOException e) { 1948 LOG.warn(e.getMessage(), e); 1949 throw getIOError(e); 1950 } catch (IllegalArgumentException e) { 1951 LOG.warn(e.getMessage(), e); 1952 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1953 } finally { 1954 closeTable(table); 1955 } 1956 } 1957 } 1958 1959 1960 private static IOError getIOError(Throwable throwable) { 1961 IOError error = new IOErrorWithCause(throwable); 1962 error.setMessage(Throwables.getStackTraceAsString(throwable)); 1963 return error; 1964 } 1965 1966 /** 1967 * Adds all the attributes into the Operation object 1968 */ 1969 private static void addAttributes(OperationWithAttributes op, 1970 Map<ByteBuffer, ByteBuffer> attributes) { 1971 if (attributes == null || attributes.isEmpty()) { 1972 return; 1973 } 1974 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) { 1975 String name = Bytes.toStringBinary(getBytes(entry.getKey())); 1976 byte[] value = getBytes(entry.getValue()); 1977 op.setAttribute(name, value); 1978 } 1979 } 1980 1981 public static void registerFilters(Configuration conf) { 1982 String[] filters = conf.getStrings("hbase.thrift.filters"); 1983 Splitter splitter = Splitter.on(':'); 1984 if(filters != null) { 1985 for(String filterClass: filters) { 1986 List<String> filterPart = splitter.splitToList(filterClass); 1987 if(filterPart.size() != 2) { 1988 LOG.warn("Invalid filter specification " + filterClass + " - skipping"); 1989 } else { 1990 ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1)); 1991 } 1992 } 1993 } 1994 } 1995 1996 public static class IOErrorWithCause extends IOError { 1997 private Throwable cause; 1998 public IOErrorWithCause(Throwable cause) { 1999 this.cause = cause; 2000 } 2001 2002 @Override 2003 public synchronized Throwable getCause() { 2004 return cause; 2005 } 2006 2007 @Override 2008 public boolean equals(Object other) { 2009 if (super.equals(other) && 2010 other instanceof IOErrorWithCause) { 2011 Throwable otherCause = ((IOErrorWithCause) other).getCause(); 2012 if (this.getCause() != null) { 2013 return otherCause != null && this.getCause().equals(otherCause); 2014 } else { 2015 return otherCause == null; 2016 } 2017 } 2018 return false; 2019 } 2020 2021 @Override 2022 public int hashCode() { 2023 int result = super.hashCode(); 2024 result = 31 * result + (cause != null ? cause.hashCode() : 0); 2025 return result; 2026 } 2027 } 2028}