001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.thrift2; 020 021import java.io.IOException; 022import java.net.InetAddress; 023import java.net.InetSocketAddress; 024import java.net.UnknownHostException; 025import java.security.PrivilegedAction; 026import java.util.Map; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.SynchronousQueue; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032 033import javax.security.auth.callback.Callback; 034import javax.security.auth.callback.UnsupportedCallbackException; 035import javax.security.sasl.AuthorizeCallback; 036import javax.security.sasl.SaslServer; 037 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.conf.Configured; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.HBaseInterfaceAudience; 042import org.apache.hadoop.hbase.filter.ParseFilter; 043import org.apache.hadoop.hbase.http.InfoServer; 044import org.apache.hadoop.hbase.security.SaslUtil; 045import org.apache.hadoop.hbase.security.SecurityUtil; 046import org.apache.hadoop.hbase.security.UserProvider; 047import org.apache.hadoop.hbase.thrift.CallQueue; 048import org.apache.hadoop.hbase.thrift.THBaseThreadPoolExecutor; 049import org.apache.hadoop.hbase.thrift.ThriftMetrics; 050import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 051import org.apache.hadoop.hbase.util.DNS; 052import org.apache.hadoop.hbase.util.JvmPauseMonitor; 053import org.apache.hadoop.hbase.util.Strings; 054import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler; 055import org.apache.hadoop.security.UserGroupInformation; 056import org.apache.hadoop.util.Tool; 057import org.apache.hadoop.util.ToolRunner; 058import org.apache.thrift.TException; 059import org.apache.thrift.TProcessor; 060import org.apache.thrift.protocol.TBinaryProtocol; 061import org.apache.thrift.protocol.TCompactProtocol; 062import org.apache.thrift.protocol.TProtocol; 063import org.apache.thrift.protocol.TProtocolFactory; 064import org.apache.thrift.server.THsHaServer; 065import org.apache.thrift.server.TNonblockingServer; 066import org.apache.thrift.server.TServer; 067import org.apache.thrift.server.TThreadPoolServer; 068import org.apache.thrift.server.TThreadedSelectorServer; 069import org.apache.thrift.transport.TFramedTransport; 070import org.apache.thrift.transport.TNonblockingServerSocket; 071import org.apache.thrift.transport.TNonblockingServerTransport; 072import org.apache.thrift.transport.TSaslServerTransport; 073import org.apache.thrift.transport.TServerSocket; 074import org.apache.thrift.transport.TServerTransport; 075import org.apache.thrift.transport.TTransportException; 076import org.apache.thrift.transport.TTransportFactory; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 081import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 082import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 083import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 084import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 085import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 086import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup; 087import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 088import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 089 090/** 091 * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the 092 * HbaseClient.thrift IDL file. 093 */ 094@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 095@SuppressWarnings({ "rawtypes", "unchecked" }) 096public class ThriftServer extends Configured implements Tool { 097 private static final Logger log = LoggerFactory.getLogger(ThriftServer.class); 098 099 /** 100 * Thrift quality of protection configuration key. Valid values can be: 101 * privacy: authentication, integrity and confidentiality checking 102 * integrity: authentication and integrity checking 103 * authentication: authentication only 104 * 105 * This is used to authenticate the callers and support impersonation. 106 * The thrift server and the HBase cluster must run in secure mode. 107 */ 108 static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop"; 109 110 static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog"; 111 112 public static final int DEFAULT_LISTEN_PORT = 9090; 113 114 private static final String READ_TIMEOUT_OPTION = "readTimeout"; 115 116 /** 117 * Amount of time in milliseconds before a server thread will timeout 118 * waiting for client to send data on a connected socket. Currently, 119 * applies only to TBoundedThreadPoolServer 120 */ 121 public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY = 122 "hbase.thrift.server.socket.read.timeout"; 123 public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000; 124 125 public ThriftServer() { 126 } 127 128 private static void printUsage() { 129 HelpFormatter formatter = new HelpFormatter(); 130 formatter.printHelp("Thrift", null, getOptions(), 131 "To start the Thrift server run 'hbase-daemon.sh start thrift2' or " + 132 "'hbase thrift2'\n" + 133 "To shutdown the thrift server run 'hbase-daemon.sh stop thrift2' or" + 134 " send a kill signal to the thrift server pid", 135 true); 136 } 137 138 private static Options getOptions() { 139 Options options = new Options(); 140 options.addOption("b", "bind", true, 141 "Address to bind the Thrift server to. [default: 0.0.0.0]"); 142 options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]"); 143 options.addOption("f", "framed", false, "Use framed transport"); 144 options.addOption("c", "compact", false, "Use the compact protocol"); 145 options.addOption("w", "workers", true, "How many worker threads to use."); 146 options.addOption("s", "selectors", true, "How many selector threads to use."); 147 options.addOption("q", "callQueueSize", true, 148 "Max size of request queue (unbounded by default)"); 149 options.addOption("h", "help", false, "Print help information"); 150 options.addOption(null, "infoport", true, "Port for web UI"); 151 options.addOption("t", READ_TIMEOUT_OPTION, true, 152 "Amount of time in milliseconds before a server thread will timeout " + 153 "waiting for client to send data on a connected socket. Currently, " + 154 "only applies to TBoundedThreadPoolServer"); 155 options.addOption("ro", "readonly", false, 156 "Respond only to read method requests [default: false]"); 157 OptionGroup servers = new OptionGroup(); 158 servers.addOption( 159 new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); 160 servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport.")); 161 servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport.")); 162 servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default.")); 163 options.addOptionGroup(servers); 164 return options; 165 } 166 167 private static CommandLine parseArguments(Configuration conf, Options options, String[] args) 168 throws ParseException, IOException { 169 CommandLineParser parser = new DefaultParser(); 170 return parser.parse(options, args); 171 } 172 173 private static TProtocolFactory getTProtocolFactory(boolean isCompact) { 174 if (isCompact) { 175 log.debug("Using compact protocol"); 176 return new TCompactProtocol.Factory(); 177 } else { 178 log.debug("Using binary protocol"); 179 return new TBinaryProtocol.Factory(); 180 } 181 } 182 183 private static TTransportFactory getTTransportFactory( 184 SaslUtil.QualityOfProtection qop, String name, String host, 185 boolean framed, int frameSize) { 186 if (framed) { 187 if (qop != null) { 188 throw new RuntimeException("Thrift server authentication" 189 + " doesn't work with framed transport yet"); 190 } 191 log.debug("Using framed transport"); 192 return new TFramedTransport.Factory(frameSize); 193 } else if (qop == null) { 194 return new TTransportFactory(); 195 } else { 196 Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name()); 197 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); 198 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties, 199 new SaslGssCallbackHandler() { 200 @Override 201 public void handle(Callback[] callbacks) 202 throws UnsupportedCallbackException { 203 AuthorizeCallback ac = null; 204 for (Callback callback : callbacks) { 205 if (callback instanceof AuthorizeCallback) { 206 ac = (AuthorizeCallback) callback; 207 } else { 208 throw new UnsupportedCallbackException(callback, 209 "Unrecognized SASL GSSAPI Callback"); 210 } 211 } 212 if (ac != null) { 213 String authid = ac.getAuthenticationID(); 214 String authzid = ac.getAuthorizationID(); 215 if (!authid.equals(authzid)) { 216 ac.setAuthorized(false); 217 } else { 218 ac.setAuthorized(true); 219 String userName = SecurityUtil.getUserFromPrincipal(authzid); 220 log.info("Effective user: " + userName); 221 ac.setAuthorizedID(userName); 222 } 223 } 224 } 225 }); 226 return saslFactory; 227 } 228 } 229 230 /* 231 * If bindValue is null, we don't bind. 232 */ 233 private static InetSocketAddress bindToPort(String bindValue, int listenPort) 234 throws UnknownHostException { 235 try { 236 if (bindValue == null) { 237 return new InetSocketAddress(listenPort); 238 } else { 239 return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort); 240 } 241 } catch (UnknownHostException e) { 242 throw new RuntimeException("Could not bind to provided ip address", e); 243 } 244 } 245 246 private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor, 247 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException { 248 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); 249 log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString()); 250 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport); 251 serverArgs.processor(processor); 252 serverArgs.transportFactory(transportFactory); 253 serverArgs.protocolFactory(protocolFactory); 254 return new TNonblockingServer(serverArgs); 255 } 256 257 private static TServer getTHsHaServer(TProtocolFactory protocolFactory, 258 TProcessor processor, TTransportFactory transportFactory, 259 int workerThreads, int maxCallQueueSize, 260 InetSocketAddress inetSocketAddress, ThriftMetrics metrics) 261 throws TTransportException { 262 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); 263 log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); 264 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); 265 if (workerThreads > 0) { 266 // Could support the min & max threads, avoiding to preserve existing functionality. 267 serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads); 268 } 269 ExecutorService executorService = createExecutor( 270 workerThreads, maxCallQueueSize, metrics); 271 serverArgs.executorService(executorService); 272 serverArgs.processor(processor); 273 serverArgs.transportFactory(transportFactory); 274 serverArgs.protocolFactory(protocolFactory); 275 return new THsHaServer(serverArgs); 276 } 277 278 private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory, 279 TProcessor processor, TTransportFactory transportFactory, 280 int workerThreads, int selectorThreads, int maxCallQueueSize, 281 InetSocketAddress inetSocketAddress, ThriftMetrics metrics) 282 throws TTransportException { 283 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); 284 log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); 285 TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport); 286 if (workerThreads > 0) { 287 serverArgs.workerThreads(workerThreads); 288 } 289 if (selectorThreads > 0) { 290 serverArgs.selectorThreads(selectorThreads); 291 } 292 293 ExecutorService executorService = createExecutor( 294 workerThreads, maxCallQueueSize, metrics); 295 serverArgs.executorService(executorService); 296 serverArgs.processor(processor); 297 serverArgs.transportFactory(transportFactory); 298 serverArgs.protocolFactory(protocolFactory); 299 return new TThreadedSelectorServer(serverArgs); 300 } 301 302 private static ExecutorService createExecutor( 303 int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) { 304 CallQueue callQueue; 305 if (maxCallQueueSize > 0) { 306 callQueue = new CallQueue(new LinkedBlockingQueue<>(maxCallQueueSize), metrics); 307 } else { 308 callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics); 309 } 310 311 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 312 tfb.setDaemon(true); 313 tfb.setNameFormat("thrift2-worker-%d"); 314 ThreadPoolExecutor pool = new THBaseThreadPoolExecutor(workerThreads, workerThreads, 315 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); 316 pool.prestartAllCoreThreads(); 317 return pool; 318 } 319 320 private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, 321 TProcessor processor, 322 TTransportFactory transportFactory, 323 int workerThreads, 324 InetSocketAddress inetSocketAddress, 325 int backlog, 326 int clientTimeout, 327 ThriftMetrics metrics) 328 throws TTransportException { 329 TServerTransport serverTransport = new TServerSocket( 330 new TServerSocket.ServerSocketTransportArgs(). 331 bindAddr(inetSocketAddress).backlog(backlog). 332 clientTimeout(clientTimeout)); 333 log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); 334 TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); 335 serverArgs.processor(processor); 336 serverArgs.transportFactory(transportFactory); 337 serverArgs.protocolFactory(protocolFactory); 338 if (workerThreads > 0) { 339 serverArgs.maxWorkerThreads(workerThreads); 340 } 341 ThreadPoolExecutor executor = new THBaseThreadPoolExecutor(serverArgs.minWorkerThreads, 342 serverArgs.maxWorkerThreads, serverArgs.stopTimeoutVal, TimeUnit.SECONDS, 343 new SynchronousQueue<>(), metrics); 344 serverArgs.executorService(executor); 345 346 return new TThreadPoolServer(serverArgs); 347 } 348 349 /** 350 * Adds the option to pre-load filters at startup. 351 * 352 * @param conf The current configuration instance. 353 */ 354 protected static void registerFilters(Configuration conf) { 355 String[] filters = conf.getStrings("hbase.thrift.filters"); 356 if(filters != null) { 357 for(String filterClass: filters) { 358 String[] filterPart = filterClass.split(":"); 359 if(filterPart.length != 2) { 360 log.warn("Invalid filter specification " + filterClass + " - skipping"); 361 } else { 362 ParseFilter.registerFilter(filterPart[0], filterPart[1]); 363 } 364 } 365 } 366 } 367 368 /** 369 * Start up the Thrift2 server. 370 */ 371 public static void main(String[] args) throws Exception { 372 final Configuration conf = HBaseConfiguration.create(); 373 // for now, only time we return is on an argument error. 374 final int status = ToolRunner.run(conf, new ThriftServer(), args); 375 System.exit(status); 376 } 377 378 @Override 379 public int run(String[] args) throws Exception { 380 final Configuration conf = getConf(); 381 TServer server = null; 382 Options options = getOptions(); 383 CommandLine cmd = parseArguments(conf, options, args); 384 int workerThreads = 0; 385 int selectorThreads = 0; 386 int maxCallQueueSize = -1; // use unbounded queue by default 387 388 if (cmd.hasOption("help")) { 389 printUsage(); 390 return 1; 391 } 392 393 // Get address to bind 394 String bindAddress; 395 if (cmd.hasOption("bind")) { 396 bindAddress = cmd.getOptionValue("bind"); 397 conf.set("hbase.thrift.info.bindAddress", bindAddress); 398 } else { 399 bindAddress = conf.get("hbase.thrift.info.bindAddress"); 400 } 401 402 // check if server should only process read requests, if so override the conf 403 if (cmd.hasOption("readonly")) { 404 conf.setBoolean("hbase.thrift.readonly", true); 405 if (log.isDebugEnabled()) { 406 log.debug("readonly set to true"); 407 } 408 } 409 410 // Get read timeout 411 int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT; 412 if (cmd.hasOption(READ_TIMEOUT_OPTION)) { 413 try { 414 readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION)); 415 } catch (NumberFormatException e) { 416 throw new RuntimeException("Could not parse the value provided for the timeout option", e); 417 } 418 } else { 419 readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, 420 THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT); 421 } 422 423 // Get port to bind to 424 int listenPort = 0; 425 try { 426 if (cmd.hasOption("port")) { 427 listenPort = Integer.parseInt(cmd.getOptionValue("port")); 428 } else { 429 listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT); 430 } 431 } catch (NumberFormatException e) { 432 throw new RuntimeException("Could not parse the value provided for the port option", e); 433 } 434 435 // Thrift's implementation uses '0' as a placeholder for 'use the default.' 436 int backlog = conf.getInt(BACKLOG_CONF_KEY, 0); 437 438 // Local hostname and user name, 439 // used only if QOP is configured. 440 String host = null; 441 String name = null; 442 443 UserProvider userProvider = UserProvider.instantiate(conf); 444 // login the server principal (if using secure Hadoop) 445 boolean securityEnabled = userProvider.isHadoopSecurityEnabled() 446 && userProvider.isHBaseSecurityEnabled(); 447 if (securityEnabled) { 448 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( 449 conf.get("hbase.thrift.dns.interface", "default"), 450 conf.get("hbase.thrift.dns.nameserver", "default"))); 451 userProvider.login("hbase.thrift.keytab.file", 452 "hbase.thrift.kerberos.principal", host); 453 } 454 455 UserGroupInformation realUser = userProvider.getCurrent().getUGI(); 456 String stringQop = conf.get(THRIFT_QOP_KEY); 457 SaslUtil.QualityOfProtection qop = null; 458 if (stringQop != null) { 459 qop = SaslUtil.getQop(stringQop); 460 if (!securityEnabled) { 461 throw new IOException("Thrift server must" 462 + " run in secure mode to support authentication"); 463 } 464 // Extract the name from the principal 465 name = SecurityUtil.getUserFromPrincipal( 466 conf.get("hbase.thrift.kerberos.principal")); 467 } 468 469 boolean nonblocking = cmd.hasOption("nonblocking"); 470 boolean hsha = cmd.hasOption("hsha"); 471 boolean selector = cmd.hasOption("selector"); 472 473 ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO); 474 final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource()); 475 476 String implType = "threadpool"; 477 if (nonblocking) { 478 implType = "nonblocking"; 479 } else if (hsha) { 480 implType = "hsha"; 481 } else if (selector) { 482 implType = "selector"; 483 } 484 485 conf.set("hbase.regionserver.thrift.server.type", implType); 486 conf.setInt("hbase.regionserver.thrift.port", listenPort); 487 registerFilters(conf); 488 489 // Construct correct ProtocolFactory 490 boolean compact = cmd.hasOption("compact") || 491 conf.getBoolean("hbase.regionserver.thrift.compact", false); 492 TProtocolFactory protocolFactory = getTProtocolFactory(compact); 493 final ThriftHBaseServiceHandler hbaseHandler = 494 new ThriftHBaseServiceHandler(conf, userProvider); 495 THBaseService.Iface handler = 496 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics); 497 final THBaseService.Processor p = new THBaseService.Processor(handler); 498 conf.setBoolean("hbase.regionserver.thrift.compact", compact); 499 TProcessor processor = p; 500 501 boolean framed = cmd.hasOption("framed") || 502 conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha; 503 TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed, 504 conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024); 505 InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort); 506 conf.setBoolean("hbase.regionserver.thrift.framed", framed); 507 if (qop != null) { 508 // Create a processor wrapper, to get the caller 509 processor = new TProcessor() { 510 @Override 511 public boolean process(TProtocol inProt, 512 TProtocol outProt) throws TException { 513 TSaslServerTransport saslServerTransport = 514 (TSaslServerTransport)inProt.getTransport(); 515 SaslServer saslServer = saslServerTransport.getSaslServer(); 516 String principal = saslServer.getAuthorizationID(); 517 hbaseHandler.setEffectiveUser(principal); 518 return p.process(inProt, outProt); 519 } 520 }; 521 } 522 523 if (cmd.hasOption("w")) { 524 workerThreads = Integer.parseInt(cmd.getOptionValue("w")); 525 } 526 if (cmd.hasOption("s")) { 527 selectorThreads = Integer.parseInt(cmd.getOptionValue("s")); 528 } 529 if (cmd.hasOption("q")) { 530 maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q")); 531 } 532 533 // check for user-defined info server port setting, if so override the conf 534 try { 535 if (cmd.hasOption("infoport")) { 536 String val = cmd.getOptionValue("infoport"); 537 conf.setInt("hbase.thrift.info.port", Integer.parseInt(val)); 538 log.debug("Web UI port set to " + val); 539 } 540 } catch (NumberFormatException e) { 541 log.error("Could not parse the value provided for the infoport option", e); 542 printUsage(); 543 System.exit(1); 544 } 545 546 // Put up info server. 547 int port = conf.getInt("hbase.thrift.info.port", 9095); 548 if (port >= 0) { 549 conf.setLong("startcode", System.currentTimeMillis()); 550 String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0"); 551 InfoServer infoServer = new InfoServer("thrift", a, port, false, conf); 552 infoServer.setAttribute("hbase.conf", conf); 553 infoServer.start(); 554 } 555 556 if (nonblocking) { 557 server = getTNonBlockingServer(protocolFactory, 558 processor, 559 transportFactory, 560 inetSocketAddress); 561 } else if (hsha) { 562 server = getTHsHaServer(protocolFactory, 563 processor, 564 transportFactory, 565 workerThreads, 566 maxCallQueueSize, 567 inetSocketAddress, 568 metrics); 569 } else if (selector) { 570 server = getTThreadedSelectorServer(protocolFactory, 571 processor, 572 transportFactory, 573 workerThreads, 574 selectorThreads, 575 maxCallQueueSize, 576 inetSocketAddress, 577 metrics); 578 } else { 579 server = getTThreadPoolServer(protocolFactory, 580 processor, 581 transportFactory, 582 workerThreads, 583 inetSocketAddress, 584 backlog, 585 readTimeout, 586 metrics); 587 } 588 589 final TServer tserver = server; 590 realUser.doAs( 591 new PrivilegedAction<Object>() { 592 @Override 593 public Object run() { 594 pauseMonitor.start(); 595 try { 596 tserver.serve(); 597 return null; 598 } finally { 599 pauseMonitor.stop(); 600 } 601 } 602 }); 603 // when tserver.stop eventually happens we'll get here. 604 return 0; 605 } 606}