/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.thrift;

import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslServer;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.http.HttpServerUtil;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServlet;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;

/**
 * ThriftServer - this class starts up a Thrift server which implements the
 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
 * independent process.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class ThriftServer extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);

  /** Private copy of the configuration handed to the constructor. */
  protected Configuration conf;

  /** Web UI server; only created by {@link #startInfoServer()} when the info port is >= 0. */
  protected InfoServer infoServer;

  /** Thrift processor built by {@link #createProcessor()}. */
  protected TProcessor processor;

  protected ThriftMetrics metrics;
  protected HBaseServiceHandler hbaseServiceHandler;
  /** UGI of the current user as reported by the {@link UserProvider}. */
  protected UserGroupInformation serviceUGI;
  /** UGI of the SPNEGO login when security is enabled, otherwise the same as serviceUGI. */
  protected UserGroupInformation httpUGI;
  protected boolean httpEnabled;

  /** Parsed value of THRIFT_QOP_KEY; stays null when that key is not configured. */
  protected SaslUtil.QualityOfProtection qop;
  /** Resolved host name; only assigned when security is enabled. */
  protected String host;
  protected int listenPort;

  protected boolean securityEnabled;
  protected boolean doAsEnabled;

  protected JvmPauseMonitor pauseMonitor;

  protected volatile TServer tserver;
  protected volatile Server httpServer;

  //
  // Main program and support routines
  //

  /**
   * Creates a server working on its own copy of the supplied configuration.
   * @param conf configuration to copy
   */
  public ThriftServer(Configuration conf) {
    this.conf = HBaseConfiguration.create(conf);
  }

  /**
   * Creates the metrics object used by this server.
   * @param conf configuration passed to the metrics
   * @return metrics tagged with {@code ThriftServerType.ONE}
   */
  protected ThriftMetrics createThriftMetrics(Configuration conf) {
    return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
  }

  /**
   * Initializes the server state from the configuration: logs in the service (and SPNEGO)
   * principals when security is enabled, creates metrics, pause monitor, handler and processor,
   * reads the HTTP/doAs/QoP settings, registers the configured filters and finally starts the
   * pause monitor. The misspelled method name is kept because it is a protected extension point.
   * @throws IOException if the configured QoP is invalid, or if a QoP is configured while the
   *   server is not running in secure mode
   */
  protected void setupParamters() throws IOException {
    // login the server principal (if using secure Hadoop)
    UserProvider userProvider = UserProvider.instantiate(conf);
    securityEnabled = userProvider.isHadoopSecurityEnabled()
        && userProvider.isHBaseSecurityEnabled();
    if (securityEnabled) {
      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
          conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
          conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);

      // Setup the SPNEGO user for HTTP if configured
      String spnegoPrincipal = getSpengoPrincipal(conf, host);
      String spnegoKeytab = getSpnegoKeytab(conf);
      UserGroupInformation.setConfiguration(conf);
      // login the SPNEGO principal using UGI to avoid polluting the login user
      this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal,
          spnegoKeytab);
    }
    this.serviceUGI = userProvider.getCurrent().getUGI();
    if (httpUGI == null) {
      // No separate SPNEGO login happened: serve HTTP as the service user.
      this.httpUGI = serviceUGI;
    }

    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = createThriftMetrics(conf);
    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
    this.hbaseServiceHandler = createHandler(conf, userProvider);
    this.hbaseServiceHandler.initMetrics(metrics);
    this.processor = createProcessor();

    httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
    if (doAsEnabled && !httpEnabled) {
      LOG.warn("Fail to enable the doAs feature. {} is not configured", USE_HTTP_CONF_KEY);
    }

    String strQop = conf.get(THRIFT_QOP_KEY);
    if (strQop != null) {
      this.qop = SaslUtil.getQop(strQop);
    }
    if (qop != null) {
      if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
          qop != SaslUtil.QualityOfProtection.INTEGRITY &&
          qop != SaslUtil.QualityOfProtection.PRIVACY) {
        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
            THRIFT_QOP_KEY,
            SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
            SaslUtil.QualityOfProtection.INTEGRITY.name(),
            SaslUtil.QualityOfProtection.PRIVACY.name()));
      }
      checkHttpSecurity(qop, conf);
      if (!securityEnabled) {
        throw new IOException("Thrift server must run in secure mode to support authentication");
      }
    }
    registerFilters(conf);
    pauseMonitor.start();
  }

  /**
   * Resolves the SPNEGO principal, falling back to THRIFT_KERBEROS_PRINCIPAL_KEY when
   * THRIFT_SPNEGO_PRINCIPAL_KEY is not set.
   * @param conf configuration to read the principal from
   * @param host host name passed to Hadoop's principal resolution
   * @return the result of {@code SecurityUtil.getServerPrincipal} for the configured principal
   * @throws IOException propagated from the principal resolution
   */
  private String getSpengoPrincipal(Configuration conf, String host) throws IOException {
    String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY);
    if (principal == null) {
      // We cannot use the Hadoop configuration deprecation handling here since
      // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos
      // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY
      // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend
      // Kerberos principal and SPNEGO principal.
      LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.",
          THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY);
      principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
    }
    // Handle _HOST in principal value
    return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host);
  }

  /**
   * Resolves the SPNEGO keytab, falling back to THRIFT_KEYTAB_FILE_KEY when
   * THRIFT_SPNEGO_KEYTAB_FILE_KEY is not set.
   * @param conf configuration to read the keytab location from
   * @return the configured keytab value, or null when neither key is set
   */
  private String getSpnegoKeytab(Configuration conf) {
    String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY);
    if (keytab == null) {
      // We cannot use the Hadoop configuration deprecation handling here since
      // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos
      // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY
      // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend
      // Kerberos keytab and SPNEGO keytab.
      LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.",
          THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY);
      keytab = conf.get(THRIFT_KEYTAB_FILE_KEY);
    }
    return keytab;
  }

  /**
   * Starts the info (web UI) server unless the configured info port is negative.
   * @throws IOException propagated from creating or starting the info server
   */
  protected void startInfoServer() throws IOException {
    // Put up info server.
    int port = conf.getInt(THRIFT_INFO_SERVER_PORT, THRIFT_INFO_SERVER_PORT_DEFAULT);

    if (port >= 0) {
      conf.setLong("startcode", System.currentTimeMillis());
      String bindAddress = conf
          .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
      infoServer = new InfoServer("thrift", bindAddress, port, false, conf);
      infoServer.setAttribute("hbase.conf", conf);
      infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name());
      infoServer.start();
    }
  }

  /**
   * Rejects the combination "QoP is privacy, HTTP is enabled, SSL is disabled".
   * @param qop the configured quality of protection
   * @param conf configuration holding the HTTP and SSL switches
   * @throws IllegalArgumentException if that combination is configured
   */
  protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
    if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
        conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
        !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
      throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
          THRIFT_SSL_ENABLED_KEY + " is false");
    }
  }

  /**
   * Creates the service handler backing the Thrift API.
   * @param conf configuration for the handler
   * @param userProvider user provider for the handler
   * @return a new {@link ThriftHBaseServiceHandler}
   * @throws IOException propagated from the handler constructor
   */
  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
      throws IOException {
    return new ThriftHBaseServiceHandler(conf, userProvider);
  }

  /**
   * Creates the Thrift processor, wrapping the service handler in a metrics proxy.
   * @return the processor
   */
  protected TProcessor createProcessor() {
    return new Hbase.Processor<>(
        HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
  }

  /**
   * the thrift server, not null means the server is started, for test only
   * @return the tServer
   */
  @VisibleForTesting
  public TServer getTserver() {
    return tserver;
  }

  /**
   * the Jetty server, not null means the HTTP server is started, for test only
   * @return the http server
   */
  @VisibleForTesting
  public Server getHttpServer() {
    return httpServer;
  }

  /**
   * Prints the command line help and then signals the exit code by throwing.
   * @param options options to describe
   * @param exitCode exit code carried by the thrown exception
   * @throws ExitCodeException always, carrying {@code exitCode}
   */
  protected void printUsageAndExit(Options options, int exitCode)
      throws ExitCodeException {
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp("Thrift", null, options,
        "To start the Thrift server run 'hbase-daemon.sh start thrift' or " +
        "'hbase thrift'\n" +
        "To shutdown the thrift server run 'hbase-daemon.sh stop " +
        "thrift' or send a kill signal to the thrift server pid",
        true);
    throw new ExitCodeException(exitCode, "");
  }

  /**
   * Create a Servlet for the http server
   * @param protocolFactory protocolFactory
   * @return the servlet
   */
  protected TServlet createTServlet(TProtocolFactory protocolFactory) {
    return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI,
        hbaseServiceHandler, securityEnabled, doAsEnabled);
  }

  /**
   * Setup an HTTP Server using Jetty to serve calls from THttpClient
   *
   * @throws IOException IOException
   */
  protected void setupHTTPServer() throws IOException {
    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
    TServlet thriftHttpServlet = createTServlet(protocolFactory);

    // Set the default max thread number to 100 to limit
    // the number of concurrent requests so that Thrift HTTP server doesn't OOM easily.
    // Jetty set the default max thread number to 250, if we don't set it.
    //
    // Our default min thread number 2 is the same as that used by Jetty.
    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
        conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
            HTTP_MIN_THREADS_KEY_DEFAULT));
    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
        conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
            HTTP_MAX_THREADS_KEY_DEFAULT));
    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
    threadPool.setMinThreads(minThreads);
    httpServer = new Server(threadPool);

    // Context handler
    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
        ServletContextHandler.SESSIONS);
    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
    HttpServerUtil.constrainHttpMethods(ctxHandler,
        conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
            THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));

    // set up Jetty and run the embedded server
    HttpConfiguration httpConfig = new HttpConfiguration();
    httpConfig.setSecureScheme("https");
    httpConfig.setSecurePort(listenPort);
    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
    httpConfig.setSendServerVersion(false);
    httpConfig.setSendDateHeader(false);

    ServerConnector serverConnector;
    if (conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
      httpsConfig.addCustomizer(new SecureRequestCustomizer());

      SslContextFactory sslCtxFactory = new SslContextFactory();
      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
      String password = HBaseConfiguration.getPassword(conf,
          THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
      // The key password defaults to the keystore password when not configured separately.
      String keyPassword = HBaseConfiguration.getPassword(conf,
          THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
      sslCtxFactory.setKeyStorePath(keystore);
      sslCtxFactory.setKeyStorePassword(password);
      sslCtxFactory.setKeyManagerPassword(keyPassword);

      String[] excludeCiphers = conf.getStrings(
          THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
      if (excludeCiphers.length != 0) {
        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
      }
      String[] includeCiphers = conf.getStrings(
          THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
      if (includeCiphers.length != 0) {
        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
      }

      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
      String[] excludeProtocols = conf.getStrings(
          THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
      if (excludeProtocols.length != 0) {
        sslCtxFactory.setExcludeProtocols(excludeProtocols);
      }
      String[] includeProtocols = conf.getStrings(
          THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
      if (includeProtocols.length != 0) {
        sslCtxFactory.setIncludeProtocols(includeProtocols);
      }

      serverConnector = new ServerConnector(httpServer,
          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
          new HttpConnectionFactory(httpsConfig));
    } else {
      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
    }
    serverConnector.setPort(listenPort);
    serverConnector.setHost(getBindAddress(conf).getHostAddress());
    httpServer.addConnector(serverConnector);
    httpServer.setStopAtShutdown(true);

    if (doAsEnabled) {
      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
    }
    LOG.info("Starting Thrift HTTP Server on {}", listenPort);
  }

  /**
   * Setting up the thrift TServer: picks the protocol and transport factories, wires SASL
   * authentication when a QoP is configured, and instantiates the configured server
   * implementation into {@link #tserver}.
   * @throws IllegalArgumentException if the configured maximum frame size overflows an int once
   *   converted to bytes, or if a QoP is configured without THRIFT_KERBEROS_PRINCIPAL_KEY
   * @throws RuntimeException if a QoP is combined with framed transport, or a bind address is
   *   configured for an implementation that cannot bind to a specific IP
   */
  protected void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory = getProtocolFactory();

    ImplType implType = ImplType.getServerImpl(conf);
    TProcessor processorToUse = processor;

    // Construct correct TransportFactory
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
      if (qop != null) {
        throw new RuntimeException("Thrift server authentication"
            + " doesn't work with framed transport yet");
      }
      int maxFrameSizeConf = conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT);
      int maxFrameSizeBytes;
      try {
        // A plain int multiplication would silently wrap for large configured values.
        maxFrameSizeBytes = Math.multiplyExact(maxFrameSizeConf, 1024 * 1024);
      } catch (ArithmeticException e) {
        throw new IllegalArgumentException("Value " + maxFrameSizeConf + " of " +
            MAX_FRAME_SIZE_CONF_KEY + " does not fit in an int when converted to bytes", e);
      }
      transportFactory = new TFramedTransport.Factory(maxFrameSizeBytes);
      LOG.debug("Using framed transport");
    } else if (qop == null) {
      transportFactory = new TTransportFactory();
    } else {
      // Extract the name from the principal
      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
      if (thriftKerberosPrincipal == null) {
        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
      }
      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
          new SaslRpcServer.SaslGssCallbackHandler() {
            @Override
            public void handle(Callback[] callbacks)
                throws UnsupportedCallbackException {
              AuthorizeCallback ac = null;
              for (Callback callback : callbacks) {
                if (callback instanceof AuthorizeCallback) {
                  ac = (AuthorizeCallback) callback;
                } else {
                  throw new UnsupportedCallbackException(callback,
                      "Unrecognized SASL GSSAPI Callback");
                }
              }
              if (ac != null) {
                String authid = ac.getAuthenticationID();
                String authzid = ac.getAuthorizationID();
                // Only authorize when the authentication and authorization IDs are equal.
                if (!authid.equals(authzid)) {
                  ac.setAuthorized(false);
                } else {
                  ac.setAuthorized(true);
                  String userName = SecurityUtil.getUserFromPrincipal(authzid);
                  LOG.info("Effective user: {}", userName);
                  ac.setAuthorizedID(userName);
                }
              }
            }
          });
      transportFactory = saslFactory;

      // Create a processor wrapper, to get the caller
      processorToUse = (inProt, outProt) -> {
        TSaslServerTransport saslServerTransport =
            (TSaslServerTransport) inProt.getTransport();
        SaslServer saslServer = saslServerTransport.getSaslServer();
        String principal = saslServer.getAuthorizationID();
        hbaseServiceHandler.setEffectiveUser(principal);
        processor.process(inProt, outProt);
      };
    }

    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types {} don't support IP address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
          Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
      if (implType == ImplType.NONBLOCKING) {
        tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
            transportFactory, inetSocketAddress);
      } else if (implType == ImplType.HS_HA) {
        tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
            inetSocketAddress);
      } else { // THREADED_SELECTOR
        tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
            transportFactory, inetSocketAddress);
      }
      LOG.info("starting HBase {} server on {}", implType.simpleClassName(), listenPort);
    } else if (implType == ImplType.THREAD_POOL) {
      this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
          inetSocketAddress);
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }
  }

  /**
   * Builds a {@link TNonblockingServer} from the given transport, factories and processor.
   * @param serverTransport non-blocking server transport to serve on
   * @param protocolFactory protocol factory to use
   * @param processor processor handling the calls
   * @param transportFactory transport factory to use
   * @param inetSocketAddress address the server listens on (used for logging here)
   * @return the new server
   */
  protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
      InetSocketAddress inetSocketAddress) {
    LOG.info("starting HBase Nonblocking Thrift server on {}", inetSocketAddress);
    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
    serverArgs.processor(processor);
    serverArgs.transportFactory(transportFactory);
    serverArgs.protocolFactory(protocolFactory);
    return new TNonblockingServer(serverArgs);
  }

  /**
   * Builds a {@link THsHaServer} whose worker pool is created by
   * {@link #createExecutor(BlockingQueue, int, int)} on top of a bounded call queue.
   * @param serverTransport non-blocking server transport to serve on
   * @param protocolFactory protocol factory to use
   * @param processor processor handling the calls
   * @param transportFactory transport factory to use
   * @param inetSocketAddress address the server listens on (used for logging here)
   * @return the new server
   */
  protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
      InetSocketAddress inetSocketAddress) {
    LOG.info("starting HBase HsHA Thrift server on {}", inetSocketAddress);
    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
    int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
        serverArgs.getMaxWorkerThreads());
    // The same value is used as both the minimum and the maximum pool size.
    ExecutorService executorService = createExecutor(
        callQueue, workerThread, workerThread);
    serverArgs.executorService(executorService).processor(processor)
        .transportFactory(transportFactory).protocolFactory(protocolFactory);
    return new THsHaServer(serverArgs);
  }

  /**
   * Builds a {@link TThreadedSelectorServer} with the configured number of selector threads and
   * a worker pool created by {@link #createExecutor(BlockingQueue, int, int)}.
   * @param serverTransport non-blocking server transport to serve on
   * @param protocolFactory protocol factory to use
   * @param processor processor handling the calls
   * @param transportFactory transport factory to use
   * @param inetSocketAddress address the server listens on (used for logging here)
   * @return the new server
   */
  protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
      InetSocketAddress inetSocketAddress) {
    LOG.info("starting HBase ThreadedSelector Thrift server on {}", inetSocketAddress);
    TThreadedSelectorServer.Args serverArgs =
        new HThreadedSelectorServerArgs(serverTransport, conf);
    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
    int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
        serverArgs.getWorkerThreads());
    int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
    serverArgs.selectorThreads(selectorThreads);
    // The same value is used as both the minimum and the maximum pool size.
    ExecutorService executorService = createExecutor(
        callQueue, workerThreads, workerThreads);
    serverArgs.executorService(executorService).processor(processor)
        .transportFactory(transportFactory).protocolFactory(protocolFactory);
    return new TThreadedSelectorServer(serverArgs);
  }

  /**
   * Builds a {@link TBoundedThreadPoolServer} on a blocking server socket configured with the
   * backlog and client read timeout taken from the configuration.
   * @param protocolFactory protocol factory to use
   * @param processor processor handling the calls
   * @param transportFactory transport factory to use
   * @param inetSocketAddress address to bind the server socket to
   * @return the new server
   * @throws Exception propagated from creating the server socket
   */
  protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
    LOG.info("starting HBase ThreadPool Thrift server on {}", inetSocketAddress);
    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
    int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
    int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
        THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
    TServerTransport serverTransport = new TServerSocket(
        new TServerSocket.ServerSocketTransportArgs().
            bindAddr(inetSocketAddress).backlog(backlog).
            clientTimeout(readTimeout));

    TBoundedThreadPoolServer.Args serverArgs =
        new TBoundedThreadPoolServer.Args(serverTransport, conf);
    serverArgs.processor(processor).transportFactory(transportFactory)
        .protocolFactory(protocolFactory);
    return new TBoundedThreadPoolServer(serverArgs, metrics);
  }

  /**
   * Chooses the wire protocol from COMPACT_CONF_KEY.
   * @return a compact protocol factory when the key is true, otherwise a binary one
   */
  protected TProtocolFactory getProtocolFactory() {
    if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
      LOG.debug("Using compact protocol");
      return new TCompactProtocol.Factory();
    }
    LOG.debug("Using binary protocol");
    return new TBinaryProtocol.Factory();
  }

  /**
   * Creates the worker pool used by the non-blocking server implementations. Threads are daemon
   * threads named {@code thrift-worker-%d}, and core threads are allowed to time out.
   * @param callQueue queue holding the pending calls
   * @param minWorkers core pool size
   * @param maxWorkers maximum pool size
   * @return the executor
   */
  protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
      int minWorkers, int maxWorkers) {
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setDaemon(true);
    tfb.setNameFormat("thrift-worker-%d");
    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
        Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
    threadPool.allowCoreThreadTimeOut(true);
    return threadPool;
  }

  /**
   * Resolves the address to bind to from BIND_CONF_KEY, defaulting to DEFAULT_BIND_ADDR.
   * @param conf configuration to read the bind address from
   * @return the resolved address
   * @throws UnknownHostException if the configured address cannot be resolved
   */
  protected InetAddress getBindAddress(Configuration conf)
      throws UnknownHostException {
    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
    return InetAddress.getByName(bindAddressStr);
  }

  /**
   * Registers the filters listed under THRIFT_FILTERS with {@link ParseFilter}. Each entry must
   * consist of exactly two parts separated by ':'; entries of any other shape are logged and
   * skipped.
   * @param conf configuration to read the filter list from
   */
  public static void registerFilters(Configuration conf) {
    String[] filters = conf.getStrings(THRIFT_FILTERS);
    if (filters == null) {
      return;
    }
    Splitter splitter = Splitter.on(':');
    for (String filterClass : filters) {
      List<String> filterPart = splitter.splitToList(filterClass);
      if (filterPart.size() != 2) {
        LOG.warn("Invalid filter specification {} - skipping", filterClass);
      } else {
        ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
      }
    }
  }

  /**
   * Add options to command lines
   * @param options options
   */
  protected void addOptions(Options options) {
    options.addOption("b", BIND_OPTION, true, "Address to bind " +
        "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
    options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
        DEFAULT_LISTEN_PORT + "]");
    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
    options.addOption("h", "help", false, "Print help information");
    options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
    options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");

    options.addOption("m", MIN_WORKERS_OPTION, true,
        "The minimum number of worker threads for " +
        ImplType.THREAD_POOL.simpleClassName());

    options.addOption("w", MAX_WORKERS_OPTION, true,
        "The maximum number of worker threads for " +
        ImplType.THREAD_POOL.simpleClassName());

    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
        "The maximum number of queued requests in " +
        ImplType.THREAD_POOL.simpleClassName());

    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
        "The amount of time in seconds to keep a thread alive when idle in " +
        ImplType.THREAD_POOL.simpleClassName());

    options.addOption("t", READ_TIMEOUT_OPTION, true,
        "Amount of time in milliseconds before a server thread will timeout " +
        "waiting for client to send data on a connected socket. Currently, " +
        "only applies to TBoundedThreadPoolServer");

    options.addOptionGroup(ImplType.createOptionGroup());
  }

  protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
    // Get port to bind to
    try {
      if (cmd.hasOption(PORT_OPTION)) {
        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
        conf.setInt(PORT_CONF_KEY, listenPort);
      }
    } catch (NumberFormatException e) {
      LOG.error("Could not parse the value provided for the port option", e);
      printUsageAndExit(options, -1);
    }
    // check for user-defined info server port setting, if so override the conf
    try {
      if (cmd.hasOption(INFOPORT_OPTION)) {
        String val = cmd.getOptionValue(INFOPORT_OPTION);
        conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
        LOG.debug("Web UI port set to " + val);
      }
    } catch (NumberFormatException e) {
      LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
          " option", e);
      printUsageAndExit(options, -1);
    }
    // Make optional changes to the configuration based on command-line options
    optionToConf(cmd, MIN_WORKERS_OPTION,
        conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
    optionToConf(cmd, MAX_WORKERS_OPTION,
        conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
        conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
        conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
    optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
    optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);

    // Set general thrift server options
    boolean compact = cmd.hasOption(COMPACT_OPTION) ||
        conf.getBoolean(COMPACT_CONF_KEY, false);
    conf.setBoolean(COMPACT_CONF_KEY, compact);
    boolean framed = cmd.hasOption(FRAMED_OPTION) ||
772 conf.getBoolean(FRAMED_CONF_KEY, false); 773 conf.setBoolean(FRAMED_CONF_KEY, framed); 774 775 optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY); 776 777 778 ImplType.setServerImpl(cmd, conf); 779 } 780 781 /** 782 * Parse the command line options to set parameters the conf. 783 */ 784 protected void processOptions(final String[] args) throws Exception { 785 if (args == null || args.length == 0) { 786 return; 787 } 788 Options options = new Options(); 789 addOptions(options); 790 791 CommandLineParser parser = new DefaultParser(); 792 CommandLine cmd = parser.parse(options, args); 793 794 if (cmd.hasOption("help")) { 795 printUsageAndExit(options, 1); 796 } 797 parseCommandLine(cmd, options); 798 } 799 800 public void stop() { 801 if (this.infoServer != null) { 802 LOG.info("Stopping infoServer"); 803 try { 804 this.infoServer.stop(); 805 } catch (Exception ex) { 806 LOG.error("Failed to stop infoServer", ex); 807 } 808 } 809 if (pauseMonitor != null) { 810 pauseMonitor.stop(); 811 } 812 if (tserver != null) { 813 tserver.stop(); 814 tserver = null; 815 } 816 if (httpServer != null) { 817 try { 818 httpServer.stop(); 819 httpServer = null; 820 } catch (Exception e) { 821 LOG.error("Problem encountered in shutting down HTTP server", e); 822 } 823 httpServer = null; 824 } 825 } 826 827 protected static void optionToConf(CommandLine cmd, String option, 828 Configuration conf, String destConfKey) { 829 if (cmd.hasOption(option)) { 830 String value = cmd.getOptionValue(option); 831 LOG.info("Set configuration key:" + destConfKey + " value:" + value); 832 conf.set(destConfKey, value); 833 } 834 } 835 836 /** 837 * Run without any command line arguments 838 * @return exit code 839 * @throws Exception exception 840 */ 841 public int run() throws Exception { 842 return run(null); 843 } 844 845 @Override 846 public int run(String[] strings) throws Exception { 847 processOptions(strings); 848 setupParamters(); 849 startInfoServer(); 850 if (httpEnabled) { 851 
setupHTTPServer(); 852 httpServer.start(); 853 httpServer.join(); 854 } else { 855 setupServer(); 856 tserver.serve(); 857 } 858 return 0; 859 } 860 861 public static void main(String [] args) throws Exception { 862 LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****"); 863 VersionInfo.logVersion(); 864 final Configuration conf = HBaseConfiguration.create(); 865 // for now, only time we return is on an argument error. 866 final int status = ToolRunner.run(conf, new ThriftServer(conf), args); 867 LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****"); 868 System.exit(status); 869 } 870}