001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
022import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
023import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
024import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
027import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
030import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
033import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
037import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
038import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
039import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
041import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
042import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
043import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
044import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
045import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
046import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
047import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
048import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
075import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
076
077import java.io.IOException;
078import java.net.InetAddress;
079import java.net.InetSocketAddress;
080import java.net.UnknownHostException;
081import java.util.List;
082import java.util.Map;
083import java.util.concurrent.BlockingQueue;
084import java.util.concurrent.ExecutorService;
085import java.util.concurrent.LinkedBlockingQueue;
086import java.util.concurrent.ThreadPoolExecutor;
087import java.util.concurrent.TimeUnit;
088import javax.security.auth.callback.Callback;
089import javax.security.auth.callback.UnsupportedCallbackException;
090import javax.security.sasl.AuthorizeCallback;
091import javax.security.sasl.SaslServer;
092
093import org.apache.commons.lang3.ArrayUtils;
094import org.apache.hadoop.conf.Configuration;
095import org.apache.hadoop.conf.Configured;
096import org.apache.hadoop.hbase.HBaseConfiguration;
097import org.apache.hadoop.hbase.HBaseInterfaceAudience;
098import org.apache.hadoop.hbase.filter.ParseFilter;
099import org.apache.hadoop.hbase.http.HttpServerUtil;
100import org.apache.hadoop.hbase.http.InfoServer;
101import org.apache.hadoop.hbase.security.SaslUtil;
102import org.apache.hadoop.hbase.security.SecurityUtil;
103import org.apache.hadoop.hbase.security.UserProvider;
104import org.apache.hadoop.hbase.thrift.generated.Hbase;
105import org.apache.hadoop.hbase.util.DNS;
106import org.apache.hadoop.hbase.util.JvmPauseMonitor;
107import org.apache.hadoop.hbase.util.Strings;
108import org.apache.hadoop.hbase.util.VersionInfo;
109import org.apache.hadoop.security.SaslRpcServer;
110import org.apache.hadoop.security.UserGroupInformation;
111import org.apache.hadoop.security.authorize.ProxyUsers;
112import org.apache.hadoop.util.Shell.ExitCodeException;
113import org.apache.hadoop.util.Tool;
114import org.apache.hadoop.util.ToolRunner;
115import org.apache.thrift.TProcessor;
116import org.apache.thrift.protocol.TBinaryProtocol;
117import org.apache.thrift.protocol.TCompactProtocol;
118import org.apache.thrift.protocol.TProtocolFactory;
119import org.apache.thrift.server.THsHaServer;
120import org.apache.thrift.server.TNonblockingServer;
121import org.apache.thrift.server.TServer;
122import org.apache.thrift.server.TServlet;
123import org.apache.thrift.server.TThreadedSelectorServer;
124import org.apache.thrift.transport.TFramedTransport;
125import org.apache.thrift.transport.TNonblockingServerSocket;
126import org.apache.thrift.transport.TNonblockingServerTransport;
127import org.apache.thrift.transport.TSaslServerTransport;
128import org.apache.thrift.transport.TServerSocket;
129import org.apache.thrift.transport.TServerTransport;
130import org.apache.thrift.transport.TTransportFactory;
131import org.apache.yetus.audience.InterfaceAudience;
132import org.eclipse.jetty.http.HttpVersion;
133import org.eclipse.jetty.server.HttpConfiguration;
134import org.eclipse.jetty.server.HttpConnectionFactory;
135import org.eclipse.jetty.server.SecureRequestCustomizer;
136import org.eclipse.jetty.server.Server;
137import org.eclipse.jetty.server.ServerConnector;
138import org.eclipse.jetty.server.SslConnectionFactory;
139import org.eclipse.jetty.servlet.ServletContextHandler;
140import org.eclipse.jetty.servlet.ServletHolder;
141import org.eclipse.jetty.util.ssl.SslContextFactory;
142import org.eclipse.jetty.util.thread.QueuedThreadPool;
143import org.slf4j.Logger;
144import org.slf4j.LoggerFactory;
145
146import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
147import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
148import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
149import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
150import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
151import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
152import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
153import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
154import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
155
156/**
157 * ThriftServer- this class starts up a Thrift server which implements the
158 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
159 * independent process.
160 */
161@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
162public class ThriftServer  extends Configured implements Tool {
163
164  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
165
166
167
168  protected Configuration conf;
169
170  protected InfoServer infoServer;
171
172  protected TProcessor processor;
173
174  protected ThriftMetrics metrics;
175  protected HBaseServiceHandler hbaseServiceHandler;
176  protected UserGroupInformation serviceUGI;
177  protected UserGroupInformation httpUGI;
178  protected boolean httpEnabled;
179
180  protected SaslUtil.QualityOfProtection qop;
181  protected String host;
182  protected int listenPort;
183
184
185  protected boolean securityEnabled;
186  protected boolean doAsEnabled;
187
188  protected JvmPauseMonitor pauseMonitor;
189
190  protected volatile TServer tserver;
191  protected volatile Server httpServer;
192
193
194  //
195  // Main program and support routines
196  //
197
198  public ThriftServer(Configuration conf) {
199    this.conf = HBaseConfiguration.create(conf);
200  }
201
202  protected ThriftMetrics createThriftMetrics(Configuration conf) {
203    return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
204  }
205
206  protected void setupParamters() throws IOException {
207    // login the server principal (if using secure Hadoop)
208    UserProvider userProvider = UserProvider.instantiate(conf);
209    securityEnabled = userProvider.isHadoopSecurityEnabled()
210        && userProvider.isHBaseSecurityEnabled();
211    if (securityEnabled) {
212      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
213          conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
214          conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
215      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
216
217      // Setup the SPNEGO user for HTTP if configured
218      String spnegoPrincipal = getSpengoPrincipal(conf, host);
219      String spnegoKeytab = getSpnegoKeytab(conf);
220      UserGroupInformation.setConfiguration(conf);
221      // login the SPNEGO principal using UGI to avoid polluting the login user
222      this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal,
223        spnegoKeytab);
224    }
225    this.serviceUGI = userProvider.getCurrent().getUGI();
226    if (httpUGI == null) {
227      this.httpUGI = serviceUGI;
228    }
229
230    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
231    this.metrics = createThriftMetrics(conf);
232    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
233    this.hbaseServiceHandler = createHandler(conf, userProvider);
234    this.hbaseServiceHandler.initMetrics(metrics);
235    this.processor = createProcessor();
236
237    httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
238    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
239    if (doAsEnabled && !httpEnabled) {
240      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
241    }
242
243    String strQop = conf.get(THRIFT_QOP_KEY);
244    if (strQop != null) {
245      this.qop = SaslUtil.getQop(strQop);
246    }
247    if (qop != null) {
248      if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
249          qop != SaslUtil.QualityOfProtection.INTEGRITY &&
250          qop != SaslUtil.QualityOfProtection.PRIVACY) {
251        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
252            THRIFT_QOP_KEY,
253            SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
254            SaslUtil.QualityOfProtection.INTEGRITY.name(),
255            SaslUtil.QualityOfProtection.PRIVACY.name()));
256      }
257      checkHttpSecurity(qop, conf);
258      if (!securityEnabled) {
259        throw new IOException("Thrift server must run in secure mode to support authentication");
260      }
261    }
262    registerFilters(conf);
263    pauseMonitor.start();
264  }
265
266  private String getSpengoPrincipal(Configuration conf, String host) throws IOException {
267    String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY);
268    if (principal == null) {
269      // We cannot use the Hadoop configuration deprecation handling here since
270      // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos
271      // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY
272      // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend
273      // Kerberos principal and SPNEGO principal.
274      LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.",
275        THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY);
276      principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
277    }
278    // Handle _HOST in principal value
279    return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host);
280  }
281
282  private String getSpnegoKeytab(Configuration conf) {
283    String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY);
284    if (keytab == null) {
285      // We cannot use the Hadoop configuration deprecation handling here since
286      // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos
287      // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY
288      // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend
289      // Kerberos keytab and SPNEGO keytab.
290      LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.",
291        THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY);
292      keytab = conf.get(THRIFT_KEYTAB_FILE_KEY);
293    }
294    return keytab;
295  }
296
297  protected void startInfoServer() throws IOException {
298    // Put up info server.
299    int port = conf.getInt(THRIFT_INFO_SERVER_PORT , THRIFT_INFO_SERVER_PORT_DEFAULT);
300
301    if (port >= 0) {
302      conf.setLong("startcode", System.currentTimeMillis());
303      String a = conf
304          .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
305      infoServer = new InfoServer("thrift", a, port, false, conf);
306      infoServer.setAttribute("hbase.conf", conf);
307      infoServer.start();
308    }
309  }
310
311  protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
312    if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
313        conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
314        !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
315      throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
316          THRIFT_SSL_ENABLED_KEY + " is false");
317    }
318  }
319
320  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
321      throws IOException {
322    return new ThriftHBaseServiceHandler(conf, userProvider);
323  }
324
325  protected TProcessor createProcessor() {
326    return new Hbase.Processor<>(
327        HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
328  }
329
330  /**
331   * the thrift server, not null means the server is started, for test only
332   * @return the tServer
333   */
334  @VisibleForTesting
335  public TServer getTserver() {
336    return tserver;
337  }
338
339  /**
340   * the Jetty server, not null means the HTTP server is started, for test only
341   * @return the http server
342   */
343  @VisibleForTesting
344  public Server getHttpServer() {
345    return httpServer;
346  }
347
348  protected void printUsageAndExit(Options options, int exitCode)
349      throws ExitCodeException {
350    HelpFormatter formatter = new HelpFormatter();
351    formatter.printHelp("Thrift", null, options,
352        "To start the Thrift server run 'hbase-daemon.sh start thrift' or " +
353            "'hbase thrift'\n" +
354            "To shutdown the thrift server run 'hbase-daemon.sh stop " +
355            "thrift' or send a kill signal to the thrift server pid",
356        true);
357    throw new ExitCodeException(exitCode, "");
358  }
359
360  /**
361   * Create a Servlet for the http server
362   * @param protocolFactory protocolFactory
363   * @return the servlet
364   */
365  protected TServlet createTServlet(TProtocolFactory protocolFactory) {
366    return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI,
367        hbaseServiceHandler, securityEnabled, doAsEnabled);
368  }
369
370  /**
371   * Setup an HTTP Server using Jetty to serve calls from THttpClient
372   *
373   * @throws IOException IOException
374   */
375  protected void setupHTTPServer() throws IOException {
376    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
377    TServlet thriftHttpServlet = createTServlet(protocolFactory);
378
379    // Set the default max thread number to 100 to limit
380    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
381    // Jetty set the default max thread number to 250, if we don't set it.
382    //
383    // Our default min thread number 2 is the same as that used by Jetty.
384    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
385        conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
386            HTTP_MIN_THREADS_KEY_DEFAULT));
387    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
388        conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
389            HTTP_MAX_THREADS_KEY_DEFAULT));
390    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
391    threadPool.setMinThreads(minThreads);
392    httpServer = new Server(threadPool);
393
394    // Context handler
395    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
396        ServletContextHandler.SESSIONS);
397    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
398    HttpServerUtil.constrainHttpMethods(ctxHandler,
399        conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
400            THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
401
402    // set up Jetty and run the embedded server
403    HttpConfiguration httpConfig = new HttpConfiguration();
404    httpConfig.setSecureScheme("https");
405    httpConfig.setSecurePort(listenPort);
406    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
407    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
408    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
409    httpConfig.setSendServerVersion(false);
410    httpConfig.setSendDateHeader(false);
411
412    ServerConnector serverConnector;
413    if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
414      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
415      httpsConfig.addCustomizer(new SecureRequestCustomizer());
416
417      SslContextFactory sslCtxFactory = new SslContextFactory();
418      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
419      String password = HBaseConfiguration.getPassword(conf,
420          THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
421      String keyPassword = HBaseConfiguration.getPassword(conf,
422          THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
423      sslCtxFactory.setKeyStorePath(keystore);
424      sslCtxFactory.setKeyStorePassword(password);
425      sslCtxFactory.setKeyManagerPassword(keyPassword);
426
427      String[] excludeCiphers = conf.getStrings(
428          THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
429      if (excludeCiphers.length != 0) {
430        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
431      }
432      String[] includeCiphers = conf.getStrings(
433          THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
434      if (includeCiphers.length != 0) {
435        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
436      }
437
438      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
439      String[] excludeProtocols = conf.getStrings(
440          THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
441      if (excludeProtocols.length != 0) {
442        sslCtxFactory.setExcludeProtocols(excludeProtocols);
443      }
444      String[] includeProtocols = conf.getStrings(
445          THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
446      if (includeProtocols.length != 0) {
447        sslCtxFactory.setIncludeProtocols(includeProtocols);
448      }
449
450      serverConnector = new ServerConnector(httpServer,
451          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
452          new HttpConnectionFactory(httpsConfig));
453    } else {
454      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
455    }
456    serverConnector.setPort(listenPort);
457    serverConnector.setHost(getBindAddress(conf).getHostAddress());
458    httpServer.addConnector(serverConnector);
459    httpServer.setStopAtShutdown(true);
460
461    if (doAsEnabled) {
462      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
463    }
464    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
465  }
466
467  /**
468   * Setting up the thrift TServer
469   */
470  protected void setupServer() throws Exception {
471    // Construct correct ProtocolFactory
472    TProtocolFactory protocolFactory = getProtocolFactory();
473
474    ImplType implType = ImplType.getServerImpl(conf);
475    TProcessor processorToUse = processor;
476
477    // Construct correct TransportFactory
478    TTransportFactory transportFactory;
479    if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
480      if (qop != null) {
481        throw new RuntimeException("Thrift server authentication"
482            + " doesn't work with framed transport yet");
483      }
484      transportFactory = new TFramedTransport.Factory(
485          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
486      LOG.debug("Using framed transport");
487    } else if (qop == null) {
488      transportFactory = new TTransportFactory();
489    } else {
490      // Extract the name from the principal
491      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
492      if (thriftKerberosPrincipal == null) {
493        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
494      }
495      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
496      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
497      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
498      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
499          new SaslRpcServer.SaslGssCallbackHandler() {
500            @Override
501            public void handle(Callback[] callbacks)
502                throws UnsupportedCallbackException {
503              AuthorizeCallback ac = null;
504              for (Callback callback : callbacks) {
505                if (callback instanceof AuthorizeCallback) {
506                  ac = (AuthorizeCallback) callback;
507                } else {
508                  throw new UnsupportedCallbackException(callback,
509                      "Unrecognized SASL GSSAPI Callback");
510                }
511              }
512              if (ac != null) {
513                String authid = ac.getAuthenticationID();
514                String authzid = ac.getAuthorizationID();
515                if (!authid.equals(authzid)) {
516                  ac.setAuthorized(false);
517                } else {
518                  ac.setAuthorized(true);
519                  String userName = SecurityUtil.getUserFromPrincipal(authzid);
520                  LOG.info("Effective user: {}", userName);
521                  ac.setAuthorizedID(userName);
522                }
523              }
524            }
525          });
526      transportFactory = saslFactory;
527
528      // Create a processor wrapper, to get the caller
529      processorToUse = (inProt, outProt) -> {
530        TSaslServerTransport saslServerTransport =
531            (TSaslServerTransport)inProt.getTransport();
532        SaslServer saslServer = saslServerTransport.getSaslServer();
533        String principal = saslServer.getAuthorizationID();
534        hbaseServiceHandler.setEffectiveUser(principal);
535        return processor.process(inProt, outProt);
536      };
537    }
538
539    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
540      LOG.error("Server types {} don't support IP address binding at the moment. See " +
541              "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
542          Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
543      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
544    }
545
546    InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
547    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
548        implType == ImplType.THREADED_SELECTOR) {
549      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
550      if (implType == ImplType.NONBLOCKING) {
551        tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
552            transportFactory, inetSocketAddress);
553      } else if (implType == ImplType.HS_HA) {
554        tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
555            inetSocketAddress);
556      } else { // THREADED_SELECTOR
557        tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
558            transportFactory, inetSocketAddress);
559      }
560      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
561          Integer.toString(listenPort));
562    } else if (implType == ImplType.THREAD_POOL) {
563      this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
564          inetSocketAddress);
565    } else {
566      throw new AssertionError("Unsupported Thrift server implementation: " +
567          implType.simpleClassName());
568    }
569
570    // A sanity check that we instantiated the right type of server.
571    if (tserver.getClass() != implType.serverClass) {
572      throw new AssertionError("Expected to create Thrift server class " +
573          implType.serverClass.getName() + " but got " +
574          tserver.getClass().getName());
575    }
576  }
577
578  protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
579      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
580      InetSocketAddress inetSocketAddress) {
581    LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
582    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
583    serverArgs.processor(processor);
584    serverArgs.transportFactory(transportFactory);
585    serverArgs.protocolFactory(protocolFactory);
586    return new TNonblockingServer(serverArgs);
587  }
588
589  protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
590      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
591      InetSocketAddress inetSocketAddress) {
592    LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
593    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
594    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
595        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
596    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
597    int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
598        serverArgs.getMaxWorkerThreads());
599    ExecutorService executorService = createExecutor(
600        callQueue, workerThread, workerThread);
601    serverArgs.executorService(executorService).processor(processor)
602        .transportFactory(transportFactory).protocolFactory(protocolFactory);
603    return new THsHaServer(serverArgs);
604  }
605
606  protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
607      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
608      InetSocketAddress inetSocketAddress) {
609    LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
610    TThreadedSelectorServer.Args serverArgs =
611        new HThreadedSelectorServerArgs(serverTransport, conf);
612    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
613        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
614    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
615    int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
616        serverArgs.getWorkerThreads());
617    int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
618    serverArgs.selectorThreads(selectorThreads);
619    ExecutorService executorService = createExecutor(
620        callQueue, workerThreads, workerThreads);
621    serverArgs.executorService(executorService).processor(processor)
622        .transportFactory(transportFactory).protocolFactory(protocolFactory);
623    return new TThreadedSelectorServer(serverArgs);
624  }
625
626  protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
627      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
628    LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
629    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
630    int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
631    int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
632        THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
633    TServerTransport serverTransport = new TServerSocket(
634        new TServerSocket.ServerSocketTransportArgs().
635            bindAddr(inetSocketAddress).backlog(backlog).
636            clientTimeout(readTimeout));
637
638    TBoundedThreadPoolServer.Args serverArgs =
639        new TBoundedThreadPoolServer.Args(serverTransport, conf);
640    serverArgs.processor(processor).transportFactory(transportFactory)
641        .protocolFactory(protocolFactory);
642    return new TBoundedThreadPoolServer(serverArgs, metrics);
643  }
644
645  protected TProtocolFactory getProtocolFactory() {
646    TProtocolFactory protocolFactory;
647
648    if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
649      LOG.debug("Using compact protocol");
650      protocolFactory = new TCompactProtocol.Factory();
651    } else {
652      LOG.debug("Using binary protocol");
653      protocolFactory = new TBinaryProtocol.Factory();
654    }
655
656    return protocolFactory;
657  }
658
659  protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
660      int minWorkers, int maxWorkers) {
661    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
662    tfb.setDaemon(true);
663    tfb.setNameFormat("thrift-worker-%d");
664    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
665        Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
666    threadPool.allowCoreThreadTimeOut(true);
667    return threadPool;
668  }
669
670  protected InetAddress getBindAddress(Configuration conf)
671      throws UnknownHostException {
672    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
673    return InetAddress.getByName(bindAddressStr);
674  }
675
676
677  public static void registerFilters(Configuration conf) {
678    String[] filters = conf.getStrings(THRIFT_FILTERS);
679    Splitter splitter = Splitter.on(':');
680    if(filters != null) {
681      for(String filterClass: filters) {
682        List<String> filterPart = splitter.splitToList(filterClass);
683        if(filterPart.size() != 2) {
684          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
685        } else {
686          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
687        }
688      }
689    }
690  }
691
692  /**
693   * Add options to command lines
694   * @param options options
695   */
696  protected void addOptions(Options options) {
697    options.addOption("b", BIND_OPTION, true, "Address to bind " +
698        "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
699    options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
700        DEFAULT_LISTEN_PORT + "]");
701    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
702    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
703    options.addOption("h", "help", false, "Print help information");
704    options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
705    options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
706
707    options.addOption("m", MIN_WORKERS_OPTION, true,
708        "The minimum number of worker threads for " +
709            ImplType.THREAD_POOL.simpleClassName());
710
711    options.addOption("w", MAX_WORKERS_OPTION, true,
712        "The maximum number of worker threads for " +
713            ImplType.THREAD_POOL.simpleClassName());
714
715    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
716        "The maximum number of queued requests in " +
717            ImplType.THREAD_POOL.simpleClassName());
718
719    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
720        "The amount of time in secods to keep a thread alive when idle in " +
721            ImplType.THREAD_POOL.simpleClassName());
722
723    options.addOption("t", READ_TIMEOUT_OPTION, true,
724        "Amount of time in milliseconds before a server thread will timeout " +
725            "waiting for client to send data on a connected socket. Currently, " +
726            "only applies to TBoundedThreadPoolServer");
727
728    options.addOptionGroup(ImplType.createOptionGroup());
729  }
730
731  protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
732    // Get port to bind to
733    try {
734      if (cmd.hasOption(PORT_OPTION)) {
735        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
736        conf.setInt(PORT_CONF_KEY, listenPort);
737      }
738    } catch (NumberFormatException e) {
739      LOG.error("Could not parse the value provided for the port option", e);
740      printUsageAndExit(options, -1);
741    }
742    // check for user-defined info server port setting, if so override the conf
743    try {
744      if (cmd.hasOption(INFOPORT_OPTION)) {
745        String val = cmd.getOptionValue(INFOPORT_OPTION);
746        conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
747        LOG.debug("Web UI port set to " + val);
748      }
749    } catch (NumberFormatException e) {
750      LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
751          " option", e);
752      printUsageAndExit(options, -1);
753    }
754    // Make optional changes to the configuration based on command-line options
755    optionToConf(cmd, MIN_WORKERS_OPTION,
756        conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
757    optionToConf(cmd, MAX_WORKERS_OPTION,
758        conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
759    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
760        conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
761    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
762        conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
763    optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
764    optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
765
766    // Set general thrift server options
767    boolean compact = cmd.hasOption(COMPACT_OPTION) ||
768        conf.getBoolean(COMPACT_CONF_KEY, false);
769    conf.setBoolean(COMPACT_CONF_KEY, compact);
770    boolean framed = cmd.hasOption(FRAMED_OPTION) ||
771        conf.getBoolean(FRAMED_CONF_KEY, false);
772    conf.setBoolean(FRAMED_CONF_KEY, framed);
773
774    optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
775
776
777    ImplType.setServerImpl(cmd, conf);
778  }
779
780  /**
781   * Parse the command line options to set parameters the conf.
782   */
783  protected void processOptions(final String[] args) throws Exception {
784    if (args == null || args.length == 0) {
785      return;
786    }
787    Options options = new Options();
788    addOptions(options);
789
790    CommandLineParser parser = new DefaultParser();
791    CommandLine cmd = parser.parse(options, args);
792
793    if (cmd.hasOption("help")) {
794      printUsageAndExit(options, 1);
795    }
796    parseCommandLine(cmd, options);
797  }
798
799  public void stop() {
800    if (this.infoServer != null) {
801      LOG.info("Stopping infoServer");
802      try {
803        this.infoServer.stop();
804      } catch (Exception ex) {
805        LOG.error("Failed to stop infoServer", ex);
806      }
807    }
808    if (pauseMonitor != null) {
809      pauseMonitor.stop();
810    }
811    if (tserver != null) {
812      tserver.stop();
813      tserver = null;
814    }
815    if (httpServer != null) {
816      try {
817        httpServer.stop();
818        httpServer = null;
819      } catch (Exception e) {
820        LOG.error("Problem encountered in shutting down HTTP server", e);
821      }
822      httpServer = null;
823    }
824  }
825
826  protected static void optionToConf(CommandLine cmd, String option,
827      Configuration conf, String destConfKey) {
828    if (cmd.hasOption(option)) {
829      String value = cmd.getOptionValue(option);
830      LOG.info("Set configuration key:" + destConfKey + " value:" + value);
831      conf.set(destConfKey, value);
832    }
833  }
834
835  /**
836   * Run without any command line arguments
837   * @return exit code
838   * @throws Exception exception
839   */
840  public int run() throws Exception {
841    return run(null);
842  }
843
844  @Override
845  public int run(String[] strings) throws Exception {
846    processOptions(strings);
847    setupParamters();
848    startInfoServer();
849    if (httpEnabled) {
850      setupHTTPServer();
851      httpServer.start();
852      httpServer.join();
853    } else {
854      setupServer();
855      tserver.serve();
856    }
857    return 0;
858  }
859
860  public static void main(String [] args) throws Exception {
861    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
862    VersionInfo.logVersion();
863    final Configuration conf = HBaseConfiguration.create();
864    // for now, only time we return is on an argument error.
865    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
866    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
867    System.exit(status);
868  }
869}