001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import static org.apache.hadoop.hbase.util.Bytes.getBytes; 022 023import java.io.IOException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.net.UnknownHostException; 027import java.nio.ByteBuffer; 028import java.security.PrivilegedAction; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.TreeMap; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.ExecutorService; 038import java.util.concurrent.LinkedBlockingQueue; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041 042import javax.security.auth.callback.Callback; 043import javax.security.auth.callback.UnsupportedCallbackException; 044import javax.security.sasl.AuthorizeCallback; 045import javax.security.sasl.SaslServer; 046 047import org.apache.commons.lang3.ArrayUtils; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.Cell.Type; 050import org.apache.hadoop.hbase.CellBuilder; 051import org.apache.hadoop.hbase.CellBuilderFactory; 052import org.apache.hadoop.hbase.CellBuilderType; 053import org.apache.hadoop.hbase.CellUtil; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HColumnDescriptor; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.HTableDescriptor; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.MetaTableAccessor; 061import org.apache.hadoop.hbase.ServerName; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.TableNotFoundException; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.Append; 066import org.apache.hadoop.hbase.client.Delete; 067import org.apache.hadoop.hbase.client.Durability; 068import org.apache.hadoop.hbase.client.Get; 069import org.apache.hadoop.hbase.client.Increment; 070import org.apache.hadoop.hbase.client.OperationWithAttributes; 071import org.apache.hadoop.hbase.client.Put; 072import org.apache.hadoop.hbase.client.RegionInfo; 073import org.apache.hadoop.hbase.client.RegionLocator; 074import org.apache.hadoop.hbase.client.Result; 075import org.apache.hadoop.hbase.client.ResultScanner; 076import org.apache.hadoop.hbase.client.Scan; 077import org.apache.hadoop.hbase.client.Table; 078import org.apache.hadoop.hbase.filter.Filter; 079import org.apache.hadoop.hbase.filter.ParseFilter; 080import org.apache.hadoop.hbase.filter.PrefixFilter; 081import org.apache.hadoop.hbase.filter.WhileMatchFilter; 082import org.apache.hadoop.hbase.http.HttpServerUtil; 083import org.apache.hadoop.hbase.log.HBaseMarkers; 084import org.apache.hadoop.hbase.security.SaslUtil; 085import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 086import org.apache.hadoop.hbase.security.SecurityUtil; 087import org.apache.hadoop.hbase.security.UserProvider; 088import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; 089import org.apache.hadoop.hbase.thrift.generated.BatchMutation; 090import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; 091import org.apache.hadoop.hbase.thrift.generated.Hbase; 092import org.apache.hadoop.hbase.thrift.generated.IOError; 093import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; 094import org.apache.hadoop.hbase.thrift.generated.Mutation; 095import org.apache.hadoop.hbase.thrift.generated.TAppend; 096import org.apache.hadoop.hbase.thrift.generated.TCell; 097import org.apache.hadoop.hbase.thrift.generated.TIncrement; 098import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; 099import org.apache.hadoop.hbase.thrift.generated.TRowResult; 100import org.apache.hadoop.hbase.thrift.generated.TScan; 101import org.apache.hadoop.hbase.util.Bytes; 102import org.apache.hadoop.hbase.util.ConnectionCache; 103import org.apache.hadoop.hbase.util.DNS; 104import org.apache.hadoop.hbase.util.JvmPauseMonitor; 105import org.apache.hadoop.hbase.util.Strings; 106import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler; 107import org.apache.hadoop.security.UserGroupInformation; 108import org.apache.hadoop.security.authorize.ProxyUsers; 109import org.apache.thrift.TException; 110import org.apache.thrift.TProcessor; 111import org.apache.thrift.protocol.TBinaryProtocol; 112import org.apache.thrift.protocol.TCompactProtocol; 113import org.apache.thrift.protocol.TProtocolFactory; 114import org.apache.thrift.server.THsHaServer; 115import org.apache.thrift.server.TNonblockingServer; 116import org.apache.thrift.server.TServer; 117import org.apache.thrift.server.TServlet; 118import org.apache.thrift.server.TThreadedSelectorServer; 119import org.apache.thrift.transport.TFramedTransport; 120import org.apache.thrift.transport.TNonblockingServerSocket; 121import org.apache.thrift.transport.TNonblockingServerTransport; 122import org.apache.thrift.transport.TSaslServerTransport; 123import org.apache.thrift.transport.TServerSocket; 124import org.apache.thrift.transport.TServerTransport; 125import org.apache.thrift.transport.TTransportFactory; 126import org.apache.yetus.audience.InterfaceAudience; 127import org.eclipse.jetty.http.HttpVersion; 128import org.eclipse.jetty.server.HttpConfiguration; 129import org.eclipse.jetty.server.HttpConnectionFactory; 130import org.eclipse.jetty.server.SecureRequestCustomizer; 131import org.eclipse.jetty.server.Server; 132import org.eclipse.jetty.server.ServerConnector; 133import org.eclipse.jetty.server.SslConnectionFactory; 134import org.eclipse.jetty.servlet.ServletContextHandler; 135import org.eclipse.jetty.servlet.ServletHolder; 136import org.eclipse.jetty.util.ssl.SslContextFactory; 137import org.eclipse.jetty.util.thread.QueuedThreadPool; 138import org.slf4j.Logger; 139import org.slf4j.LoggerFactory; 140import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 141import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 142import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 143import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 144import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 145import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 146import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup; 147 148/** 149 * ThriftServerRunner - this class starts up a Thrift server which implements 150 * the Hbase API specified in the Hbase.thrift IDL file. 151 */ 152@InterfaceAudience.Private 153public class ThriftServerRunner implements Runnable { 154 155 private static final Logger LOG = LoggerFactory.getLogger(ThriftServerRunner.class); 156 157 private static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k 158 159 static final String SERVER_TYPE_CONF_KEY = 160 "hbase.regionserver.thrift.server.type"; 161 162 static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress"; 163 static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact"; 164 static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed"; 165 static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb"; 166 static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port"; 167 static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement"; 168 static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http"; 169 static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min"; 170 static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max"; 171 172 static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled"; 173 static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store"; 174 static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY = "hbase.thrift.ssl.keystore.password"; 175 static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY = "hbase.thrift.ssl.keystore.keypassword"; 176 static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY = 177 "hbase.thrift.ssl.exclude.cipher.suites"; 178 static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY = 179 "hbase.thrift.ssl.include.cipher.suites"; 180 static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.exclude.protocols"; 181 static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.include.protocols"; 182 183 static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser"; 184 185 static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface"; 186 static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver"; 187 static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal"; 188 static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file"; 189 static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal"; 190 static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file"; 191 192 /** 193 * Amount of time in milliseconds before a server thread will timeout 194 * waiting for client to send data on a connected socket. Currently, 195 * applies only to TBoundedThreadPoolServer 196 */ 197 public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY = 198 "hbase.thrift.server.socket.read.timeout"; 199 public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000; 200 201 202 /** 203 * Thrift quality of protection configuration key. Valid values can be: 204 * auth-conf: authentication, integrity and confidentiality checking 205 * auth-int: authentication and integrity checking 206 * auth: authentication only 207 * 208 * This is used to authenticate the callers and support impersonation. 209 * The thrift server and the HBase cluster must run in secure mode. 210 */ 211 static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop"; 212 static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog"; 213 214 private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; 215 public static final int DEFAULT_LISTEN_PORT = 9090; 216 public static final int HREGION_VERSION = 1; 217 218 private final int listenPort; 219 220 private Configuration conf; 221 volatile TServer tserver; 222 volatile Server httpServer; 223 private final Hbase.Iface handler; 224 private final ThriftMetrics metrics; 225 private final HBaseHandler hbaseHandler; 226 private final UserGroupInformation serviceUGI; 227 private UserGroupInformation httpUGI; 228 229 private SaslUtil.QualityOfProtection qop; 230 private String host; 231 232 private final boolean securityEnabled; 233 private final boolean doAsEnabled; 234 235 private final JvmPauseMonitor pauseMonitor; 236 237 static String THRIFT_HTTP_ALLOW_OPTIONS_METHOD = "hbase.thrift.http.allow.options.method"; 238 private static boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false; 239 240 /** An enum of server implementation selections */ 241 public enum ImplType { 242 HS_HA("hsha", true, THsHaServer.class, true), 243 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true), 244 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true), 245 THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true); 246 247 public static final ImplType DEFAULT = THREAD_POOL; 248 249 final String option; 250 final boolean isAlwaysFramed; 251 final Class<? extends TServer> serverClass; 252 final boolean canSpecifyBindIP; 253 254 private ImplType(String option, boolean isAlwaysFramed, 255 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) { 256 this.option = option; 257 this.isAlwaysFramed = isAlwaysFramed; 258 this.serverClass = serverClass; 259 this.canSpecifyBindIP = canSpecifyBindIP; 260 } 261 262 /** 263 * @return <code>-option</code> 264 */ 265 @Override 266 public String toString() { 267 return "-" + option; 268 } 269 270 public String getOption() { 271 return option; 272 } 273 274 public boolean isAlwaysFramed() { 275 return isAlwaysFramed; 276 } 277 278 public String getDescription() { 279 StringBuilder sb = new StringBuilder("Use the " + 280 serverClass.getSimpleName()); 281 if (isAlwaysFramed) { 282 sb.append(" This implies the framed transport."); 283 } 284 if (this == DEFAULT) { 285 sb.append("This is the default."); 286 } 287 return sb.toString(); 288 } 289 290 static OptionGroup createOptionGroup() { 291 OptionGroup group = new OptionGroup(); 292 for (ImplType t : values()) { 293 group.addOption(new Option(t.option, t.getDescription())); 294 } 295 return group; 296 } 297 298 public static ImplType getServerImpl(Configuration conf) { 299 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option); 300 for (ImplType t : values()) { 301 if (confType.equals(t.option)) { 302 return t; 303 } 304 } 305 throw new AssertionError("Unknown server ImplType.option:" + confType); 306 } 307 308 static void setServerImpl(CommandLine cmd, Configuration conf) { 309 ImplType chosenType = null; 310 int numChosen = 0; 311 for (ImplType t : values()) { 312 if (cmd.hasOption(t.option)) { 313 chosenType = t; 314 ++numChosen; 315 } 316 } 317 if (numChosen < 1) { 318 LOG.info("Using default thrift server type"); 319 chosenType = DEFAULT; 320 } else if (numChosen > 1) { 321 throw new AssertionError("Exactly one option out of " + 322 Arrays.toString(values()) + " has to be specified"); 323 } 324 LOG.info("Using thrift server type " + chosenType.option); 325 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option); 326 } 327 328 public String simpleClassName() { 329 return serverClass.getSimpleName(); 330 } 331 332 public static List<String> serversThatCannotSpecifyBindIP() { 333 List<String> l = new ArrayList<>(); 334 for (ImplType t : values()) { 335 if (!t.canSpecifyBindIP) { 336 l.add(t.simpleClassName()); 337 } 338 } 339 return l; 340 } 341 } 342 343 public ThriftServerRunner(Configuration conf) throws IOException { 344 // login the server principal (if using secure Hadoop) 345 UserProvider userProvider = UserProvider.instantiate(conf); 346 securityEnabled = userProvider.isHadoopSecurityEnabled() 347 && userProvider.isHBaseSecurityEnabled(); 348 if (securityEnabled) { 349 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( 350 conf.get(THRIFT_DNS_INTERFACE_KEY, "default"), 351 conf.get(THRIFT_DNS_NAMESERVER_KEY, "default"))); 352 userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host); 353 354 // Setup the SPNEGO user for HTTP if configured 355 String spnegoPrincipal = getSpengoPrincipal(conf, host); 356 String spnegoKeytab = getSpnegoKeytab(conf); 357 UserGroupInformation.setConfiguration(conf); 358 // login the SPNEGO principal using UGI to avoid polluting the login user 359 this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal, 360 spnegoKeytab); 361 } 362 this.serviceUGI = userProvider.getCurrent().getUGI(); 363 if (httpUGI == null) { 364 this.httpUGI = serviceUGI; 365 } 366 367 this.conf = HBaseConfiguration.create(conf); 368 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); 369 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); 370 this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource()); 371 this.hbaseHandler = new HBaseHandler(conf, userProvider); 372 this.hbaseHandler.initMetrics(metrics); 373 this.handler = HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, conf); 374 375 boolean httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false); 376 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false); 377 if (doAsEnabled && !httpEnabled) { 378 LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured"); 379 } 380 381 String strQop = conf.get(THRIFT_QOP_KEY); 382 if (strQop != null) { 383 this.qop = SaslUtil.getQop(strQop); 384 } 385 if (qop != null) { 386 if (qop != QualityOfProtection.AUTHENTICATION && 387 qop != QualityOfProtection.INTEGRITY && 388 qop != QualityOfProtection.PRIVACY) { 389 throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.", 390 THRIFT_QOP_KEY, 391 QualityOfProtection.AUTHENTICATION.name(), 392 QualityOfProtection.INTEGRITY.name(), 393 QualityOfProtection.PRIVACY.name())); 394 } 395 checkHttpSecurity(qop, conf); 396 if (!securityEnabled) { 397 throw new IOException("Thrift server must run in secure mode to support authentication"); 398 } 399 } 400 } 401 402 403 private String getSpengoPrincipal(Configuration conf, String host) throws IOException { 404 String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY); 405 if (principal == null) { 406 // We cannot use the Hadoop configuration deprecation handling here since 407 // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos 408 // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY 409 // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend 410 // Kerberos principal and SPNEGO principal. 411 LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.", 412 THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY); 413 principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 414 } 415 // Handle _HOST in principal value 416 return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host); 417 } 418 419 private String getSpnegoKeytab(Configuration conf) { 420 String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY); 421 if (keytab == null) { 422 // We cannot use the Hadoop configuration deprecation handling here since 423 // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos 424 // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY 425 // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend 426 // Kerberos keytab and SPNEGO keytab. 427 LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.", 428 THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY); 429 keytab = conf.get(THRIFT_KEYTAB_FILE_KEY); 430 } 431 return keytab; 432 } 433 434 private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) { 435 if (qop == QualityOfProtection.PRIVACY && 436 conf.getBoolean(USE_HTTP_CONF_KEY, false) && 437 !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) { 438 throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " + 439 THRIFT_SSL_ENABLED_KEY + " is false"); 440 } 441 } 442 443 /* 444 * Runs the Thrift server 445 */ 446 @Override 447 public void run() { 448 serviceUGI.doAs(new PrivilegedAction<Object>() { 449 @Override 450 public Object run() { 451 try { 452 pauseMonitor.start(); 453 if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) { 454 setupHTTPServer(); 455 httpServer.start(); 456 httpServer.join(); 457 } else { 458 setupServer(); 459 tserver.serve(); 460 } 461 } catch (Exception e) { 462 LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e); 463 // Crash the process if the ThriftServer is not running 464 System.exit(-1); 465 } 466 return null; 467 } 468 }); 469 470 } 471 472 public void shutdown() { 473 if (pauseMonitor != null) { 474 pauseMonitor.stop(); 475 } 476 if (tserver != null) { 477 tserver.stop(); 478 tserver = null; 479 } 480 if (httpServer != null) { 481 try { 482 httpServer.stop(); 483 httpServer = null; 484 } catch (Exception e) { 485 LOG.error("Problem encountered in shutting down HTTP server", e); 486 } 487 httpServer = null; 488 } 489 } 490 491 private void setupHTTPServer() throws IOException { 492 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); 493 TProcessor processor = new Hbase.Processor<>(handler); 494 TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI, 495 httpUGI, hbaseHandler, securityEnabled, doAsEnabled); 496 497 // Set the default max thread number to 100 to limit 498 // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily. 499 // Jetty set the default max thread number to 250, if we don't set it. 500 // 501 // Our default min thread number 2 is the same as that used by Jetty. 502 int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, 2); 503 int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, 100); 504 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads); 505 threadPool.setMinThreads(minThreads); 506 httpServer = new Server(threadPool); 507 508 // Context handler 509 ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS); 510 ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*"); 511 HttpServerUtil.constrainHttpMethods(ctxHandler, 512 conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT)); 513 514 // set up Jetty and run the embedded server 515 HttpConfiguration httpConfig = new HttpConfiguration(); 516 httpConfig.setSecureScheme("https"); 517 httpConfig.setSecurePort(listenPort); 518 httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 519 httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 520 httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 521 httpConfig.setSendServerVersion(false); 522 httpConfig.setSendDateHeader(false); 523 524 ServerConnector serverConnector; 525 if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) { 526 HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig); 527 httpsConfig.addCustomizer(new SecureRequestCustomizer()); 528 529 SslContextFactory sslCtxFactory = new SslContextFactory(); 530 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY); 531 String password = HBaseConfiguration.getPassword(conf, 532 THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null); 533 String keyPassword = HBaseConfiguration.getPassword(conf, 534 THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password); 535 sslCtxFactory.setKeyStorePath(keystore); 536 sslCtxFactory.setKeyStorePassword(password); 537 sslCtxFactory.setKeyManagerPassword(keyPassword); 538 539 String[] excludeCiphers = conf.getStrings( 540 THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 541 if (excludeCiphers.length != 0) { 542 sslCtxFactory.setExcludeCipherSuites(excludeCiphers); 543 } 544 String[] includeCiphers = conf.getStrings( 545 THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 546 if (includeCiphers.length != 0) { 547 sslCtxFactory.setIncludeCipherSuites(includeCiphers); 548 } 549 550 // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566 551 String[] excludeProtocols = conf.getStrings( 552 THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3"); 553 if (excludeProtocols.length != 0) { 554 sslCtxFactory.setExcludeProtocols(excludeProtocols); 555 } 556 String[] includeProtocols = conf.getStrings( 557 THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 558 if (includeProtocols.length != 0) { 559 sslCtxFactory.setIncludeProtocols(includeProtocols); 560 } 561 562 serverConnector = new ServerConnector(httpServer, 563 new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()), 564 new HttpConnectionFactory(httpsConfig)); 565 } else { 566 serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig)); 567 } 568 serverConnector.setPort(listenPort); 569 serverConnector.setHost(getBindAddress(conf).getHostAddress()); 570 httpServer.addConnector(serverConnector); 571 httpServer.setStopAtShutdown(true); 572 573 if (doAsEnabled) { 574 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 575 } 576 577 LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort)); 578 } 579 580 /** 581 * Setting up the thrift TServer 582 */ 583 private void setupServer() throws Exception { 584 // Construct correct ProtocolFactory 585 TProtocolFactory protocolFactory; 586 if (conf.getBoolean(COMPACT_CONF_KEY, false)) { 587 LOG.debug("Using compact protocol"); 588 protocolFactory = new TCompactProtocol.Factory(); 589 } else { 590 LOG.debug("Using binary protocol"); 591 protocolFactory = new TBinaryProtocol.Factory(); 592 } 593 594 final TProcessor p = new Hbase.Processor<>(handler); 595 ImplType implType = ImplType.getServerImpl(conf); 596 TProcessor processor = p; 597 598 // Construct correct TransportFactory 599 TTransportFactory transportFactory; 600 if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) { 601 if (qop != null) { 602 throw new RuntimeException("Thrift server authentication" 603 + " doesn't work with framed transport yet"); 604 } 605 transportFactory = new TFramedTransport.Factory( 606 conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024); 607 LOG.debug("Using framed transport"); 608 } else if (qop == null) { 609 transportFactory = new TTransportFactory(); 610 } else { 611 // Extract the name from the principal 612 String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 613 if (thriftKerberosPrincipal == null) { 614 throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null"); 615 } 616 String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal); 617 Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name()); 618 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); 619 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties, 620 new SaslGssCallbackHandler() { 621 @Override 622 public void handle(Callback[] callbacks) 623 throws UnsupportedCallbackException { 624 AuthorizeCallback ac = null; 625 for (Callback callback : callbacks) { 626 if (callback instanceof AuthorizeCallback) { 627 ac = (AuthorizeCallback) callback; 628 } else { 629 throw new UnsupportedCallbackException(callback, 630 "Unrecognized SASL GSSAPI Callback"); 631 } 632 } 633 if (ac != null) { 634 String authid = ac.getAuthenticationID(); 635 String authzid = ac.getAuthorizationID(); 636 if (!authid.equals(authzid)) { 637 ac.setAuthorized(false); 638 } else { 639 ac.setAuthorized(true); 640 String userName = SecurityUtil.getUserFromPrincipal(authzid); 641 LOG.info("Effective user: {}", userName); 642 ac.setAuthorizedID(userName); 643 } 644 } 645 } 646 }); 647 transportFactory = saslFactory; 648 649 // Create a processor wrapper, to get the caller 650 processor = (inProt, outProt) -> { 651 TSaslServerTransport saslServerTransport = 652 (TSaslServerTransport)inProt.getTransport(); 653 SaslServer saslServer = saslServerTransport.getSaslServer(); 654 String principal = saslServer.getAuthorizationID(); 655 hbaseHandler.setEffectiveUser(principal); 656 return p.process(inProt, outProt); 657 }; 658 } 659 660 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { 661 LOG.error("Server types {} don't support IP address binding at the moment. See " + 662 "https://issues.apache.org/jira/browse/HBASE-2155 for details.", 663 Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP())); 664 throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType); 665 } 666 667 // Thrift's implementation uses '0' as a placeholder for 'use the default.' 668 int backlog = conf.getInt(BACKLOG_CONF_KEY, 0); 669 670 if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING || 671 implType == ImplType.THREADED_SELECTOR) { 672 InetAddress listenAddress = getBindAddress(conf); 673 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket( 674 new InetSocketAddress(listenAddress, listenPort)); 675 676 if (implType == ImplType.NONBLOCKING) { 677 TNonblockingServer.Args serverArgs = 678 new TNonblockingServer.Args(serverTransport); 679 serverArgs.processor(processor) 680 .transportFactory(transportFactory) 681 .protocolFactory(protocolFactory); 682 tserver = new TNonblockingServer(serverArgs); 683 } else if (implType == ImplType.HS_HA) { 684 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); 685 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics); 686 ExecutorService executorService = createExecutor( 687 callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads()); 688 serverArgs.executorService(executorService) 689 .processor(processor) 690 .transportFactory(transportFactory) 691 .protocolFactory(protocolFactory); 692 tserver = new THsHaServer(serverArgs); 693 } else { // THREADED_SELECTOR 694 TThreadedSelectorServer.Args serverArgs = 695 new HThreadedSelectorServerArgs(serverTransport, conf); 696 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics); 697 ExecutorService executorService = createExecutor( 698 callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads()); 699 serverArgs.executorService(executorService) 700 .processor(processor) 701 .transportFactory(transportFactory) 702 .protocolFactory(protocolFactory); 703 tserver = new TThreadedSelectorServer(serverArgs); 704 } 705 LOG.info("starting HBase {} server on {}", implType.simpleClassName(), 706 Integer.toString(listenPort)); 707 } else if (implType == ImplType.THREAD_POOL) { 708 // Thread pool server. Get the IP address to bind to. 709 InetAddress listenAddress = getBindAddress(conf); 710 int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, 711 THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT); 712 TServerTransport serverTransport = new TServerSocket( 713 new TServerSocket.ServerSocketTransportArgs(). 714 bindAddr(new InetSocketAddress(listenAddress, listenPort)). 715 backlog(backlog). 716 clientTimeout(readTimeout)); 717 718 TBoundedThreadPoolServer.Args serverArgs = 719 new TBoundedThreadPoolServer.Args(serverTransport, conf); 720 serverArgs.processor(processor) 721 .transportFactory(transportFactory) 722 .protocolFactory(protocolFactory); 723 LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " 724 + listenAddress + ":" + Integer.toString(listenPort) 725 + " with readTimeout " + readTimeout + "ms; " + serverArgs); 726 TBoundedThreadPoolServer tserver = 727 new TBoundedThreadPoolServer(serverArgs, metrics); 728 this.tserver = tserver; 729 } else { 730 throw new AssertionError("Unsupported Thrift server implementation: " + 731 implType.simpleClassName()); 732 } 733 734 // A sanity check that we instantiated the right type of server. 735 if (tserver.getClass() != implType.serverClass) { 736 throw new AssertionError("Expected to create Thrift server class " + 737 implType.serverClass.getName() + " but got " + 738 tserver.getClass().getName()); 739 } 740 741 742 743 registerFilters(conf); 744 } 745 746 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue, 747 int minWorkers, int maxWorkers) { 748 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 749 tfb.setDaemon(true); 750 tfb.setNameFormat("thrift-worker-%d"); 751 ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers, 752 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); 753 threadPool.allowCoreThreadTimeOut(true); 754 return threadPool; 755 } 756 757 private InetAddress getBindAddress(Configuration conf) 758 throws UnknownHostException { 759 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); 760 return InetAddress.getByName(bindAddressStr); 761 } 762 763 protected static class ResultScannerWrapper { 764 765 private final ResultScanner scanner; 766 private final boolean sortColumns; 767 public ResultScannerWrapper(ResultScanner resultScanner, 768 boolean sortResultColumns) { 769 scanner = resultScanner; 770 sortColumns = sortResultColumns; 771 } 772 773 public ResultScanner getScanner() { 774 return scanner; 775 } 776 777 public boolean isColumnSorted() { 778 return sortColumns; 779 } 780 } 781 782 /** 783 * The HBaseHandler is a glue object that connects Thrift RPC calls to the 784 * HBase client API primarily defined in the Admin and Table objects. 785 */ 786 public static class HBaseHandler implements Hbase.Iface { 787 protected Configuration conf; 788 protected static final Logger LOG = LoggerFactory.getLogger(HBaseHandler.class); 789 790 // nextScannerId and scannerMap are used to manage scanner state 791 protected int nextScannerId = 0; 792 protected HashMap<Integer, ResultScannerWrapper> scannerMap; 793 private ThriftMetrics metrics = null; 794 795 private final ConnectionCache connectionCache; 796 IncrementCoalescer coalescer; 797 798 static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; 799 static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; 800 801 /** 802 * Returns a list of all the column families for a given Table. 803 * 804 * @param table 805 * @throws IOException 806 */ 807 byte[][] getAllColumns(Table table) throws IOException { 808 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); 809 byte[][] columns = new byte[cds.length][]; 810 for (int i = 0; i < cds.length; i++) { 811 columns[i] = Bytes.add(cds[i].getName(), 812 KeyValue.COLUMN_FAMILY_DELIM_ARRAY); 813 } 814 return columns; 815 } 816 817 /** 818 * Creates and returns a Table instance from a given table name. 819 * 820 * @param tableName 821 * name of table 822 * @return Table object 823 * @throws IOException 824 */ 825 public Table getTable(final byte[] tableName) throws 826 IOException { 827 String table = Bytes.toString(tableName); 828 return connectionCache.getTable(table); 829 } 830 831 public Table getTable(final ByteBuffer tableName) throws IOException { 832 return getTable(getBytes(tableName)); 833 } 834 835 /** 836 * Assigns a unique ID to the scanner and adds the mapping to an internal 837 * hash-map. 838 * 839 * @param scanner 840 * @return integer scanner id 841 */ 842 protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) { 843 int id = nextScannerId++; 844 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns); 845 scannerMap.put(id, resultScannerWrapper); 846 return id; 847 } 848 849 /** 850 * Returns the scanner associated with the specified ID. 851 * 852 * @param id 853 * @return a Scanner, or null if ID was invalid. 854 */ 855 protected synchronized ResultScannerWrapper getScanner(int id) { 856 return scannerMap.get(id); 857 } 858 859 /** 860 * Removes the scanner associated with the specified ID from the internal 861 * id->scanner hash-map. 862 * 863 * @param id 864 * @return a Scanner, or null if ID was invalid. 865 */ 866 protected synchronized ResultScannerWrapper removeScanner(int id) { 867 return scannerMap.remove(id); 868 } 869 870 protected HBaseHandler(final Configuration c, 871 final UserProvider userProvider) throws IOException { 872 this.conf = c; 873 scannerMap = new HashMap<>(); 874 this.coalescer = new IncrementCoalescer(this); 875 876 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); 877 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); 878 connectionCache = new ConnectionCache( 879 conf, userProvider, cleanInterval, maxIdleTime); 880 } 881 882 /** 883 * Obtain HBaseAdmin. Creates the instance if it is not already created. 884 */ 885 private Admin getAdmin() throws IOException { 886 return connectionCache.getAdmin(); 887 } 888 889 void setEffectiveUser(String effectiveUser) { 890 connectionCache.setEffectiveUser(effectiveUser); 891 } 892 893 @Override 894 public void enableTable(ByteBuffer tableName) throws IOError { 895 try{ 896 getAdmin().enableTable(getTableName(tableName)); 897 } catch (IOException e) { 898 LOG.warn(e.getMessage(), e); 899 throw getIOError(e); 900 } 901 } 902 903 @Override 904 public void disableTable(ByteBuffer tableName) throws IOError{ 905 try{ 906 getAdmin().disableTable(getTableName(tableName)); 907 } catch (IOException e) { 908 LOG.warn(e.getMessage(), e); 909 throw getIOError(e); 910 } 911 } 912 913 @Override 914 public boolean isTableEnabled(ByteBuffer tableName) throws IOError { 915 try { 916 return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName)); 917 } catch (IOException e) { 918 LOG.warn(e.getMessage(), e); 919 throw getIOError(e); 920 } 921 } 922 923 // ThriftServerRunner.compact should be deprecated and replaced with methods specific to 924 // table and region. 925 @Override 926 public void compact(ByteBuffer tableNameOrRegionName) throws IOError { 927 try { 928 try { 929 getAdmin().compactRegion(getBytes(tableNameOrRegionName)); 930 } catch (IllegalArgumentException e) { 931 // Invalid region, try table 932 getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); 933 } 934 } catch (IOException e) { 935 LOG.warn(e.getMessage(), e); 936 throw getIOError(e); 937 } 938 } 939 940 // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific 941 // to table and region. 942 @Override 943 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { 944 try { 945 try { 946 getAdmin().compactRegion(getBytes(tableNameOrRegionName)); 947 } catch (IllegalArgumentException e) { 948 // Invalid region, try table 949 getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); 950 } 951 } catch (IOException e) { 952 LOG.warn(e.getMessage(), e); 953 throw getIOError(e); 954 } 955 } 956 957 @Override 958 public List<ByteBuffer> getTableNames() throws IOError { 959 try { 960 TableName[] tableNames = this.getAdmin().listTableNames(); 961 ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length); 962 for (TableName tableName : tableNames) { 963 list.add(ByteBuffer.wrap(tableName.getName())); 964 } 965 return list; 966 } catch (IOException e) { 967 LOG.warn(e.getMessage(), e); 968 throw getIOError(e); 969 } 970 } 971 972 /** 973 * @return the list of regions in the given table, or an empty list if the table does not exist 974 */ 975 @Override 976 public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError { 977 try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) { 978 List<HRegionLocation> regionLocations = locator.getAllRegionLocations(); 979 List<TRegionInfo> results = new ArrayList<>(regionLocations.size()); 980 for (HRegionLocation regionLocation : regionLocations) { 981 RegionInfo info = regionLocation.getRegionInfo(); 982 ServerName serverName = regionLocation.getServerName(); 983 TRegionInfo region = new TRegionInfo(); 984 region.serverName = ByteBuffer.wrap( 985 Bytes.toBytes(serverName.getHostname())); 986 region.port = serverName.getPort(); 987 region.startKey = ByteBuffer.wrap(info.getStartKey()); 988 region.endKey = ByteBuffer.wrap(info.getEndKey()); 989 region.id = info.getRegionId(); 990 region.name = ByteBuffer.wrap(info.getRegionName()); 991 region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used 992 results.add(region); 993 } 994 return results; 995 } catch (TableNotFoundException e) { 996 // Return empty list for non-existing table 997 return Collections.emptyList(); 998 } catch (IOException e){ 999 LOG.warn(e.getMessage(), e); 1000 throw getIOError(e); 1001 } 1002 } 1003 1004 @Override 1005 public List<TCell> get( 1006 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1007 Map<ByteBuffer, ByteBuffer> attributes) 1008 throws IOError { 1009 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1010 if (famAndQf.length == 1) { 1011 return get(tableName, row, famAndQf[0], null, attributes); 1012 } 1013 if (famAndQf.length == 2) { 1014 return get(tableName, row, famAndQf[0], famAndQf[1], attributes); 1015 } 1016 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 1017 } 1018 1019 /** 1020 * Note: this internal interface is slightly different from public APIs in regard to handling 1021 * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, 1022 * we respect qual == null as a request for the entire column family. The caller ( 1023 * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the 1024 * column is parse like normal. 1025 */ 1026 protected List<TCell> get(ByteBuffer tableName, 1027 ByteBuffer row, 1028 byte[] family, 1029 byte[] qualifier, 1030 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1031 Table table = null; 1032 try { 1033 table = getTable(tableName); 1034 Get get = new Get(getBytes(row)); 1035 addAttributes(get, attributes); 1036 if (qualifier == null) { 1037 get.addFamily(family); 1038 } else { 1039 get.addColumn(family, qualifier); 1040 } 1041 Result result = table.get(get); 1042 return ThriftUtilities.cellFromHBase(result.rawCells()); 1043 } catch (IOException e) { 1044 LOG.warn(e.getMessage(), e); 1045 throw getIOError(e); 1046 } finally { 1047 closeTable(table); 1048 } 1049 } 1050 1051 @Override 1052 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1053 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1054 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1055 if(famAndQf.length == 1) { 1056 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes); 1057 } 1058 if (famAndQf.length == 2) { 1059 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes); 1060 } 1061 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 1062 1063 } 1064 1065 /** 1066 * Note: this public interface is slightly different from public Java APIs in regard to 1067 * handling of the qualifier. Here we differ from the public Java API in that null != byte[0]. 1068 * Rather, we respect qual == null as a request for the entire column family. If you want to 1069 * access the entire column family, use 1070 * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value 1071 * that lacks a {@code ':'}. 1072 */ 1073 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, 1074 byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1075 1076 Table table = null; 1077 try { 1078 table = getTable(tableName); 1079 Get get = new Get(getBytes(row)); 1080 addAttributes(get, attributes); 1081 if (null == qualifier) { 1082 get.addFamily(family); 1083 } else { 1084 get.addColumn(family, qualifier); 1085 } 1086 get.setMaxVersions(numVersions); 1087 Result result = table.get(get); 1088 return ThriftUtilities.cellFromHBase(result.rawCells()); 1089 } catch (IOException e) { 1090 LOG.warn(e.getMessage(), e); 1091 throw getIOError(e); 1092 } finally{ 1093 closeTable(table); 1094 } 1095 } 1096 1097 @Override 1098 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1099 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1100 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1101 if (famAndQf.length == 1) { 1102 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes); 1103 } 1104 if (famAndQf.length == 2) { 1105 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions, 1106 attributes); 1107 } 1108 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 1109 } 1110 1111 /** 1112 * Note: this internal interface is slightly different from public APIs in regard to handling 1113 * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, 1114 * we respect qual == null as a request for the entire column family. The caller ( 1115 * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS 1116 * consistent in that the column is parse like normal. 1117 */ 1118 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family, 1119 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) 1120 throws IOError { 1121 1122 Table table = null; 1123 try { 1124 table = getTable(tableName); 1125 Get get = new Get(getBytes(row)); 1126 addAttributes(get, attributes); 1127 if (null == qualifier) { 1128 get.addFamily(family); 1129 } else { 1130 get.addColumn(family, qualifier); 1131 } 1132 get.setTimeRange(0, timestamp); 1133 get.setMaxVersions(numVersions); 1134 Result result = table.get(get); 1135 return ThriftUtilities.cellFromHBase(result.rawCells()); 1136 } catch (IOException e) { 1137 LOG.warn(e.getMessage(), e); 1138 throw getIOError(e); 1139 } finally{ 1140 closeTable(table); 1141 } 1142 } 1143 1144 @Override 1145 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row, 1146 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1147 return getRowWithColumnsTs(tableName, row, null, 1148 HConstants.LATEST_TIMESTAMP, 1149 attributes); 1150 } 1151 1152 @Override 1153 public List<TRowResult> getRowWithColumns(ByteBuffer tableName, 1154 ByteBuffer row, 1155 List<ByteBuffer> columns, 1156 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1157 return getRowWithColumnsTs(tableName, row, columns, 1158 HConstants.LATEST_TIMESTAMP, 1159 attributes); 1160 } 1161 1162 @Override 1163 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row, 1164 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1165 return getRowWithColumnsTs(tableName, row, null, 1166 timestamp, attributes); 1167 } 1168 1169 @Override 1170 public List<TRowResult> getRowWithColumnsTs( 1171 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns, 1172 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1173 1174 Table table = null; 1175 try { 1176 table = getTable(tableName); 1177 if (columns == null) { 1178 Get get = new Get(getBytes(row)); 1179 addAttributes(get, attributes); 1180 get.setTimeRange(0, timestamp); 1181 Result result = table.get(get); 1182 return ThriftUtilities.rowResultFromHBase(result); 1183 } 1184 Get get = new Get(getBytes(row)); 1185 addAttributes(get, attributes); 1186 for(ByteBuffer column : columns) { 1187 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1188 if (famAndQf.length == 1) { 1189 get.addFamily(famAndQf[0]); 1190 } else { 1191 get.addColumn(famAndQf[0], famAndQf[1]); 1192 } 1193 } 1194 get.setTimeRange(0, timestamp); 1195 Result result = table.get(get); 1196 return ThriftUtilities.rowResultFromHBase(result); 1197 } catch (IOException e) { 1198 LOG.warn(e.getMessage(), e); 1199 throw getIOError(e); 1200 } finally{ 1201 closeTable(table); 1202 } 1203 } 1204 1205 @Override 1206 public List<TRowResult> getRows(ByteBuffer tableName, 1207 List<ByteBuffer> rows, 1208 Map<ByteBuffer, ByteBuffer> attributes) 1209 throws IOError { 1210 return getRowsWithColumnsTs(tableName, rows, null, 1211 HConstants.LATEST_TIMESTAMP, 1212 attributes); 1213 } 1214 1215 @Override 1216 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName, 1217 List<ByteBuffer> rows, 1218 List<ByteBuffer> columns, 1219 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1220 return getRowsWithColumnsTs(tableName, rows, columns, 1221 HConstants.LATEST_TIMESTAMP, 1222 attributes); 1223 } 1224 1225 @Override 1226 public List<TRowResult> getRowsTs(ByteBuffer tableName, 1227 List<ByteBuffer> rows, 1228 long timestamp, 1229 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1230 return getRowsWithColumnsTs(tableName, rows, null, 1231 timestamp, attributes); 1232 } 1233 1234 @Override 1235 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName, 1236 List<ByteBuffer> rows, 1237 List<ByteBuffer> columns, long timestamp, 1238 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1239 1240 Table table= null; 1241 try { 1242 List<Get> gets = new ArrayList<>(rows.size()); 1243 table = getTable(tableName); 1244 if (metrics != null) { 1245 metrics.incNumRowKeysInBatchGet(rows.size()); 1246 } 1247 for (ByteBuffer row : rows) { 1248 Get get = new Get(getBytes(row)); 1249 addAttributes(get, attributes); 1250 if (columns != null) { 1251 1252 for(ByteBuffer column : columns) { 1253 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1254 if (famAndQf.length == 1) { 1255 get.addFamily(famAndQf[0]); 1256 } else { 1257 get.addColumn(famAndQf[0], famAndQf[1]); 1258 } 1259 } 1260 } 1261 get.setTimeRange(0, timestamp); 1262 gets.add(get); 1263 } 1264 Result[] result = table.get(gets); 1265 return ThriftUtilities.rowResultFromHBase(result); 1266 } catch (IOException e) { 1267 LOG.warn(e.getMessage(), e); 1268 throw getIOError(e); 1269 } finally{ 1270 closeTable(table); 1271 } 1272 } 1273 1274 @Override 1275 public void deleteAll( 1276 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1277 Map<ByteBuffer, ByteBuffer> attributes) 1278 throws IOError { 1279 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, 1280 attributes); 1281 } 1282 1283 @Override 1284 public void deleteAllTs(ByteBuffer tableName, 1285 ByteBuffer row, 1286 ByteBuffer column, 1287 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1288 Table table = null; 1289 try { 1290 table = getTable(tableName); 1291 Delete delete = new Delete(getBytes(row)); 1292 addAttributes(delete, attributes); 1293 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1294 if (famAndQf.length == 1) { 1295 delete.addFamily(famAndQf[0], timestamp); 1296 } else { 1297 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 1298 } 1299 table.delete(delete); 1300 1301 } catch (IOException e) { 1302 LOG.warn(e.getMessage(), e); 1303 throw getIOError(e); 1304 } finally { 1305 closeTable(table); 1306 } 1307 } 1308 1309 @Override 1310 public void deleteAllRow( 1311 ByteBuffer tableName, ByteBuffer row, 1312 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1313 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes); 1314 } 1315 1316 @Override 1317 public void deleteAllRowTs( 1318 ByteBuffer tableName, ByteBuffer row, long timestamp, 1319 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1320 Table table = null; 1321 try { 1322 table = getTable(tableName); 1323 Delete delete = new Delete(getBytes(row), timestamp); 1324 addAttributes(delete, attributes); 1325 table.delete(delete); 1326 } catch (IOException e) { 1327 LOG.warn(e.getMessage(), e); 1328 throw getIOError(e); 1329 } finally { 1330 closeTable(table); 1331 } 1332 } 1333 1334 @Override 1335 public void createTable(ByteBuffer in_tableName, 1336 List<ColumnDescriptor> columnFamilies) throws IOError, 1337 IllegalArgument, AlreadyExists { 1338 TableName tableName = getTableName(in_tableName); 1339 try { 1340 if (getAdmin().tableExists(tableName)) { 1341 throw new AlreadyExists("table name already in use"); 1342 } 1343 HTableDescriptor desc = new HTableDescriptor(tableName); 1344 for (ColumnDescriptor col : columnFamilies) { 1345 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); 1346 desc.addFamily(colDesc); 1347 } 1348 getAdmin().createTable(desc); 1349 } catch (IOException e) { 1350 LOG.warn(e.getMessage(), e); 1351 throw getIOError(e); 1352 } catch (IllegalArgumentException e) { 1353 LOG.warn(e.getMessage(), e); 1354 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1355 } 1356 } 1357 1358 private static TableName getTableName(ByteBuffer buffer) { 1359 return TableName.valueOf(getBytes(buffer)); 1360 } 1361 1362 @Override 1363 public void deleteTable(ByteBuffer in_tableName) throws IOError { 1364 TableName tableName = getTableName(in_tableName); 1365 if (LOG.isDebugEnabled()) { 1366 LOG.debug("deleteTable: table={}", tableName); 1367 } 1368 try { 1369 if (!getAdmin().tableExists(tableName)) { 1370 throw new IOException("table does not exist"); 1371 } 1372 getAdmin().deleteTable(tableName); 1373 } catch (IOException e) { 1374 LOG.warn(e.getMessage(), e); 1375 throw getIOError(e); 1376 } 1377 } 1378 1379 @Override 1380 public void mutateRow(ByteBuffer tableName, ByteBuffer row, 1381 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes) 1382 throws IOError, IllegalArgument { 1383 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes); 1384 } 1385 1386 @Override 1387 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, 1388 List<Mutation> mutations, long timestamp, 1389 Map<ByteBuffer, ByteBuffer> attributes) 1390 throws IOError, IllegalArgument { 1391 Table table = null; 1392 try { 1393 table = getTable(tableName); 1394 Put put = new Put(getBytes(row), timestamp); 1395 addAttributes(put, attributes); 1396 1397 Delete delete = new Delete(getBytes(row)); 1398 addAttributes(delete, attributes); 1399 if (metrics != null) { 1400 metrics.incNumRowKeysInBatchMutate(mutations.size()); 1401 } 1402 1403 // I apologize for all this mess :) 1404 CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); 1405 for (Mutation m : mutations) { 1406 byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); 1407 if (m.isDelete) { 1408 if (famAndQf.length == 1) { 1409 delete.addFamily(famAndQf[0], timestamp); 1410 } else { 1411 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 1412 } 1413 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1414 } else { 1415 if(famAndQf.length == 1) { 1416 LOG.warn("No column qualifier specified. Delete is the only mutation supported " 1417 + "over the whole column family."); 1418 } else { 1419 put.add(builder.clear() 1420 .setRow(put.getRow()) 1421 .setFamily(famAndQf[0]) 1422 .setQualifier(famAndQf[1]) 1423 .setTimestamp(put.getTimestamp()) 1424 .setType(Type.Put) 1425 .setValue(m.value != null ? getBytes(m.value) 1426 : HConstants.EMPTY_BYTE_ARRAY) 1427 .build()); 1428 } 1429 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1430 } 1431 } 1432 if (!delete.isEmpty()) 1433 table.delete(delete); 1434 if (!put.isEmpty()) 1435 table.put(put); 1436 } catch (IOException e) { 1437 LOG.warn(e.getMessage(), e); 1438 throw getIOError(e); 1439 } catch (IllegalArgumentException e) { 1440 LOG.warn(e.getMessage(), e); 1441 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1442 } finally{ 1443 closeTable(table); 1444 } 1445 } 1446 1447 @Override 1448 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches, 1449 Map<ByteBuffer, ByteBuffer> attributes) 1450 throws IOError, IllegalArgument, TException { 1451 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes); 1452 } 1453 1454 @Override 1455 public void mutateRowsTs( 1456 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp, 1457 Map<ByteBuffer, ByteBuffer> attributes) 1458 throws IOError, IllegalArgument, TException { 1459 List<Put> puts = new ArrayList<>(); 1460 List<Delete> deletes = new ArrayList<>(); 1461 CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); 1462 for (BatchMutation batch : rowBatches) { 1463 byte[] row = getBytes(batch.row); 1464 List<Mutation> mutations = batch.mutations; 1465 Delete delete = new Delete(row); 1466 addAttributes(delete, attributes); 1467 Put put = new Put(row, timestamp); 1468 addAttributes(put, attributes); 1469 for (Mutation m : mutations) { 1470 byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); 1471 if (m.isDelete) { 1472 // no qualifier, family only. 1473 if (famAndQf.length == 1) { 1474 delete.addFamily(famAndQf[0], timestamp); 1475 } else { 1476 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 1477 } 1478 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL 1479 : Durability.SKIP_WAL); 1480 } else { 1481 if (famAndQf.length == 1) { 1482 LOG.warn("No column qualifier specified. Delete is the only mutation supported " 1483 + "over the whole column family."); 1484 } 1485 if (famAndQf.length == 2) { 1486 try { 1487 put.add(builder.clear() 1488 .setRow(put.getRow()) 1489 .setFamily(famAndQf[0]) 1490 .setQualifier(famAndQf[1]) 1491 .setTimestamp(put.getTimestamp()) 1492 .setType(Type.Put) 1493 .setValue(m.value != null ? getBytes(m.value) 1494 : HConstants.EMPTY_BYTE_ARRAY) 1495 .build()); 1496 } catch (IOException e) { 1497 throw new IllegalArgumentException(e); 1498 } 1499 } else { 1500 throw new IllegalArgumentException("Invalid famAndQf provided."); 1501 } 1502 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1503 } 1504 } 1505 if (!delete.isEmpty()) 1506 deletes.add(delete); 1507 if (!put.isEmpty()) 1508 puts.add(put); 1509 } 1510 1511 Table table = null; 1512 try { 1513 table = getTable(tableName); 1514 if (!puts.isEmpty()) 1515 table.put(puts); 1516 if (!deletes.isEmpty()) 1517 table.delete(deletes); 1518 } catch (IOException e) { 1519 LOG.warn(e.getMessage(), e); 1520 throw getIOError(e); 1521 } catch (IllegalArgumentException e) { 1522 LOG.warn(e.getMessage(), e); 1523 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1524 } finally{ 1525 closeTable(table); 1526 } 1527 } 1528 1529 @Override 1530 public long atomicIncrement( 1531 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount) 1532 throws IOError, IllegalArgument, TException { 1533 byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1534 if(famAndQf.length == 1) { 1535 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount); 1536 } 1537 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); 1538 } 1539 1540 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, 1541 byte [] family, byte [] qualifier, long amount) 1542 throws IOError, IllegalArgument, TException { 1543 Table table = null; 1544 try { 1545 table = getTable(tableName); 1546 return table.incrementColumnValue( 1547 getBytes(row), family, qualifier, amount); 1548 } catch (IOException e) { 1549 LOG.warn(e.getMessage(), e); 1550 throw getIOError(e); 1551 } finally { 1552 closeTable(table); 1553 } 1554 } 1555 1556 @Override 1557 public void scannerClose(int id) throws IOError, IllegalArgument { 1558 LOG.debug("scannerClose: id={}", id); 1559 ResultScannerWrapper resultScannerWrapper = getScanner(id); 1560 if (resultScannerWrapper == null) { 1561 LOG.warn("scanner ID is invalid"); 1562 throw new IllegalArgument("scanner ID is invalid"); 1563 } 1564 resultScannerWrapper.getScanner().close(); 1565 removeScanner(id); 1566 } 1567 1568 @Override 1569 public List<TRowResult> scannerGetList(int id,int nbRows) 1570 throws IllegalArgument, IOError { 1571 LOG.debug("scannerGetList: id={}", id); 1572 ResultScannerWrapper resultScannerWrapper = getScanner(id); 1573 if (null == resultScannerWrapper) { 1574 String message = "scanner ID is invalid"; 1575 LOG.warn(message); 1576 throw new IllegalArgument("scanner ID is invalid"); 1577 } 1578 1579 Result [] results; 1580 try { 1581 results = resultScannerWrapper.getScanner().next(nbRows); 1582 if (null == results) { 1583 return new ArrayList<>(); 1584 } 1585 } catch (IOException e) { 1586 LOG.warn(e.getMessage(), e); 1587 throw getIOError(e); 1588 } 1589 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted()); 1590 } 1591 1592 @Override 1593 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError { 1594 return scannerGetList(id,1); 1595 } 1596 1597 @Override 1598 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, 1599 Map<ByteBuffer, ByteBuffer> attributes) 1600 throws IOError { 1601 1602 Table table = null; 1603 try { 1604 table = getTable(tableName); 1605 Scan scan = new Scan(); 1606 addAttributes(scan, attributes); 1607 if (tScan.isSetStartRow()) { 1608 scan.setStartRow(tScan.getStartRow()); 1609 } 1610 if (tScan.isSetStopRow()) { 1611 scan.setStopRow(tScan.getStopRow()); 1612 } 1613 if (tScan.isSetTimestamp()) { 1614 scan.setTimeRange(0, tScan.getTimestamp()); 1615 } 1616 if (tScan.isSetCaching()) { 1617 scan.setCaching(tScan.getCaching()); 1618 } 1619 if (tScan.isSetBatchSize()) { 1620 scan.setBatch(tScan.getBatchSize()); 1621 } 1622 if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) { 1623 for(ByteBuffer column : tScan.getColumns()) { 1624 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1625 if(famQf.length == 1) { 1626 scan.addFamily(famQf[0]); 1627 } else { 1628 scan.addColumn(famQf[0], famQf[1]); 1629 } 1630 } 1631 } 1632 if (tScan.isSetFilterString()) { 1633 ParseFilter parseFilter = new ParseFilter(); 1634 scan.setFilter( 1635 parseFilter.parseFilterString(tScan.getFilterString())); 1636 } 1637 if (tScan.isSetReversed()) { 1638 scan.setReversed(tScan.isReversed()); 1639 } 1640 if (tScan.isSetCacheBlocks()) { 1641 scan.setCacheBlocks(tScan.isCacheBlocks()); 1642 } 1643 return addScanner(table.getScanner(scan), tScan.sortColumns); 1644 } catch (IOException e) { 1645 LOG.warn(e.getMessage(), e); 1646 throw getIOError(e); 1647 } finally{ 1648 closeTable(table); 1649 } 1650 } 1651 1652 @Override 1653 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, 1654 List<ByteBuffer> columns, 1655 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 1656 1657 Table table = null; 1658 try { 1659 table = getTable(tableName); 1660 Scan scan = new Scan(getBytes(startRow)); 1661 addAttributes(scan, attributes); 1662 if(columns != null && !columns.isEmpty()) { 1663 for(ByteBuffer column : columns) { 1664 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1665 if(famQf.length == 1) { 1666 scan.addFamily(famQf[0]); 1667 } else { 1668 scan.addColumn(famQf[0], famQf[1]); 1669 } 1670 } 1671 } 1672 return addScanner(table.getScanner(scan), false); 1673 } catch (IOException e) { 1674 LOG.warn(e.getMessage(), e); 1675 throw getIOError(e); 1676 } finally{ 1677 closeTable(table); 1678 } 1679 } 1680 1681 @Override 1682 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, 1683 ByteBuffer stopRow, List<ByteBuffer> columns, 1684 Map<ByteBuffer, ByteBuffer> attributes) 1685 throws IOError, TException { 1686 1687 Table table = null; 1688 try { 1689 table = getTable(tableName); 1690 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); 1691 addAttributes(scan, attributes); 1692 if(columns != null && !columns.isEmpty()) { 1693 for(ByteBuffer column : columns) { 1694 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1695 if(famQf.length == 1) { 1696 scan.addFamily(famQf[0]); 1697 } else { 1698 scan.addColumn(famQf[0], famQf[1]); 1699 } 1700 } 1701 } 1702 return addScanner(table.getScanner(scan), false); 1703 } catch (IOException e) { 1704 LOG.warn(e.getMessage(), e); 1705 throw getIOError(e); 1706 } finally{ 1707 closeTable(table); 1708 } 1709 } 1710 1711 @Override 1712 public int scannerOpenWithPrefix(ByteBuffer tableName, 1713 ByteBuffer startAndPrefix, 1714 List<ByteBuffer> columns, 1715 Map<ByteBuffer, ByteBuffer> attributes) 1716 throws IOError, TException { 1717 1718 Table table = null; 1719 try { 1720 table = getTable(tableName); 1721 Scan scan = new Scan(getBytes(startAndPrefix)); 1722 addAttributes(scan, attributes); 1723 Filter f = new WhileMatchFilter( 1724 new PrefixFilter(getBytes(startAndPrefix))); 1725 scan.setFilter(f); 1726 if (columns != null && !columns.isEmpty()) { 1727 for(ByteBuffer column : columns) { 1728 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1729 if(famQf.length == 1) { 1730 scan.addFamily(famQf[0]); 1731 } else { 1732 scan.addColumn(famQf[0], famQf[1]); 1733 } 1734 } 1735 } 1736 return addScanner(table.getScanner(scan), false); 1737 } catch (IOException e) { 1738 LOG.warn(e.getMessage(), e); 1739 throw getIOError(e); 1740 } finally{ 1741 closeTable(table); 1742 } 1743 } 1744 1745 @Override 1746 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, 1747 List<ByteBuffer> columns, long timestamp, 1748 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { 1749 1750 Table table = null; 1751 try { 1752 table = getTable(tableName); 1753 Scan scan = new Scan(getBytes(startRow)); 1754 addAttributes(scan, attributes); 1755 scan.setTimeRange(0, timestamp); 1756 if (columns != null && !columns.isEmpty()) { 1757 for (ByteBuffer column : columns) { 1758 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1759 if(famQf.length == 1) { 1760 scan.addFamily(famQf[0]); 1761 } else { 1762 scan.addColumn(famQf[0], famQf[1]); 1763 } 1764 } 1765 } 1766 return addScanner(table.getScanner(scan), false); 1767 } catch (IOException e) { 1768 LOG.warn(e.getMessage(), e); 1769 throw getIOError(e); 1770 } finally{ 1771 closeTable(table); 1772 } 1773 } 1774 1775 @Override 1776 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, 1777 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp, 1778 Map<ByteBuffer, ByteBuffer> attributes) 1779 throws IOError, TException { 1780 1781 Table table = null; 1782 try { 1783 table = getTable(tableName); 1784 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); 1785 addAttributes(scan, attributes); 1786 scan.setTimeRange(0, timestamp); 1787 if (columns != null && !columns.isEmpty()) { 1788 for (ByteBuffer column : columns) { 1789 byte [][] famQf = CellUtil.parseColumn(getBytes(column)); 1790 if(famQf.length == 1) { 1791 scan.addFamily(famQf[0]); 1792 } else { 1793 scan.addColumn(famQf[0], famQf[1]); 1794 } 1795 } 1796 } 1797 scan.setTimeRange(0, timestamp); 1798 return addScanner(table.getScanner(scan), false); 1799 } catch (IOException e) { 1800 LOG.warn(e.getMessage(), e); 1801 throw getIOError(e); 1802 } finally{ 1803 closeTable(table); 1804 } 1805 } 1806 1807 @Override 1808 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors( 1809 ByteBuffer tableName) throws IOError, TException { 1810 1811 Table table = null; 1812 try { 1813 TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>(); 1814 1815 table = getTable(tableName); 1816 HTableDescriptor desc = table.getTableDescriptor(); 1817 1818 for (HColumnDescriptor e : desc.getFamilies()) { 1819 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); 1820 columns.put(col.name, col); 1821 } 1822 return columns; 1823 } catch (IOException e) { 1824 LOG.warn(e.getMessage(), e); 1825 throw getIOError(e); 1826 } finally { 1827 closeTable(table); 1828 } 1829 } 1830 1831 private void closeTable(Table table) throws IOError 1832 { 1833 try{ 1834 if(table != null){ 1835 table.close(); 1836 } 1837 } catch (IOException e){ 1838 LOG.error(e.getMessage(), e); 1839 throw getIOError(e); 1840 } 1841 } 1842 1843 @Override 1844 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { 1845 try { 1846 byte[] row = getBytes(searchRow); 1847 Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row, 1848 HConstants.CATALOG_FAMILY); 1849 1850 if (startRowResult == null) { 1851 throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row=" 1852 + Bytes.toStringBinary(row)); 1853 } 1854 1855 // find region start and end keys 1856 RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult); 1857 if (regionInfo == null) { 1858 throw new IOException("RegionInfo REGIONINFO was null or " + 1859 " empty in Meta for row=" 1860 + Bytes.toStringBinary(row)); 1861 } 1862 TRegionInfo region = new TRegionInfo(); 1863 region.setStartKey(regionInfo.getStartKey()); 1864 region.setEndKey(regionInfo.getEndKey()); 1865 region.id = regionInfo.getRegionId(); 1866 region.setName(regionInfo.getRegionName()); 1867 region.version = HREGION_VERSION; // version not used anymore, PB encoding used. 1868 1869 // find region assignment to server 1870 ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0); 1871 if (serverName != null) { 1872 region.setServerName(Bytes.toBytes(serverName.getHostname())); 1873 region.port = serverName.getPort(); 1874 } 1875 return region; 1876 } catch (IOException e) { 1877 LOG.warn(e.getMessage(), e); 1878 throw getIOError(e); 1879 } 1880 } 1881 1882 private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family) 1883 throws IOException { 1884 Scan scan = new Scan(row); 1885 scan.setReversed(true); 1886 scan.addFamily(family); 1887 scan.setStartRow(row); 1888 try (Table table = getTable(tableName); 1889 ResultScanner scanner = table.getScanner(scan)) { 1890 return scanner.next(); 1891 } 1892 } 1893 1894 private void initMetrics(ThriftMetrics metrics) { 1895 this.metrics = metrics; 1896 } 1897 1898 @Override 1899 public void increment(TIncrement tincrement) throws IOError, TException { 1900 1901 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) { 1902 throw new TException("Must supply a table and a row key; can't increment"); 1903 } 1904 1905 if (conf.getBoolean(COALESCE_INC_KEY, false)) { 1906 this.coalescer.queueIncrement(tincrement); 1907 return; 1908 } 1909 1910 Table table = null; 1911 try { 1912 table = getTable(tincrement.getTable()); 1913 Increment inc = ThriftUtilities.incrementFromThrift(tincrement); 1914 table.increment(inc); 1915 } catch (IOException e) { 1916 LOG.warn(e.getMessage(), e); 1917 throw getIOError(e); 1918 } finally{ 1919 closeTable(table); 1920 } 1921 } 1922 1923 @Override 1924 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException { 1925 if (conf.getBoolean(COALESCE_INC_KEY, false)) { 1926 this.coalescer.queueIncrements(tincrements); 1927 return; 1928 } 1929 for (TIncrement tinc : tincrements) { 1930 increment(tinc); 1931 } 1932 } 1933 1934 @Override 1935 public List<TCell> append(TAppend tappend) throws IOError, TException { 1936 if (tappend.getRow().length == 0 || tappend.getTable().length == 0) { 1937 throw new TException("Must supply a table and a row key; can't append"); 1938 } 1939 1940 Table table = null; 1941 try { 1942 table = getTable(tappend.getTable()); 1943 Append append = ThriftUtilities.appendFromThrift(tappend); 1944 Result result = table.append(append); 1945 return ThriftUtilities.cellFromHBase(result.rawCells()); 1946 } catch (IOException e) { 1947 LOG.warn(e.getMessage(), e); 1948 throw getIOError(e); 1949 } finally{ 1950 closeTable(table); 1951 } 1952 } 1953 1954 @Override 1955 public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1956 ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, 1957 IllegalArgument, TException { 1958 Put put; 1959 try { 1960 put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP); 1961 addAttributes(put, attributes); 1962 1963 byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column)); 1964 put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 1965 .setRow(put.getRow()) 1966 .setFamily(famAndQf[0]) 1967 .setQualifier(famAndQf[1]) 1968 .setTimestamp(put.getTimestamp()) 1969 .setType(Type.Put) 1970 .setValue(mput.value != null ? getBytes(mput.value) 1971 : HConstants.EMPTY_BYTE_ARRAY) 1972 .build()); 1973 put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1974 } catch (IOException | IllegalArgumentException e) { 1975 LOG.warn(e.getMessage(), e); 1976 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1977 } 1978 1979 Table table = null; 1980 try { 1981 table = getTable(tableName); 1982 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1983 Table.CheckAndMutateBuilder mutateBuilder = 1984 table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]); 1985 if (value != null) { 1986 return mutateBuilder.ifEquals(getBytes(value)).thenPut(put); 1987 } else { 1988 return mutateBuilder.ifNotExists().thenPut(put); 1989 } 1990 } catch (IOException e) { 1991 LOG.warn(e.getMessage(), e); 1992 throw getIOError(e); 1993 } catch (IllegalArgumentException e) { 1994 LOG.warn(e.getMessage(), e); 1995 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1996 } finally { 1997 closeTable(table); 1998 } 1999 } 2000 } 2001 2002 private static IOError getIOError(Throwable throwable) { 2003 IOError error = new IOErrorWithCause(throwable); 2004 error.setMessage(Throwables.getStackTraceAsString(throwable)); 2005 return error; 2006 } 2007 2008 /** 2009 * Adds all the attributes into the Operation object 2010 */ 2011 private static void addAttributes(OperationWithAttributes op, 2012 Map<ByteBuffer, ByteBuffer> attributes) { 2013 if (attributes == null || attributes.isEmpty()) { 2014 return; 2015 } 2016 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) { 2017 String name = Bytes.toStringBinary(getBytes(entry.getKey())); 2018 byte[] value = getBytes(entry.getValue()); 2019 op.setAttribute(name, value); 2020 } 2021 } 2022 2023 public static void registerFilters(Configuration conf) { 2024 String[] filters = conf.getStrings("hbase.thrift.filters"); 2025 Splitter splitter = Splitter.on(':'); 2026 if(filters != null) { 2027 for(String filterClass: filters) { 2028 List<String> filterPart = splitter.splitToList(filterClass); 2029 if(filterPart.size() != 2) { 2030 LOG.warn("Invalid filter specification " + filterClass + " - skipping"); 2031 } else { 2032 ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1)); 2033 } 2034 } 2035 } 2036 } 2037 2038 public static class IOErrorWithCause extends IOError { 2039 private final Throwable cause; 2040 public IOErrorWithCause(Throwable cause) { 2041 this.cause = cause; 2042 } 2043 2044 @Override 2045 public synchronized Throwable getCause() { 2046 return cause; 2047 } 2048 2049 @Override 2050 public boolean equals(Object other) { 2051 if (super.equals(other) && 2052 other instanceof IOErrorWithCause) { 2053 Throwable otherCause = ((IOErrorWithCause) other).getCause(); 2054 if (this.getCause() != null) { 2055 return otherCause != null && this.getCause().equals(otherCause); 2056 } else { 2057 return otherCause == null; 2058 } 2059 } 2060 return false; 2061 } 2062 2063 @Override 2064 public int hashCode() { 2065 int result = super.hashCode(); 2066 result = 31 * result + (cause != null ? cause.hashCode() : 0); 2067 return result; 2068 } 2069 } 2070}