001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT; 022import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY; 023import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY; 024import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION; 025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT; 026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY; 027import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION; 028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR; 029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE; 030import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT; 031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT; 032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY; 033import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION; 034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY; 035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT; 036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY; 037import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT; 038import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION; 039import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION; 040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT; 041import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY; 042import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION; 043import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION; 044import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION; 045import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY; 046import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION; 047import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION; 048import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION; 049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY; 050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY; 051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS; 052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD; 053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT; 054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS; 055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT; 056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT; 057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT; 058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY; 059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY; 060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY; 061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM; 062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT; 063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY; 064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY; 065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY; 066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY; 067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY; 068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY; 069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY; 070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY; 071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY; 072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY; 073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY; 074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY; 075import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY; 076 077import java.io.IOException; 078import java.net.InetAddress; 079import java.net.InetSocketAddress; 080import java.net.UnknownHostException; 081import java.util.List; 082import java.util.Map; 083import java.util.concurrent.BlockingQueue; 084import java.util.concurrent.ExecutorService; 085import java.util.concurrent.LinkedBlockingQueue; 086import java.util.concurrent.ThreadPoolExecutor; 087import java.util.concurrent.TimeUnit; 088import javax.security.auth.callback.Callback; 089import javax.security.auth.callback.UnsupportedCallbackException; 090import javax.security.sasl.AuthorizeCallback; 091import javax.security.sasl.SaslServer; 092import org.apache.commons.lang3.ArrayUtils; 093import org.apache.hadoop.conf.Configuration; 094import org.apache.hadoop.conf.Configured; 095import org.apache.hadoop.hbase.HBaseConfiguration; 096import org.apache.hadoop.hbase.HBaseInterfaceAudience; 097import org.apache.hadoop.hbase.filter.ParseFilter; 098import org.apache.hadoop.hbase.http.HttpServerUtil; 099import org.apache.hadoop.hbase.http.InfoServer; 100import org.apache.hadoop.hbase.security.SaslUtil; 101import org.apache.hadoop.hbase.security.SecurityUtil; 102import org.apache.hadoop.hbase.security.UserProvider; 103import org.apache.hadoop.hbase.thrift.generated.Hbase; 104import org.apache.hadoop.hbase.util.DNS; 105import org.apache.hadoop.hbase.util.JvmPauseMonitor; 106import org.apache.hadoop.hbase.util.Strings; 107import org.apache.hadoop.hbase.util.VersionInfo; 108import org.apache.hadoop.security.SaslRpcServer; 109import org.apache.hadoop.security.UserGroupInformation; 110import org.apache.hadoop.security.authorize.ProxyUsers; 111import org.apache.hadoop.util.Shell.ExitCodeException; 112import org.apache.hadoop.util.Tool; 113import org.apache.hadoop.util.ToolRunner; 114import org.apache.thrift.TProcessor; 115import org.apache.thrift.protocol.TBinaryProtocol; 116import org.apache.thrift.protocol.TCompactProtocol; 117import org.apache.thrift.protocol.TProtocolFactory; 118import org.apache.thrift.server.THsHaServer; 119import org.apache.thrift.server.TNonblockingServer; 120import org.apache.thrift.server.TServer; 121import org.apache.thrift.server.TServlet; 122import org.apache.thrift.server.TThreadedSelectorServer; 123import org.apache.thrift.transport.TFramedTransport; 124import org.apache.thrift.transport.TNonblockingServerSocket; 125import org.apache.thrift.transport.TNonblockingServerTransport; 126import org.apache.thrift.transport.TSaslServerTransport; 127import org.apache.thrift.transport.TServerSocket; 128import org.apache.thrift.transport.TServerTransport; 129import org.apache.thrift.transport.TTransportFactory; 130import org.apache.yetus.audience.InterfaceAudience; 131import org.slf4j.Logger; 132import org.slf4j.LoggerFactory; 133 134import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 135import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 136import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 137import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 138import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 139import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 140import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 141import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 142import org.apache.hbase.thirdparty.org.eclipse.jetty.http.HttpVersion; 143import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConfiguration; 144import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConnectionFactory; 145import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SecureRequestCustomizer; 146import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server; 147import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector; 148import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SslConnectionFactory; 149import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletContextHandler; 150import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder; 151import org.apache.hbase.thirdparty.org.eclipse.jetty.util.ssl.SslContextFactory; 152import org.apache.hbase.thirdparty.org.eclipse.jetty.util.thread.QueuedThreadPool; 153 154/** 155 * ThriftServer- this class starts up a Thrift server which implements the 156 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an 157 * independent process. 158 */ 159@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 160public class ThriftServer extends Configured implements Tool { 161 162 private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class); 163 164 165 166 protected Configuration conf; 167 168 protected InfoServer infoServer; 169 170 protected TProcessor processor; 171 172 protected ThriftMetrics metrics; 173 protected HBaseServiceHandler hbaseServiceHandler; 174 protected UserGroupInformation serviceUGI; 175 protected UserGroupInformation httpUGI; 176 protected boolean httpEnabled; 177 178 protected SaslUtil.QualityOfProtection qop; 179 protected String host; 180 protected int listenPort; 181 182 183 protected boolean securityEnabled; 184 protected boolean doAsEnabled; 185 186 protected JvmPauseMonitor pauseMonitor; 187 188 protected volatile TServer tserver; 189 protected volatile Server httpServer; 190 191 192 // 193 // Main program and support routines 194 // 195 196 public ThriftServer(Configuration conf) { 197 this.conf = HBaseConfiguration.create(conf); 198 } 199 200 protected ThriftMetrics createThriftMetrics(Configuration conf) { 201 return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); 202 } 203 204 protected void setupParamters() throws IOException { 205 // login the server principal (if using secure Hadoop) 206 UserProvider userProvider = UserProvider.instantiate(conf); 207 securityEnabled = userProvider.isHadoopSecurityEnabled() 208 && userProvider.isHBaseSecurityEnabled(); 209 if (securityEnabled) { 210 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( 211 conf.get(THRIFT_DNS_INTERFACE_KEY, "default"), 212 conf.get(THRIFT_DNS_NAMESERVER_KEY, "default"))); 213 userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host); 214 215 // Setup the SPNEGO user for HTTP if configured 216 String spnegoPrincipal = getSpengoPrincipal(conf, host); 217 String spnegoKeytab = getSpnegoKeytab(conf); 218 UserGroupInformation.setConfiguration(conf); 219 // login the SPNEGO principal using UGI to avoid polluting the login user 220 this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal, 221 spnegoKeytab); 222 } 223 this.serviceUGI = userProvider.getCurrent().getUGI(); 224 if (httpUGI == null) { 225 this.httpUGI = serviceUGI; 226 } 227 228 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); 229 this.metrics = createThriftMetrics(conf); 230 this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource()); 231 this.hbaseServiceHandler = createHandler(conf, userProvider); 232 this.hbaseServiceHandler.initMetrics(metrics); 233 this.processor = createProcessor(); 234 235 httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false); 236 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false); 237 if (doAsEnabled && !httpEnabled) { 238 LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured"); 239 } 240 241 String strQop = conf.get(THRIFT_QOP_KEY); 242 if (strQop != null) { 243 this.qop = SaslUtil.getQop(strQop); 244 } 245 if (qop != null) { 246 if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION && 247 qop != SaslUtil.QualityOfProtection.INTEGRITY && 248 qop != SaslUtil.QualityOfProtection.PRIVACY) { 249 throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.", 250 THRIFT_QOP_KEY, 251 SaslUtil.QualityOfProtection.AUTHENTICATION.name(), 252 SaslUtil.QualityOfProtection.INTEGRITY.name(), 253 SaslUtil.QualityOfProtection.PRIVACY.name())); 254 } 255 checkHttpSecurity(qop, conf); 256 if (!securityEnabled) { 257 throw new IOException("Thrift server must run in secure mode to support authentication"); 258 } 259 } 260 registerFilters(conf); 261 pauseMonitor.start(); 262 } 263 264 private String getSpengoPrincipal(Configuration conf, String host) throws IOException { 265 String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY); 266 if (principal == null) { 267 // We cannot use the Hadoop configuration deprecation handling here since 268 // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos 269 // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY 270 // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend 271 // Kerberos principal and SPNEGO principal. 272 LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.", 273 THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY); 274 principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 275 } 276 // Handle _HOST in principal value 277 return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host); 278 } 279 280 private String getSpnegoKeytab(Configuration conf) { 281 String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY); 282 if (keytab == null) { 283 // We cannot use the Hadoop configuration deprecation handling here since 284 // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos 285 // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY 286 // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend 287 // Kerberos keytab and SPNEGO keytab. 288 LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.", 289 THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY); 290 keytab = conf.get(THRIFT_KEYTAB_FILE_KEY); 291 } 292 return keytab; 293 } 294 295 protected void startInfoServer() throws IOException { 296 // Put up info server. 297 int port = conf.getInt(THRIFT_INFO_SERVER_PORT , THRIFT_INFO_SERVER_PORT_DEFAULT); 298 299 if (port >= 0) { 300 conf.setLong("startcode", System.currentTimeMillis()); 301 String a = conf 302 .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT); 303 infoServer = new InfoServer("thrift", a, port, false, conf); 304 infoServer.setAttribute("hbase.conf", conf); 305 infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name()); 306 infoServer.start(); 307 } 308 } 309 310 protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) { 311 if (qop == SaslUtil.QualityOfProtection.PRIVACY && 312 conf.getBoolean(USE_HTTP_CONF_KEY, false) && 313 !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) { 314 throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " + 315 THRIFT_SSL_ENABLED_KEY + " is false"); 316 } 317 } 318 319 protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider) 320 throws IOException { 321 return new ThriftHBaseServiceHandler(conf, userProvider); 322 } 323 324 protected TProcessor createProcessor() { 325 return new Hbase.Processor<>( 326 HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf)); 327 } 328 329 /** 330 * the thrift server, not null means the server is started, for test only 331 * @return the tServer 332 */ 333 @InterfaceAudience.Private 334 public TServer getTserver() { 335 return tserver; 336 } 337 338 /** 339 * the Jetty server, not null means the HTTP server is started, for test only 340 * @return the http server 341 */ 342 @InterfaceAudience.Private 343 public Server getHttpServer() { 344 return httpServer; 345 } 346 347 protected void printUsageAndExit(Options options, int exitCode) 348 throws ExitCodeException { 349 HelpFormatter formatter = new HelpFormatter(); 350 formatter.printHelp("Thrift", null, options, 351 "To start the Thrift server run 'hbase-daemon.sh start thrift' or " + 352 "'hbase thrift'\n" + 353 "To shutdown the thrift server run 'hbase-daemon.sh stop " + 354 "thrift' or send a kill signal to the thrift server pid", 355 true); 356 throw new ExitCodeException(exitCode, ""); 357 } 358 359 /** 360 * Create a Servlet for the http server 361 * @param protocolFactory protocolFactory 362 * @return the servlet 363 */ 364 protected TServlet createTServlet(TProtocolFactory protocolFactory) { 365 return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI, 366 hbaseServiceHandler, securityEnabled, doAsEnabled); 367 } 368 369 /** 370 * Setup an HTTP Server using Jetty to serve calls from THttpClient 371 * 372 * @throws IOException IOException 373 */ 374 protected void setupHTTPServer() throws IOException { 375 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); 376 TServlet thriftHttpServlet = createTServlet(protocolFactory); 377 378 // Set the default max thread number to 100 to limit 379 // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily. 380 // Jetty set the default max thread number to 250, if we don't set it. 381 // 382 // Our default min thread number 2 is the same as that used by Jetty. 383 int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, 384 conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY, 385 HTTP_MIN_THREADS_KEY_DEFAULT)); 386 int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, 387 conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, 388 HTTP_MAX_THREADS_KEY_DEFAULT)); 389 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads); 390 threadPool.setMinThreads(minThreads); 391 httpServer = new Server(threadPool); 392 393 // Context handler 394 ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/", 395 ServletContextHandler.SESSIONS); 396 ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*"); 397 HttpServerUtil.constrainHttpMethods(ctxHandler, 398 conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, 399 THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT)); 400 401 // set up Jetty and run the embedded server 402 HttpConfiguration httpConfig = new HttpConfiguration(); 403 httpConfig.setSecureScheme("https"); 404 httpConfig.setSecurePort(listenPort); 405 httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 406 httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 407 httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 408 httpConfig.setSendServerVersion(false); 409 httpConfig.setSendDateHeader(false); 410 411 ServerConnector serverConnector; 412 if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) { 413 HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig); 414 httpsConfig.addCustomizer(new SecureRequestCustomizer()); 415 416 SslContextFactory sslCtxFactory = new SslContextFactory(); 417 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY); 418 String password = HBaseConfiguration.getPassword(conf, 419 THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null); 420 String keyPassword = HBaseConfiguration.getPassword(conf, 421 THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password); 422 sslCtxFactory.setKeyStorePath(keystore); 423 sslCtxFactory.setKeyStorePassword(password); 424 sslCtxFactory.setKeyManagerPassword(keyPassword); 425 426 String[] excludeCiphers = conf.getStrings( 427 THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 428 if (excludeCiphers.length != 0) { 429 sslCtxFactory.setExcludeCipherSuites(excludeCiphers); 430 } 431 String[] includeCiphers = conf.getStrings( 432 THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 433 if (includeCiphers.length != 0) { 434 sslCtxFactory.setIncludeCipherSuites(includeCiphers); 435 } 436 437 // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566 438 String[] excludeProtocols = conf.getStrings( 439 THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3"); 440 if (excludeProtocols.length != 0) { 441 sslCtxFactory.setExcludeProtocols(excludeProtocols); 442 } 443 String[] includeProtocols = conf.getStrings( 444 THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 445 if (includeProtocols.length != 0) { 446 sslCtxFactory.setIncludeProtocols(includeProtocols); 447 } 448 449 serverConnector = new ServerConnector(httpServer, 450 new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()), 451 new HttpConnectionFactory(httpsConfig)); 452 } else { 453 serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig)); 454 } 455 serverConnector.setPort(listenPort); 456 serverConnector.setHost(getBindAddress(conf).getHostAddress()); 457 httpServer.addConnector(serverConnector); 458 httpServer.setStopAtShutdown(true); 459 460 if (doAsEnabled) { 461 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 462 } 463 LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort)); 464 } 465 466 /** 467 * Setting up the thrift TServer 468 */ 469 protected void setupServer() throws Exception { 470 // Construct correct ProtocolFactory 471 TProtocolFactory protocolFactory = getProtocolFactory(); 472 473 ImplType implType = ImplType.getServerImpl(conf); 474 TProcessor processorToUse = processor; 475 476 // Construct correct TransportFactory 477 TTransportFactory transportFactory; 478 if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) { 479 if (qop != null) { 480 throw new RuntimeException("Thrift server authentication" 481 + " doesn't work with framed transport yet"); 482 } 483 transportFactory = new TFramedTransport.Factory( 484 conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024); 485 LOG.debug("Using framed transport"); 486 } else if (qop == null) { 487 transportFactory = new TTransportFactory(); 488 } else { 489 // Extract the name from the principal 490 String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 491 if (thriftKerberosPrincipal == null) { 492 throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null"); 493 } 494 String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal); 495 Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name()); 496 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); 497 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties, 498 new SaslRpcServer.SaslGssCallbackHandler() { 499 @Override 500 public void handle(Callback[] callbacks) 501 throws UnsupportedCallbackException { 502 AuthorizeCallback ac = null; 503 for (Callback callback : callbacks) { 504 if (callback instanceof AuthorizeCallback) { 505 ac = (AuthorizeCallback) callback; 506 } else { 507 throw new UnsupportedCallbackException(callback, 508 "Unrecognized SASL GSSAPI Callback"); 509 } 510 } 511 if (ac != null) { 512 String authid = ac.getAuthenticationID(); 513 String authzid = ac.getAuthorizationID(); 514 if (!authid.equals(authzid)) { 515 ac.setAuthorized(false); 516 } else { 517 ac.setAuthorized(true); 518 String userName = SecurityUtil.getUserFromPrincipal(authzid); 519 LOG.info("Effective user: {}", userName); 520 ac.setAuthorizedID(userName); 521 } 522 } 523 } 524 }); 525 transportFactory = saslFactory; 526 527 // Create a processor wrapper, to get the caller 528 processorToUse = (inProt, outProt) -> { 529 TSaslServerTransport saslServerTransport = 530 (TSaslServerTransport)inProt.getTransport(); 531 SaslServer saslServer = saslServerTransport.getSaslServer(); 532 String principal = saslServer.getAuthorizationID(); 533 hbaseServiceHandler.setEffectiveUser(principal); 534 processor.process(inProt, outProt); 535 }; 536 } 537 538 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { 539 LOG.error("Server types {} don't support IP address binding at the moment. See " + 540 "https://issues.apache.org/jira/browse/HBASE-2155 for details.", 541 Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP())); 542 throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType); 543 } 544 545 InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort); 546 if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING || 547 implType == ImplType.THREADED_SELECTOR) { 548 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); 549 if (implType == ImplType.NONBLOCKING) { 550 tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse, 551 transportFactory, inetSocketAddress); 552 } else if (implType == ImplType.HS_HA) { 553 tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory, 554 inetSocketAddress); 555 } else { // THREADED_SELECTOR 556 tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse, 557 transportFactory, inetSocketAddress); 558 } 559 LOG.info("starting HBase {} server on {}", implType.simpleClassName(), 560 Integer.toString(listenPort)); 561 } else if (implType == ImplType.THREAD_POOL) { 562 this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory, 563 inetSocketAddress); 564 } else { 565 throw new AssertionError("Unsupported Thrift server implementation: " + 566 implType.simpleClassName()); 567 } 568 569 // A sanity check that we instantiated the right type of server. 570 if (tserver.getClass() != implType.serverClass) { 571 throw new AssertionError("Expected to create Thrift server class " + 572 implType.serverClass.getName() + " but got " + 573 tserver.getClass().getName()); 574 } 575 } 576 577 protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport, 578 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 579 InetSocketAddress inetSocketAddress) { 580 LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString()); 581 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport); 582 serverArgs.processor(processor); 583 serverArgs.transportFactory(transportFactory); 584 serverArgs.protocolFactory(protocolFactory); 585 return new TNonblockingServer(serverArgs); 586 } 587 588 protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport, 589 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 590 InetSocketAddress inetSocketAddress) { 591 LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); 592 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); 593 int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY, 594 TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS); 595 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics); 596 int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, 597 serverArgs.getMaxWorkerThreads()); 598 ExecutorService executorService = createExecutor( 599 callQueue, workerThread, workerThread); 600 serverArgs.executorService(executorService).processor(processor) 601 .transportFactory(transportFactory).protocolFactory(protocolFactory); 602 return new THsHaServer(serverArgs); 603 } 604 605 protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport, 606 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 607 InetSocketAddress inetSocketAddress) { 608 LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); 609 TThreadedSelectorServer.Args serverArgs = 610 new HThreadedSelectorServerArgs(serverTransport, conf); 611 int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY, 612 TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS); 613 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics); 614 int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, 615 serverArgs.getWorkerThreads()); 616 int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads()); 617 serverArgs.selectorThreads(selectorThreads); 618 ExecutorService executorService = createExecutor( 619 callQueue, workerThreads, workerThreads); 620 serverArgs.executorService(executorService).processor(processor) 621 .transportFactory(transportFactory).protocolFactory(protocolFactory); 622 return new TThreadedSelectorServer(serverArgs); 623 } 624 625 protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, 626 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception { 627 LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); 628 // Thrift's implementation uses '0' as a placeholder for 'use the default.' 629 int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT); 630 int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, 631 THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT); 632 TServerTransport serverTransport = new TServerSocket( 633 new TServerSocket.ServerSocketTransportArgs(). 634 bindAddr(inetSocketAddress).backlog(backlog). 635 clientTimeout(readTimeout)); 636 637 TBoundedThreadPoolServer.Args serverArgs = 638 new TBoundedThreadPoolServer.Args(serverTransport, conf); 639 serverArgs.processor(processor).transportFactory(transportFactory) 640 .protocolFactory(protocolFactory); 641 return new TBoundedThreadPoolServer(serverArgs, metrics); 642 } 643 644 protected TProtocolFactory getProtocolFactory() { 645 TProtocolFactory protocolFactory; 646 647 if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) { 648 LOG.debug("Using compact protocol"); 649 protocolFactory = new TCompactProtocol.Factory(); 650 } else { 651 LOG.debug("Using binary protocol"); 652 protocolFactory = new TBinaryProtocol.Factory(); 653 } 654 655 return protocolFactory; 656 } 657 658 protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue, 659 int minWorkers, int maxWorkers) { 660 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 661 tfb.setDaemon(true); 662 tfb.setNameFormat("thrift-worker-%d"); 663 ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers, 664 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); 665 threadPool.allowCoreThreadTimeOut(true); 666 return threadPool; 667 } 668 669 protected InetAddress getBindAddress(Configuration conf) 670 throws UnknownHostException { 671 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); 672 return InetAddress.getByName(bindAddressStr); 673 } 674 675 676 public static void registerFilters(Configuration conf) { 677 String[] filters = conf.getStrings(THRIFT_FILTERS); 678 Splitter splitter = Splitter.on(':'); 679 if(filters != null) { 680 for(String filterClass: filters) { 681 List<String> filterPart = splitter.splitToList(filterClass); 682 if(filterPart.size() != 2) { 683 LOG.warn("Invalid filter specification " + filterClass + " - skipping"); 684 } else { 685 ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1)); 686 } 687 } 688 } 689 } 690 691 /** 692 * Add options to command lines 693 * @param options options 694 */ 695 protected void addOptions(Options options) { 696 options.addOption("b", BIND_OPTION, true, "Address to bind " + 697 "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]"); 698 options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " + 699 DEFAULT_LISTEN_PORT + "]"); 700 options.addOption("f", FRAMED_OPTION, false, "Use framed transport"); 701 options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol"); 702 options.addOption("h", "help", false, "Print help information"); 703 options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use."); 704 options.addOption(null, INFOPORT_OPTION, true, "Port for web UI"); 705 706 options.addOption("m", MIN_WORKERS_OPTION, true, 707 "The minimum number of worker threads for " + 708 ImplType.THREAD_POOL.simpleClassName()); 709 710 options.addOption("w", MAX_WORKERS_OPTION, true, 711 "The maximum number of worker threads for " + 712 ImplType.THREAD_POOL.simpleClassName()); 713 714 options.addOption("q", MAX_QUEUE_SIZE_OPTION, true, 715 "The maximum number of queued requests in " + 716 ImplType.THREAD_POOL.simpleClassName()); 717 718 options.addOption("k", KEEP_ALIVE_SEC_OPTION, true, 719 "The amount of time in secods to keep a thread alive when idle in " + 720 ImplType.THREAD_POOL.simpleClassName()); 721 722 options.addOption("t", READ_TIMEOUT_OPTION, true, 723 "Amount of time in milliseconds before a server thread will timeout " + 724 "waiting for client to send data on a connected socket. Currently, " + 725 "only applies to TBoundedThreadPoolServer"); 726 727 options.addOptionGroup(ImplType.createOptionGroup()); 728 } 729 730 protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException { 731 // Get port to bind to 732 try { 733 if (cmd.hasOption(PORT_OPTION)) { 734 int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION)); 735 conf.setInt(PORT_CONF_KEY, listenPort); 736 } 737 } catch (NumberFormatException e) { 738 LOG.error("Could not parse the value provided for the port option", e); 739 printUsageAndExit(options, -1); 740 } 741 // check for user-defined info server port setting, if so override the conf 742 try { 743 if (cmd.hasOption(INFOPORT_OPTION)) { 744 String val = cmd.getOptionValue(INFOPORT_OPTION); 745 conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val)); 746 LOG.debug("Web UI port set to " + val); 747 } 748 } catch (NumberFormatException e) { 749 LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION + 750 " option", e); 751 printUsageAndExit(options, -1); 752 } 753 // Make optional changes to the configuration based on command-line options 754 optionToConf(cmd, MIN_WORKERS_OPTION, 755 conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY); 756 optionToConf(cmd, MAX_WORKERS_OPTION, 757 conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY); 758 optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, 759 conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY); 760 optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, 761 conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); 762 optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY); 763 optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM); 764 765 // Set general thrift server options 766 boolean compact = cmd.hasOption(COMPACT_OPTION) || 767 conf.getBoolean(COMPACT_CONF_KEY, false); 768 conf.setBoolean(COMPACT_CONF_KEY, compact); 769 boolean framed = cmd.hasOption(FRAMED_OPTION) || 770 conf.getBoolean(FRAMED_CONF_KEY, false); 771 conf.setBoolean(FRAMED_CONF_KEY, framed); 772 773 optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY); 774 775 776 ImplType.setServerImpl(cmd, conf); 777 } 778 779 /** 780 * Parse the command line options to set parameters the conf. 781 */ 782 protected void processOptions(final String[] args) throws Exception { 783 if (args == null || args.length == 0) { 784 return; 785 } 786 Options options = new Options(); 787 addOptions(options); 788 789 CommandLineParser parser = new DefaultParser(); 790 CommandLine cmd = parser.parse(options, args); 791 792 if (cmd.hasOption("help")) { 793 printUsageAndExit(options, 1); 794 } 795 parseCommandLine(cmd, options); 796 } 797 798 public void stop() { 799 if (this.infoServer != null) { 800 LOG.info("Stopping infoServer"); 801 try { 802 this.infoServer.stop(); 803 } catch (Exception ex) { 804 LOG.error("Failed to stop infoServer", ex); 805 } 806 } 807 if (pauseMonitor != null) { 808 pauseMonitor.stop(); 809 } 810 if (tserver != null) { 811 tserver.stop(); 812 tserver = null; 813 } 814 if (httpServer != null) { 815 try { 816 httpServer.stop(); 817 httpServer = null; 818 } catch (Exception e) { 819 LOG.error("Problem encountered in shutting down HTTP server", e); 820 } 821 httpServer = null; 822 } 823 } 824 825 protected static void optionToConf(CommandLine cmd, String option, 826 Configuration conf, String destConfKey) { 827 if (cmd.hasOption(option)) { 828 String value = cmd.getOptionValue(option); 829 LOG.info("Set configuration key:" + destConfKey + " value:" + value); 830 conf.set(destConfKey, value); 831 } 832 } 833 834 /** 835 * Run without any command line arguments 836 * @return exit code 837 * @throws Exception exception 838 */ 839 public int run() throws Exception { 840 return run(null); 841 } 842 843 @Override 844 public int run(String[] strings) throws Exception { 845 processOptions(strings); 846 setupParamters(); 847 startInfoServer(); 848 if (httpEnabled) { 849 setupHTTPServer(); 850 httpServer.start(); 851 httpServer.join(); 852 } else { 853 setupServer(); 854 tserver.serve(); 855 } 856 return 0; 857 } 858 859 public static void main(String [] args) throws Exception { 860 LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****"); 861 VersionInfo.logVersion(); 862 final Configuration conf = HBaseConfiguration.create(); 863 // for now, only time we return is on an argument error. 864 final int status = ToolRunner.run(conf, new ThriftServer(conf), args); 865 LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****"); 866 System.exit(status); 867 } 868}