001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2;
020
021import java.io.IOException;
022import java.net.InetAddress;
023import java.net.InetSocketAddress;
024import java.net.UnknownHostException;
025import java.security.PrivilegedAction;
026import java.util.Map;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.SynchronousQueue;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032
033import javax.security.auth.callback.Callback;
034import javax.security.auth.callback.UnsupportedCallbackException;
035import javax.security.sasl.AuthorizeCallback;
036import javax.security.sasl.SaslServer;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HBaseInterfaceAudience;
042import org.apache.hadoop.hbase.filter.ParseFilter;
043import org.apache.hadoop.hbase.http.InfoServer;
044import org.apache.hadoop.hbase.security.SaslUtil;
045import org.apache.hadoop.hbase.security.SecurityUtil;
046import org.apache.hadoop.hbase.security.UserProvider;
047import org.apache.hadoop.hbase.thrift.CallQueue;
048import org.apache.hadoop.hbase.thrift.THBaseThreadPoolExecutor;
049import org.apache.hadoop.hbase.thrift.ThriftMetrics;
050import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
051import org.apache.hadoop.hbase.util.DNS;
052import org.apache.hadoop.hbase.util.JvmPauseMonitor;
053import org.apache.hadoop.hbase.util.Strings;
054import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
055import org.apache.hadoop.security.UserGroupInformation;
056import org.apache.hadoop.util.Tool;
057import org.apache.hadoop.util.ToolRunner;
058import org.apache.thrift.TException;
059import org.apache.thrift.TProcessor;
060import org.apache.thrift.protocol.TBinaryProtocol;
061import org.apache.thrift.protocol.TCompactProtocol;
062import org.apache.thrift.protocol.TProtocol;
063import org.apache.thrift.protocol.TProtocolFactory;
064import org.apache.thrift.server.THsHaServer;
065import org.apache.thrift.server.TNonblockingServer;
066import org.apache.thrift.server.TServer;
067import org.apache.thrift.server.TThreadPoolServer;
068import org.apache.thrift.server.TThreadedSelectorServer;
069import org.apache.thrift.transport.TFramedTransport;
070import org.apache.thrift.transport.TNonblockingServerSocket;
071import org.apache.thrift.transport.TNonblockingServerTransport;
072import org.apache.thrift.transport.TSaslServerTransport;
073import org.apache.thrift.transport.TServerSocket;
074import org.apache.thrift.transport.TServerTransport;
075import org.apache.thrift.transport.TTransportException;
076import org.apache.thrift.transport.TTransportFactory;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
081import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
082import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
083import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
084import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
085import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
086import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
087import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
088import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
089
090/**
091 * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
092 * HbaseClient.thrift IDL file.
093 */
094@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
095@SuppressWarnings({ "rawtypes", "unchecked" })
096public class ThriftServer extends Configured implements Tool {
  /** Class-wide logger. */
  private static final Logger log = LoggerFactory.getLogger(ThriftServer.class);

  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * privacy: authentication, integrity and confidentiality checking
   * integrity: authentication and integrity checking
   * authentication: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

  /** Configuration key for the listen backlog handed to the thread-pool server socket. */
  static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";

  /** Port the Thrift server listens on when neither the command line nor the conf sets one. */
  public static final int DEFAULT_LISTEN_PORT = 9090;

  /** Long name of the command-line option that overrides the socket read timeout. */
  private static final String READ_TIMEOUT_OPTION = "readTimeout";

  /**
   * Amount of time in milliseconds before a server thread will timeout
   * waiting for client to send data on a connected socket. In this class the
   * value is only passed to the thread-pool server (as the client timeout of
   * its server socket); the nonblocking, hsha and selector servers do not use it.
   */
  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
    "hbase.thrift.server.socket.read.timeout";
  /** Read timeout, in milliseconds, used when neither the option nor the conf key is set. */
  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;

  /** Creates a server instance; all configuration is read later, in {@code run}. */
  public ThriftServer() {
  }
127
128  private static void printUsage() {
129    HelpFormatter formatter = new HelpFormatter();
130    formatter.printHelp("Thrift", null, getOptions(),
131        "To start the Thrift server run 'hbase-daemon.sh start thrift2' or " +
132        "'hbase thrift2'\n" +
133        "To shutdown the thrift server run 'hbase-daemon.sh stop thrift2' or" +
134        " send a kill signal to the thrift server pid",
135        true);
136  }
137
138  private static Options getOptions() {
139    Options options = new Options();
140    options.addOption("b", "bind", true,
141        "Address to bind the Thrift server to. [default: 0.0.0.0]");
142    options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
143    options.addOption("f", "framed", false, "Use framed transport");
144    options.addOption("c", "compact", false, "Use the compact protocol");
145    options.addOption("w", "workers", true, "How many worker threads to use.");
146    options.addOption("s", "selectors", true, "How many selector threads to use.");
147    options.addOption("q", "callQueueSize", true,
148      "Max size of request queue (unbounded by default)");
149    options.addOption("h", "help", false, "Print help information");
150    options.addOption(null, "infoport", true, "Port for web UI");
151    options.addOption("t", READ_TIMEOUT_OPTION, true,
152      "Amount of time in milliseconds before a server thread will timeout " +
153      "waiting for client to send data on a connected socket. Currently, " +
154      "only applies to TBoundedThreadPoolServer");
155    options.addOption("ro", "readonly", false,
156      "Respond only to read method requests [default: false]");
157    OptionGroup servers = new OptionGroup();
158    servers.addOption(
159        new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
160    servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
161    servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
162    servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
163    options.addOptionGroup(servers);
164    return options;
165  }
166
167  private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
168      throws ParseException, IOException {
169    CommandLineParser parser = new DefaultParser();
170    return parser.parse(options, args);
171  }
172
173  private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
174    if (isCompact) {
175      log.debug("Using compact protocol");
176      return new TCompactProtocol.Factory();
177    } else {
178      log.debug("Using binary protocol");
179      return new TBinaryProtocol.Factory();
180    }
181  }
182
183  private static TTransportFactory getTTransportFactory(
184      SaslUtil.QualityOfProtection qop, String name, String host,
185      boolean framed, int frameSize) {
186    if (framed) {
187      if (qop != null) {
188        throw new RuntimeException("Thrift server authentication"
189          + " doesn't work with framed transport yet");
190      }
191      log.debug("Using framed transport");
192      return new TFramedTransport.Factory(frameSize);
193    } else if (qop == null) {
194      return new TTransportFactory();
195    } else {
196      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
197      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
198      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
199        new SaslGssCallbackHandler() {
200          @Override
201          public void handle(Callback[] callbacks)
202              throws UnsupportedCallbackException {
203            AuthorizeCallback ac = null;
204            for (Callback callback : callbacks) {
205              if (callback instanceof AuthorizeCallback) {
206                ac = (AuthorizeCallback) callback;
207              } else {
208                throw new UnsupportedCallbackException(callback,
209                    "Unrecognized SASL GSSAPI Callback");
210              }
211            }
212            if (ac != null) {
213              String authid = ac.getAuthenticationID();
214              String authzid = ac.getAuthorizationID();
215              if (!authid.equals(authzid)) {
216                ac.setAuthorized(false);
217              } else {
218                ac.setAuthorized(true);
219                String userName = SecurityUtil.getUserFromPrincipal(authzid);
220                log.info("Effective user: " + userName);
221                ac.setAuthorizedID(userName);
222              }
223            }
224          }
225        });
226      return saslFactory;
227    }
228  }
229
230  /*
231   * If bindValue is null, we don't bind.
232   */
233  private static InetSocketAddress bindToPort(String bindValue, int listenPort)
234      throws UnknownHostException {
235    try {
236      if (bindValue == null) {
237        return new InetSocketAddress(listenPort);
238      } else {
239        return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
240      }
241    } catch (UnknownHostException e) {
242      throw new RuntimeException("Could not bind to provided ip address", e);
243    }
244  }
245
246  private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
247      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
248    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
249    log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
250    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
251    serverArgs.processor(processor);
252    serverArgs.transportFactory(transportFactory);
253    serverArgs.protocolFactory(protocolFactory);
254    return new TNonblockingServer(serverArgs);
255  }
256
257  private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
258      TProcessor processor, TTransportFactory transportFactory,
259      int workerThreads, int maxCallQueueSize,
260      InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
261      throws TTransportException {
262    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
263    log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
264    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
265    if (workerThreads > 0) {
266      // Could support the min & max threads, avoiding to preserve existing functionality.
267      serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
268    }
269    ExecutorService executorService = createExecutor(
270        workerThreads, maxCallQueueSize, metrics);
271    serverArgs.executorService(executorService);
272    serverArgs.processor(processor);
273    serverArgs.transportFactory(transportFactory);
274    serverArgs.protocolFactory(protocolFactory);
275    return new THsHaServer(serverArgs);
276  }
277
278  private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
279      TProcessor processor, TTransportFactory transportFactory,
280      int workerThreads, int selectorThreads, int maxCallQueueSize,
281      InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
282      throws TTransportException {
283    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
284    log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
285    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
286    if (workerThreads > 0) {
287        serverArgs.workerThreads(workerThreads);
288    }
289    if (selectorThreads > 0) {
290        serverArgs.selectorThreads(selectorThreads);
291    }
292
293    ExecutorService executorService = createExecutor(
294        workerThreads, maxCallQueueSize, metrics);
295    serverArgs.executorService(executorService);
296    serverArgs.processor(processor);
297    serverArgs.transportFactory(transportFactory);
298    serverArgs.protocolFactory(protocolFactory);
299    return new TThreadedSelectorServer(serverArgs);
300  }
301
302  private static ExecutorService createExecutor(
303      int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
304    CallQueue callQueue;
305    if (maxCallQueueSize > 0) {
306      callQueue = new CallQueue(new LinkedBlockingQueue<>(maxCallQueueSize), metrics);
307    } else {
308      callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
309    }
310
311    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
312    tfb.setDaemon(true);
313    tfb.setNameFormat("thrift2-worker-%d");
314    ThreadPoolExecutor pool = new THBaseThreadPoolExecutor(workerThreads, workerThreads,
315            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
316    pool.prestartAllCoreThreads();
317    return pool;
318  }
319
320  private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
321                                              TProcessor processor,
322                                              TTransportFactory transportFactory,
323                                              int workerThreads,
324                                              InetSocketAddress inetSocketAddress,
325                                              int backlog,
326                                              int clientTimeout,
327                                              ThriftMetrics metrics)
328      throws TTransportException {
329    TServerTransport serverTransport = new TServerSocket(
330                                           new TServerSocket.ServerSocketTransportArgs().
331                                               bindAddr(inetSocketAddress).backlog(backlog).
332                                               clientTimeout(clientTimeout));
333    log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
334    TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
335    serverArgs.processor(processor);
336    serverArgs.transportFactory(transportFactory);
337    serverArgs.protocolFactory(protocolFactory);
338    if (workerThreads > 0) {
339      serverArgs.maxWorkerThreads(workerThreads);
340    }
341    ThreadPoolExecutor executor = new THBaseThreadPoolExecutor(serverArgs.minWorkerThreads,
342        serverArgs.maxWorkerThreads, serverArgs.stopTimeoutVal, TimeUnit.SECONDS,
343        new SynchronousQueue<>(), metrics);
344    serverArgs.executorService(executor);
345
346    return new TThreadPoolServer(serverArgs);
347  }
348
349  /**
350   * Adds the option to pre-load filters at startup.
351   *
352   * @param conf  The current configuration instance.
353   */
354  protected static void registerFilters(Configuration conf) {
355    String[] filters = conf.getStrings("hbase.thrift.filters");
356    if(filters != null) {
357      for(String filterClass: filters) {
358        String[] filterPart = filterClass.split(":");
359        if(filterPart.length != 2) {
360          log.warn("Invalid filter specification " + filterClass + " - skipping");
361        } else {
362          ParseFilter.registerFilter(filterPart[0], filterPart[1]);
363        }
364      }
365    }
366  }
367
368  /**
369   * Start up the Thrift2 server.
370   */
371  public static void main(String[] args) throws Exception {
372    final Configuration conf = HBaseConfiguration.create();
373    // for now, only time we return is on an argument error.
374    final int status = ToolRunner.run(conf, new ThriftServer(), args);
375    System.exit(status);
376  }
377
378  @Override
379  public int run(String[] args) throws Exception {
380    final Configuration conf = getConf();
381    TServer server = null;
382    Options options = getOptions();
383    CommandLine cmd = parseArguments(conf, options, args);
384    int workerThreads = 0;
385    int selectorThreads = 0;
386    int maxCallQueueSize = -1; // use unbounded queue by default
387
388    if (cmd.hasOption("help")) {
389      printUsage();
390      return 1;
391    }
392
393    // Get address to bind
394    String bindAddress;
395    if (cmd.hasOption("bind")) {
396      bindAddress = cmd.getOptionValue("bind");
397      conf.set("hbase.thrift.info.bindAddress", bindAddress);
398    } else {
399      bindAddress = conf.get("hbase.thrift.info.bindAddress");
400    }
401
402    // check if server should only process read requests, if so override the conf
403    if (cmd.hasOption("readonly")) {
404      conf.setBoolean("hbase.thrift.readonly", true);
405      if (log.isDebugEnabled()) {
406        log.debug("readonly set to true");
407      }
408    }
409
410    // Get read timeout
411    int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
412    if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
413      try {
414        readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
415      } catch (NumberFormatException e) {
416        throw new RuntimeException("Could not parse the value provided for the timeout option", e);
417      }
418    } else {
419      readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
420        THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
421    }
422
423    // Get port to bind to
424    int listenPort = 0;
425    try {
426      if (cmd.hasOption("port")) {
427        listenPort = Integer.parseInt(cmd.getOptionValue("port"));
428      } else {
429        listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
430      }
431    } catch (NumberFormatException e) {
432      throw new RuntimeException("Could not parse the value provided for the port option", e);
433    }
434
435    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
436    int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
437
438    // Local hostname and user name,
439    // used only if QOP is configured.
440    String host = null;
441    String name = null;
442
443    UserProvider userProvider = UserProvider.instantiate(conf);
444    // login the server principal (if using secure Hadoop)
445    boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
446      && userProvider.isHBaseSecurityEnabled();
447    if (securityEnabled) {
448      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
449        conf.get("hbase.thrift.dns.interface", "default"),
450        conf.get("hbase.thrift.dns.nameserver", "default")));
451      userProvider.login("hbase.thrift.keytab.file",
452        "hbase.thrift.kerberos.principal", host);
453    }
454
455    UserGroupInformation realUser = userProvider.getCurrent().getUGI();
456    String stringQop = conf.get(THRIFT_QOP_KEY);
457    SaslUtil.QualityOfProtection qop = null;
458    if (stringQop != null) {
459      qop = SaslUtil.getQop(stringQop);
460      if (!securityEnabled) {
461        throw new IOException("Thrift server must"
462          + " run in secure mode to support authentication");
463      }
464      // Extract the name from the principal
465      name = SecurityUtil.getUserFromPrincipal(
466        conf.get("hbase.thrift.kerberos.principal"));
467    }
468
469    boolean nonblocking = cmd.hasOption("nonblocking");
470    boolean hsha = cmd.hasOption("hsha");
471    boolean selector = cmd.hasOption("selector");
472
473    ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
474    final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
475
476    String implType = "threadpool";
477    if (nonblocking) {
478      implType = "nonblocking";
479    } else if (hsha) {
480      implType = "hsha";
481    } else if (selector) {
482      implType = "selector";
483    }
484
485    conf.set("hbase.regionserver.thrift.server.type", implType);
486    conf.setInt("hbase.regionserver.thrift.port", listenPort);
487    registerFilters(conf);
488
489    // Construct correct ProtocolFactory
490    boolean compact = cmd.hasOption("compact") ||
491        conf.getBoolean("hbase.regionserver.thrift.compact", false);
492    TProtocolFactory protocolFactory = getTProtocolFactory(compact);
493    final ThriftHBaseServiceHandler hbaseHandler =
494      new ThriftHBaseServiceHandler(conf, userProvider);
495    THBaseService.Iface handler =
496      ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
497    final THBaseService.Processor p = new THBaseService.Processor(handler);
498    conf.setBoolean("hbase.regionserver.thrift.compact", compact);
499    TProcessor processor = p;
500
501    boolean framed = cmd.hasOption("framed") ||
502        conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
503    TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
504        conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
505    InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
506    conf.setBoolean("hbase.regionserver.thrift.framed", framed);
507    if (qop != null) {
508      // Create a processor wrapper, to get the caller
509      processor = new TProcessor() {
510        @Override
511        public boolean process(TProtocol inProt,
512            TProtocol outProt) throws TException {
513          TSaslServerTransport saslServerTransport =
514            (TSaslServerTransport)inProt.getTransport();
515          SaslServer saslServer = saslServerTransport.getSaslServer();
516          String principal = saslServer.getAuthorizationID();
517          hbaseHandler.setEffectiveUser(principal);
518          return p.process(inProt, outProt);
519        }
520      };
521    }
522
523    if (cmd.hasOption("w")) {
524      workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
525    }
526    if (cmd.hasOption("s")) {
527      selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
528    }
529    if (cmd.hasOption("q")) {
530      maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
531    }
532
533    // check for user-defined info server port setting, if so override the conf
534    try {
535      if (cmd.hasOption("infoport")) {
536        String val = cmd.getOptionValue("infoport");
537        conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
538        log.debug("Web UI port set to " + val);
539      }
540    } catch (NumberFormatException e) {
541      log.error("Could not parse the value provided for the infoport option", e);
542      printUsage();
543      System.exit(1);
544    }
545
546    // Put up info server.
547    int port = conf.getInt("hbase.thrift.info.port", 9095);
548    if (port >= 0) {
549      conf.setLong("startcode", System.currentTimeMillis());
550      String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
551      InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
552      infoServer.setAttribute("hbase.conf", conf);
553      infoServer.start();
554    }
555
556    if (nonblocking) {
557      server = getTNonBlockingServer(protocolFactory,
558          processor,
559          transportFactory,
560          inetSocketAddress);
561    } else if (hsha) {
562      server = getTHsHaServer(protocolFactory,
563          processor,
564          transportFactory,
565          workerThreads,
566          maxCallQueueSize,
567          inetSocketAddress,
568          metrics);
569    } else if (selector) {
570      server = getTThreadedSelectorServer(protocolFactory,
571          processor,
572          transportFactory,
573          workerThreads,
574          selectorThreads,
575          maxCallQueueSize,
576          inetSocketAddress,
577          metrics);
578    } else {
579      server = getTThreadPoolServer(protocolFactory,
580          processor,
581          transportFactory,
582          workerThreads,
583          inetSocketAddress,
584          backlog,
585          readTimeout,
586          metrics);
587    }
588
589    final TServer tserver = server;
590    realUser.doAs(
591      new PrivilegedAction<Object>() {
592        @Override
593        public Object run() {
594          pauseMonitor.start();
595          try {
596            tserver.serve();
597            return null;
598          } finally {
599            pauseMonitor.stop();
600          }
601        }
602      });
603    // when tserver.stop eventually happens we'll get here.
604    return 0;
605  }
606}