001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import static org.apache.hadoop.hbase.http.HttpServerUtil.PATH_SPEC_ANY; 021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT; 022import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY; 023import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY; 024import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION; 025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT; 026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY; 027import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION; 028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR; 029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE; 030import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT; 031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT; 032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY; 033import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION; 034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY; 035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT; 036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY; 037import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT; 038import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION; 039import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION; 040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT; 041import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY; 042import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION; 043import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION; 044import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION; 045import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY; 046import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION; 047import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION; 048import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION; 049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY; 050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY; 051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS; 052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD; 053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT; 054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS; 055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT; 056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT; 057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT; 058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY; 059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY; 060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY; 061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM; 062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT; 063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY; 064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY; 065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY; 066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY; 067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY; 068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY; 069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY; 070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY; 071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY; 072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY; 073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY; 074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_DEFAULT; 075import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_KEY; 076import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY; 077import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY; 078 079import java.io.IOException; 080import java.net.InetAddress; 081import java.net.InetSocketAddress; 082import java.net.UnknownHostException; 083import java.security.PrivilegedAction; 084import java.util.List; 085import java.util.Map; 086import java.util.concurrent.BlockingQueue; 087import java.util.concurrent.ExecutorService; 088import java.util.concurrent.LinkedBlockingQueue; 089import java.util.concurrent.ThreadPoolExecutor; 090import java.util.concurrent.TimeUnit; 091import javax.security.auth.callback.Callback; 092import javax.security.auth.callback.UnsupportedCallbackException; 093import javax.security.sasl.AuthorizeCallback; 094import javax.security.sasl.SaslServer; 095import org.apache.commons.lang3.ArrayUtils; 096import org.apache.hadoop.conf.Configuration; 097import org.apache.hadoop.conf.Configured; 098import org.apache.hadoop.hbase.HBaseConfiguration; 099import org.apache.hadoop.hbase.HBaseInterfaceAudience; 100import org.apache.hadoop.hbase.filter.ParseFilter; 101import org.apache.hadoop.hbase.http.HttpServerUtil; 102import org.apache.hadoop.hbase.http.InfoServer; 103import org.apache.hadoop.hbase.log.HBaseMarkers; 104import org.apache.hadoop.hbase.security.SaslUtil; 105import org.apache.hadoop.hbase.security.SecurityUtil; 106import org.apache.hadoop.hbase.security.UserProvider; 107import org.apache.hadoop.hbase.thrift.generated.Hbase; 108import org.apache.hadoop.hbase.util.DNS; 109import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 110import org.apache.hadoop.hbase.util.JvmPauseMonitor; 111import org.apache.hadoop.hbase.util.Strings; 112import org.apache.hadoop.hbase.util.VersionInfo; 113import org.apache.hadoop.security.SaslRpcServer; 114import org.apache.hadoop.security.UserGroupInformation; 115import org.apache.hadoop.security.authorize.ProxyUsers; 116import org.apache.hadoop.util.Shell.ExitCodeException; 117import org.apache.hadoop.util.Tool; 118import org.apache.hadoop.util.ToolRunner; 119import org.apache.thrift.TProcessor; 120import org.apache.thrift.protocol.TBinaryProtocol; 121import org.apache.thrift.protocol.TCompactProtocol; 122import org.apache.thrift.protocol.TProtocolFactory; 123import org.apache.thrift.server.THsHaServer; 124import org.apache.thrift.server.TNonblockingServer; 125import org.apache.thrift.server.TServer; 126import org.apache.thrift.server.TServlet; 127import org.apache.thrift.server.TThreadedSelectorServer; 128import org.apache.thrift.transport.TNonblockingServerSocket; 129import org.apache.thrift.transport.TNonblockingServerTransport; 130import org.apache.thrift.transport.TSaslServerTransport; 131import org.apache.thrift.transport.TServerSocket; 132import org.apache.thrift.transport.TServerTransport; 133import org.apache.thrift.transport.TTransportFactory; 134import org.apache.thrift.transport.layered.TFramedTransport; 135import org.apache.yetus.audience.InterfaceAudience; 136import org.slf4j.Logger; 137import org.slf4j.LoggerFactory; 138 139import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 140import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 141import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 142import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 143import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 144import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 145import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 146import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 147import org.apache.hbase.thirdparty.org.eclipse.jetty.http.HttpVersion; 148import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConfiguration; 149import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConnectionFactory; 150import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SecureRequestCustomizer; 151import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server; 152import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector; 153import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SslConnectionFactory; 154import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletContextHandler; 155import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder; 156import org.apache.hbase.thirdparty.org.eclipse.jetty.util.ssl.SslContextFactory; 157import org.apache.hbase.thirdparty.org.eclipse.jetty.util.thread.QueuedThreadPool; 158 159/** 160 * ThriftServer- this class starts up a Thrift server which implements the Hbase API specified in 161 * the Hbase.thrift IDL file. The server runs in an independent process. 162 */ 163@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 164public class ThriftServer extends Configured implements Tool { 165 166 private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class); 167 168 protected Configuration conf; 169 170 protected InfoServer infoServer; 171 172 protected TProcessor processor; 173 174 protected ThriftMetrics metrics; 175 protected HBaseServiceHandler hbaseServiceHandler; 176 protected UserGroupInformation serviceUGI; 177 protected UserGroupInformation httpUGI; 178 protected boolean httpEnabled; 179 180 protected SaslUtil.QualityOfProtection qop; 181 protected String host; 182 protected int listenPort; 183 184 protected boolean securityEnabled; 185 protected boolean doAsEnabled; 186 187 protected JvmPauseMonitor pauseMonitor; 188 189 protected volatile TServer tserver; 190 protected volatile Server httpServer; 191 192 // 193 // Main program and support routines 194 // 195 196 public ThriftServer(Configuration conf) { 197 this.conf = HBaseConfiguration.create(conf); 198 } 199 200 protected ThriftMetrics createThriftMetrics(Configuration conf) { 201 return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); 202 } 203 204 protected void setupParamters() throws IOException { 205 // login the server principal (if using secure Hadoop) 206 UserProvider userProvider = UserProvider.instantiate(conf); 207 securityEnabled = 208 userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled(); 209 if (securityEnabled) { 210 host = Strings.domainNamePointerToHostName( 211 DNS.getDefaultHost(conf.get(THRIFT_DNS_INTERFACE_KEY, "default"), 212 conf.get(THRIFT_DNS_NAMESERVER_KEY, "default"))); 213 userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host); 214 215 // Setup the SPNEGO user for HTTP if configured 216 String spnegoPrincipal = getSpengoPrincipal(conf, host); 217 String spnegoKeytab = getSpnegoKeytab(conf); 218 UserGroupInformation.setConfiguration(conf); 219 // login the SPNEGO principal using UGI to avoid polluting the login user 220 this.httpUGI = 221 UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal, spnegoKeytab); 222 } 223 this.serviceUGI = userProvider.getCurrent().getUGI(); 224 if (httpUGI == null) { 225 this.httpUGI = serviceUGI; 226 } 227 228 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); 229 this.metrics = createThriftMetrics(conf); 230 this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource()); 231 this.hbaseServiceHandler = createHandler(conf, userProvider); 232 this.hbaseServiceHandler.initMetrics(metrics); 233 this.processor = createProcessor(); 234 235 httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false); 236 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false); 237 if (doAsEnabled && !httpEnabled) { 238 LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured"); 239 } 240 241 String strQop = conf.get(THRIFT_QOP_KEY); 242 if (strQop != null) { 243 this.qop = SaslUtil.getQop(strQop); 244 } 245 if (qop != null) { 246 if ( 247 qop != SaslUtil.QualityOfProtection.AUTHENTICATION 248 && qop != SaslUtil.QualityOfProtection.INTEGRITY 249 && qop != SaslUtil.QualityOfProtection.PRIVACY 250 ) { 251 throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.", 252 THRIFT_QOP_KEY, SaslUtil.QualityOfProtection.AUTHENTICATION.name(), 253 SaslUtil.QualityOfProtection.INTEGRITY.name(), 254 SaslUtil.QualityOfProtection.PRIVACY.name())); 255 } 256 checkHttpSecurity(qop, conf); 257 if (!securityEnabled) { 258 throw new IOException("Thrift server must run in secure mode to support authentication"); 259 } 260 } 261 registerFilters(conf); 262 pauseMonitor.start(); 263 } 264 265 private String getSpengoPrincipal(Configuration conf, String host) throws IOException { 266 String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY); 267 if (principal == null) { 268 // We cannot use the Hadoop configuration deprecation handling here since 269 // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos 270 // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY 271 // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend 272 // Kerberos principal and SPNEGO principal. 273 LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.", 274 THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY); 275 principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 276 } 277 // Handle _HOST in principal value 278 return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host); 279 } 280 281 private String getSpnegoKeytab(Configuration conf) { 282 String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY); 283 if (keytab == null) { 284 // We cannot use the Hadoop configuration deprecation handling here since 285 // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos 286 // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY 287 // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend 288 // Kerberos keytab and SPNEGO keytab. 289 LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.", 290 THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY); 291 keytab = conf.get(THRIFT_KEYTAB_FILE_KEY); 292 } 293 return keytab; 294 } 295 296 protected void startInfoServer() throws IOException { 297 // Put up info server. 298 int port = conf.getInt(THRIFT_INFO_SERVER_PORT, THRIFT_INFO_SERVER_PORT_DEFAULT); 299 300 if (port >= 0) { 301 conf.setLong("startcode", EnvironmentEdgeManager.currentTime()); 302 String a = 303 conf.get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT); 304 infoServer = new InfoServer("thrift", a, port, false, conf); 305 infoServer.setAttribute("hbase.conf", conf); 306 infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name()); 307 infoServer.start(); 308 } 309 } 310 311 protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) { 312 if ( 313 qop == SaslUtil.QualityOfProtection.PRIVACY && conf.getBoolean(USE_HTTP_CONF_KEY, false) 314 && !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false) 315 ) { 316 throw new IllegalArgumentException( 317 "Thrift HTTP Server's QoP is privacy, but " + THRIFT_SSL_ENABLED_KEY + " is false"); 318 } 319 } 320 321 protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider) 322 throws IOException { 323 return new ThriftHBaseServiceHandler(conf, userProvider); 324 } 325 326 protected TProcessor createProcessor() { 327 return new Hbase.Processor<>( 328 HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf)); 329 } 330 331 /** 332 * the thrift server, not null means the server is started, for test only 333 * @return the tServer 334 */ 335 @InterfaceAudience.Private 336 public TServer getTserver() { 337 return tserver; 338 } 339 340 /** 341 * the Jetty server, not null means the HTTP server is started, for test only 342 * @return the http server 343 */ 344 @InterfaceAudience.Private 345 public Server getHttpServer() { 346 return httpServer; 347 } 348 349 protected void printUsageAndExit(Options options, int exitCode) throws ExitCodeException { 350 HelpFormatter formatter = new HelpFormatter(); 351 formatter.printHelp("Thrift", null, options, 352 "To start the Thrift server run 'hbase-daemon.sh start thrift' or " + "'hbase thrift'\n" 353 + "To shutdown the thrift server run 'hbase-daemon.sh stop " 354 + "thrift' or send a kill signal to the thrift server pid", 355 true); 356 throw new ExitCodeException(exitCode, ""); 357 } 358 359 /** 360 * Create a Servlet for the http server 361 * @param protocolFactory protocolFactory 362 * @return the servlet 363 */ 364 protected TServlet createTServlet(TProtocolFactory protocolFactory) { 365 return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI, 366 hbaseServiceHandler, securityEnabled, doAsEnabled); 367 } 368 369 /** 370 * Setup an HTTP Server using Jetty to serve calls from THttpClient 371 * @throws IOException IOException 372 */ 373 protected void setupHTTPServer() throws IOException { 374 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); 375 TServlet thriftHttpServlet = createTServlet(protocolFactory); 376 377 // Set the default max thread number to 100 to limit 378 // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily. 379 // Jetty set the default max thread number to 250, if we don't set it. 380 // 381 // Our default min thread number 2 is the same as that used by Jetty. 382 int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, conf 383 .getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY, HTTP_MIN_THREADS_KEY_DEFAULT)); 384 int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, conf 385 .getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, HTTP_MAX_THREADS_KEY_DEFAULT)); 386 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads); 387 threadPool.setMinThreads(minThreads); 388 httpServer = new Server(threadPool); 389 390 // Context handler 391 boolean isSecure = conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false); 392 ServletContextHandler ctxHandler = 393 new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS); 394 HttpServerUtil.addClickjackingPreventionFilter(ctxHandler, conf, PATH_SPEC_ANY); 395 HttpServerUtil.addSecurityHeadersFilter(ctxHandler, conf, isSecure, PATH_SPEC_ANY); 396 ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), PATH_SPEC_ANY); 397 HttpServerUtil.constrainHttpMethods(ctxHandler, 398 conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT)); 399 400 // set up Jetty and run the embedded server 401 HttpConfiguration httpConfig = new HttpConfiguration(); 402 httpConfig.setSecureScheme("https"); 403 httpConfig.setSecurePort(listenPort); 404 httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 405 httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 406 httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 407 httpConfig.setSendServerVersion(false); 408 httpConfig.setSendDateHeader(false); 409 410 ServerConnector serverConnector; 411 if (isSecure) { 412 HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig); 413 httpsConfig.addCustomizer(new SecureRequestCustomizer()); 414 415 SslContextFactory.Server sslCtxFactory = new SslContextFactory.Server(); 416 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY); 417 String password = 418 HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null); 419 String keyPassword = 420 HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password); 421 sslCtxFactory.setKeyStorePath(keystore); 422 sslCtxFactory.setKeyStorePassword(password); 423 sslCtxFactory.setKeyManagerPassword(keyPassword); 424 sslCtxFactory 425 .setKeyStoreType(conf.get(THRIFT_SSL_KEYSTORE_TYPE_KEY, THRIFT_SSL_KEYSTORE_TYPE_DEFAULT)); 426 427 String[] excludeCiphers = 428 conf.getStrings(THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 429 if (excludeCiphers.length != 0) { 430 sslCtxFactory.setExcludeCipherSuites(excludeCiphers); 431 } 432 String[] includeCiphers = 433 conf.getStrings(THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 434 if (includeCiphers.length != 0) { 435 sslCtxFactory.setIncludeCipherSuites(includeCiphers); 436 } 437 438 // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566 439 String[] excludeProtocols = conf.getStrings(THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3"); 440 if (excludeProtocols.length != 0) { 441 sslCtxFactory.setExcludeProtocols(excludeProtocols); 442 } 443 String[] includeProtocols = 444 conf.getStrings(THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 445 if (includeProtocols.length != 0) { 446 sslCtxFactory.setIncludeProtocols(includeProtocols); 447 } 448 449 serverConnector = new ServerConnector(httpServer, 450 new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()), 451 new HttpConnectionFactory(httpsConfig)); 452 } else { 453 serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig)); 454 } 455 serverConnector.setPort(listenPort); 456 serverConnector.setHost(getBindAddress(conf).getHostAddress()); 457 httpServer.addConnector(serverConnector); 458 httpServer.setStopAtShutdown(true); 459 460 if (doAsEnabled) { 461 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 462 } 463 LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort)); 464 } 465 466 /** 467 * Setting up the thrift TServer 468 */ 469 protected void setupServer() throws Exception { 470 // Construct correct ProtocolFactory 471 TProtocolFactory protocolFactory = getProtocolFactory(); 472 473 ImplType implType = ImplType.getServerImpl(conf); 474 TProcessor processorToUse = processor; 475 476 // Construct correct TransportFactory 477 TTransportFactory transportFactory; 478 if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) { 479 if (qop != null) { 480 throw new RuntimeException( 481 "Thrift server authentication" + " doesn't work with framed transport yet"); 482 } 483 transportFactory = new TFramedTransport.Factory( 484 conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024); 485 LOG.debug("Using framed transport"); 486 } else if (qop == null) { 487 transportFactory = new TTransportFactory(); 488 } else { 489 // Extract the name from the principal 490 String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 491 if (thriftKerberosPrincipal == null) { 492 throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null"); 493 } 494 String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal); 495 Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name()); 496 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); 497 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties, 498 new SaslRpcServer.SaslGssCallbackHandler() { 499 @Override 500 public void handle(Callback[] callbacks) throws UnsupportedCallbackException { 501 AuthorizeCallback ac = null; 502 for (Callback callback : callbacks) { 503 if (callback instanceof AuthorizeCallback) { 504 ac = (AuthorizeCallback) callback; 505 } else { 506 throw new UnsupportedCallbackException(callback, 507 "Unrecognized SASL GSSAPI Callback"); 508 } 509 } 510 if (ac != null) { 511 String authid = ac.getAuthenticationID(); 512 String authzid = ac.getAuthorizationID(); 513 if (!authid.equals(authzid)) { 514 ac.setAuthorized(false); 515 } else { 516 ac.setAuthorized(true); 517 String userName = SecurityUtil.getUserFromPrincipal(authzid); 518 LOG.info("Effective user: {}", userName); 519 ac.setAuthorizedID(userName); 520 } 521 } 522 } 523 }); 524 transportFactory = saslFactory; 525 526 // Create a processor wrapper, to get the caller 527 processorToUse = (inProt, outProt) -> { 528 TSaslServerTransport saslServerTransport = (TSaslServerTransport) inProt.getTransport(); 529 SaslServer saslServer = saslServerTransport.getSaslServer(); 530 String principal = saslServer.getAuthorizationID(); 531 hbaseServiceHandler.setEffectiveUser(principal); 532 processor.process(inProt, outProt); 533 }; 534 } 535 536 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { 537 LOG.error( 538 "Server types {} don't support IP address binding at the moment. See " 539 + "https://issues.apache.org/jira/browse/HBASE-2155 for details.", 540 Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP())); 541 throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType); 542 } 543 544 InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort); 545 if ( 546 implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING 547 || implType == ImplType.THREADED_SELECTOR 548 ) { 549 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); 550 if (implType == ImplType.NONBLOCKING) { 551 tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse, 552 transportFactory, inetSocketAddress); 553 } else if (implType == ImplType.HS_HA) { 554 tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory, 555 inetSocketAddress); 556 } else { // THREADED_SELECTOR 557 tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse, 558 transportFactory, inetSocketAddress); 559 } 560 LOG.info("starting HBase {} server on {}", implType.simpleClassName(), 561 Integer.toString(listenPort)); 562 } else if (implType == ImplType.THREAD_POOL) { 563 this.tserver = 564 getTThreadPoolServer(protocolFactory, processorToUse, transportFactory, inetSocketAddress); 565 } else { 566 throw new AssertionError( 567 "Unsupported Thrift server implementation: " + implType.simpleClassName()); 568 } 569 570 // A sanity check that we instantiated the right type of server. 571 if (tserver.getClass() != implType.serverClass) { 572 throw new AssertionError("Expected to create Thrift server class " 573 + implType.serverClass.getName() + " but got " + tserver.getClass().getName()); 574 } 575 } 576 577 protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport, 578 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 579 InetSocketAddress inetSocketAddress) { 580 LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString()); 581 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport); 582 serverArgs.processor(processor); 583 serverArgs.transportFactory(transportFactory); 584 serverArgs.protocolFactory(protocolFactory); 585 return new TNonblockingServer(serverArgs); 586 } 587 588 protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport, 589 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 590 InetSocketAddress inetSocketAddress) { 591 LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); 592 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); 593 int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY, 594 TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS); 595 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics); 596 int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, 597 serverArgs.getMaxWorkerThreads()); 598 ExecutorService executorService = createExecutor(callQueue, workerThread, workerThread); 599 serverArgs.executorService(executorService).processor(processor) 600 .transportFactory(transportFactory).protocolFactory(protocolFactory); 601 return new THsHaServer(serverArgs); 602 } 603 604 protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport, 605 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 606 InetSocketAddress inetSocketAddress) { 607 LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); 608 TThreadedSelectorServer.Args serverArgs = 609 new HThreadedSelectorServerArgs(serverTransport, conf); 610 int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY, 611 TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS); 612 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics); 613 int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, 614 serverArgs.getWorkerThreads()); 615 int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads()); 616 serverArgs.selectorThreads(selectorThreads); 617 ExecutorService executorService = createExecutor(callQueue, workerThreads, workerThreads); 618 serverArgs.executorService(executorService).processor(processor) 619 .transportFactory(transportFactory).protocolFactory(protocolFactory); 620 return new TThreadedSelectorServer(serverArgs); 621 } 622 623 protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, 624 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception { 625 LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); 626 // Thrift's implementation uses '0' as a placeholder for 'use the default.' 627 int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT); 628 int readTimeout = 629 conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT); 630 TServerTransport serverTransport = 631 new TServerSocket(new TServerSocket.ServerSocketTransportArgs().bindAddr(inetSocketAddress) 632 .backlog(backlog).clientTimeout(readTimeout)); 633 634 TBoundedThreadPoolServer.Args serverArgs = 635 new TBoundedThreadPoolServer.Args(serverTransport, conf); 636 serverArgs.processor(processor).transportFactory(transportFactory) 637 .protocolFactory(protocolFactory); 638 return new TBoundedThreadPoolServer(serverArgs, metrics); 639 } 640 641 protected TProtocolFactory getProtocolFactory() { 642 TProtocolFactory protocolFactory; 643 644 if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) { 645 LOG.debug("Using compact protocol"); 646 protocolFactory = new TCompactProtocol.Factory(); 647 } else { 648 LOG.debug("Using binary protocol"); 649 protocolFactory = new TBinaryProtocol.Factory(); 650 } 651 652 return protocolFactory; 653 } 654 655 protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue, int minWorkers, 656 int maxWorkers) { 657 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 658 tfb.setDaemon(true); 659 tfb.setNameFormat("thrift-worker-%d"); 660 ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers, 661 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); 662 threadPool.allowCoreThreadTimeOut(true); 663 return threadPool; 664 } 665 666 protected InetAddress getBindAddress(Configuration conf) throws UnknownHostException { 667 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); 668 return InetAddress.getByName(bindAddressStr); 669 } 670 671 public static void registerFilters(Configuration conf) { 672 String[] filters = conf.getStrings(THRIFT_FILTERS); 673 Splitter splitter = Splitter.on(':'); 674 if (filters != null) { 675 for (String filterClass : filters) { 676 List<String> filterPart = splitter.splitToList(filterClass); 677 if (filterPart.size() != 2) { 678 LOG.warn("Invalid filter specification " + filterClass + " - skipping"); 679 } else { 680 ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1)); 681 } 682 } 683 } 684 } 685 686 /** 687 * Add options to command lines 688 * @param options options 689 */ 690 protected void addOptions(Options options) { 691 options.addOption("b", BIND_OPTION, true, 692 "Address to bind " + "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]"); 693 options.addOption("p", PORT_OPTION, true, 694 "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]"); 695 options.addOption("f", FRAMED_OPTION, false, "Use framed transport"); 696 options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol"); 697 options.addOption("h", "help", false, "Print help information"); 698 options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use."); 699 options.addOption(null, INFOPORT_OPTION, true, "Port for web UI"); 700 701 options.addOption("m", MIN_WORKERS_OPTION, true, 702 "The minimum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName()); 703 704 options.addOption("w", MAX_WORKERS_OPTION, true, 705 "The maximum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName()); 706 707 options.addOption("q", MAX_QUEUE_SIZE_OPTION, true, 708 "The maximum number of queued requests in " + ImplType.THREAD_POOL.simpleClassName()); 709 710 options.addOption("k", KEEP_ALIVE_SEC_OPTION, true, 711 "The amount of time in secods to keep a thread alive when idle in " 712 + ImplType.THREAD_POOL.simpleClassName()); 713 714 options.addOption("t", READ_TIMEOUT_OPTION, true, 715 "Amount of time in milliseconds before a server thread will timeout " 716 + "waiting for client to send data on a connected socket. Currently, " 717 + "only applies to TBoundedThreadPoolServer"); 718 719 options.addOptionGroup(ImplType.createOptionGroup()); 720 } 721 722 protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException { 723 // Get port to bind to 724 try { 725 if (cmd.hasOption(PORT_OPTION)) { 726 int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION)); 727 conf.setInt(PORT_CONF_KEY, listenPort); 728 } 729 } catch (NumberFormatException e) { 730 LOG.error("Could not parse the value provided for the port option", e); 731 printUsageAndExit(options, -1); 732 } 733 // check for user-defined info server port setting, if so override the conf 734 try { 735 if (cmd.hasOption(INFOPORT_OPTION)) { 736 String val = cmd.getOptionValue(INFOPORT_OPTION); 737 conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val)); 738 LOG.debug("Web UI port set to " + val); 739 } 740 } catch (NumberFormatException e) { 741 LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION + " option", e); 742 printUsageAndExit(options, -1); 743 } 744 // Make optional changes to the configuration based on command-line options 745 optionToConf(cmd, MIN_WORKERS_OPTION, conf, 746 TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY); 747 optionToConf(cmd, MAX_WORKERS_OPTION, conf, 748 TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY); 749 optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, conf, 750 TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY); 751 optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, conf, 752 TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); 753 optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY); 754 optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM); 755 756 // Set general thrift server options 757 boolean compact = cmd.hasOption(COMPACT_OPTION) || conf.getBoolean(COMPACT_CONF_KEY, false); 758 conf.setBoolean(COMPACT_CONF_KEY, compact); 759 boolean framed = cmd.hasOption(FRAMED_OPTION) || conf.getBoolean(FRAMED_CONF_KEY, false); 760 conf.setBoolean(FRAMED_CONF_KEY, framed); 761 762 optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY); 763 764 ImplType.setServerImpl(cmd, conf); 765 } 766 767 /** 768 * Parse the command line options to set parameters the conf. 769 */ 770 protected void processOptions(final String[] args) throws Exception { 771 if (args == null || args.length == 0) { 772 return; 773 } 774 Options options = new Options(); 775 addOptions(options); 776 777 CommandLineParser parser = new DefaultParser(); 778 CommandLine cmd = parser.parse(options, args); 779 780 if (cmd.hasOption("help")) { 781 printUsageAndExit(options, 1); 782 } 783 parseCommandLine(cmd, options); 784 } 785 786 public void stop() { 787 if (this.infoServer != null) { 788 LOG.info("Stopping infoServer"); 789 try { 790 this.infoServer.stop(); 791 } catch (Exception ex) { 792 LOG.error("Failed to stop infoServer", ex); 793 } 794 } 795 if (pauseMonitor != null) { 796 pauseMonitor.stop(); 797 } 798 if (tserver != null) { 799 tserver.stop(); 800 tserver = null; 801 } 802 if (httpServer != null) { 803 try { 804 httpServer.stop(); 805 httpServer = null; 806 } catch (Exception e) { 807 LOG.error("Problem encountered in shutting down HTTP server", e); 808 } 809 httpServer = null; 810 } 811 } 812 813 protected static void optionToConf(CommandLine cmd, String option, Configuration conf, 814 String destConfKey) { 815 if (cmd.hasOption(option)) { 816 String value = cmd.getOptionValue(option); 817 LOG.info("Set configuration key:" + destConfKey + " value:" + value); 818 conf.set(destConfKey, value); 819 } 820 } 821 822 /** 823 * Run without any command line arguments 824 * @return exit code 825 * @throws Exception exception 826 */ 827 public int run() throws Exception { 828 return run(null); 829 } 830 831 @Override 832 public int run(String[] strings) throws Exception { 833 processOptions(strings); 834 setupParamters(); 835 if (httpEnabled) { 836 setupHTTPServer(); 837 } else { 838 setupServer(); 839 } 840 return serviceUGI.doAs(new PrivilegedAction<Integer>() { 841 @Override 842 public Integer run() { 843 try { 844 startInfoServer(); 845 if (httpEnabled) { 846 httpServer.start(); 847 httpServer.join(); 848 } else { 849 tserver.serve(); 850 } 851 return 0; 852 } catch (Exception e) { 853 LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e); 854 return -1; 855 } 856 } 857 }); 858 } 859 860 public static void main(String[] args) throws Exception { 861 LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****"); 862 VersionInfo.logVersion(); 863 final Configuration conf = HBaseConfiguration.create(); 864 // for now, only time we return is on an argument error. 865 final int status = ToolRunner.run(conf, new ThriftServer(conf), args); 866 LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****"); 867 System.exit(status); 868 } 869}