001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.http.HttpServerUtil.PATH_SPEC_ANY;
021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
022import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
023import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
024import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
027import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
030import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
033import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
037import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
038import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
039import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
041import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
042import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
043import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
044import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
045import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
046import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
047import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
048import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_DEFAULT;
075import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_KEY;
076import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
077import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
078
079import java.io.IOException;
080import java.net.InetAddress;
081import java.net.InetSocketAddress;
082import java.net.UnknownHostException;
083import java.security.PrivilegedAction;
084import java.util.List;
085import java.util.Map;
086import java.util.concurrent.BlockingQueue;
087import java.util.concurrent.ExecutorService;
088import java.util.concurrent.LinkedBlockingQueue;
089import java.util.concurrent.ThreadPoolExecutor;
090import java.util.concurrent.TimeUnit;
091import javax.security.auth.callback.Callback;
092import javax.security.auth.callback.UnsupportedCallbackException;
093import javax.security.sasl.AuthorizeCallback;
094import javax.security.sasl.SaslServer;
095import org.apache.commons.lang3.ArrayUtils;
096import org.apache.hadoop.conf.Configuration;
097import org.apache.hadoop.conf.Configured;
098import org.apache.hadoop.hbase.HBaseConfiguration;
099import org.apache.hadoop.hbase.HBaseInterfaceAudience;
100import org.apache.hadoop.hbase.filter.ParseFilter;
101import org.apache.hadoop.hbase.http.HttpServerUtil;
102import org.apache.hadoop.hbase.http.InfoServer;
103import org.apache.hadoop.hbase.log.HBaseMarkers;
104import org.apache.hadoop.hbase.security.SaslUtil;
105import org.apache.hadoop.hbase.security.SecurityUtil;
106import org.apache.hadoop.hbase.security.UserProvider;
107import org.apache.hadoop.hbase.thrift.generated.Hbase;
108import org.apache.hadoop.hbase.util.DNS;
109import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
110import org.apache.hadoop.hbase.util.JvmPauseMonitor;
111import org.apache.hadoop.hbase.util.Strings;
112import org.apache.hadoop.hbase.util.VersionInfo;
113import org.apache.hadoop.security.SaslRpcServer;
114import org.apache.hadoop.security.UserGroupInformation;
115import org.apache.hadoop.security.authorize.ProxyUsers;
116import org.apache.hadoop.util.Shell.ExitCodeException;
117import org.apache.hadoop.util.Tool;
118import org.apache.hadoop.util.ToolRunner;
119import org.apache.thrift.TProcessor;
120import org.apache.thrift.protocol.TBinaryProtocol;
121import org.apache.thrift.protocol.TCompactProtocol;
122import org.apache.thrift.protocol.TProtocolFactory;
123import org.apache.thrift.server.THsHaServer;
124import org.apache.thrift.server.TNonblockingServer;
125import org.apache.thrift.server.TServer;
126import org.apache.thrift.server.TServlet;
127import org.apache.thrift.server.TThreadedSelectorServer;
128import org.apache.thrift.transport.TNonblockingServerSocket;
129import org.apache.thrift.transport.TNonblockingServerTransport;
130import org.apache.thrift.transport.TSaslServerTransport;
131import org.apache.thrift.transport.TServerSocket;
132import org.apache.thrift.transport.TServerTransport;
133import org.apache.thrift.transport.TTransportFactory;
134import org.apache.thrift.transport.layered.TFramedTransport;
135import org.apache.yetus.audience.InterfaceAudience;
136import org.slf4j.Logger;
137import org.slf4j.LoggerFactory;
138
139import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
140import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
141import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
142import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
143import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
144import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
145import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
146import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
147import org.apache.hbase.thirdparty.org.eclipse.jetty.http.HttpVersion;
148import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConfiguration;
149import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConnectionFactory;
150import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SecureRequestCustomizer;
151import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
152import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
153import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SslConnectionFactory;
154import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletContextHandler;
155import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
156import org.apache.hbase.thirdparty.org.eclipse.jetty.util.ssl.SslContextFactory;
157import org.apache.hbase.thirdparty.org.eclipse.jetty.util.thread.QueuedThreadPool;
158
159/**
160 * ThriftServer- this class starts up a Thrift server which implements the Hbase API specified in
161 * the Hbase.thrift IDL file. The server runs in an independent process.
162 */
163@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
164public class ThriftServer extends Configured implements Tool {
165
166  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
167
168  protected Configuration conf;
169
170  protected InfoServer infoServer;
171
172  protected TProcessor processor;
173
174  protected ThriftMetrics metrics;
175  protected HBaseServiceHandler hbaseServiceHandler;
176  protected UserGroupInformation serviceUGI;
177  protected UserGroupInformation httpUGI;
178  protected boolean httpEnabled;
179
180  protected SaslUtil.QualityOfProtection qop;
181  protected String host;
182  protected int listenPort;
183
184  protected boolean securityEnabled;
185  protected boolean doAsEnabled;
186
187  protected JvmPauseMonitor pauseMonitor;
188
189  protected volatile TServer tserver;
190  protected volatile Server httpServer;
191
192  //
193  // Main program and support routines
194  //
195
196  public ThriftServer(Configuration conf) {
197    this.conf = HBaseConfiguration.create(conf);
198  }
199
200  protected ThriftMetrics createThriftMetrics(Configuration conf) {
201    return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
202  }
203
204  protected void setupParamters() throws IOException {
205    // login the server principal (if using secure Hadoop)
206    UserProvider userProvider = UserProvider.instantiate(conf);
207    securityEnabled =
208      userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled();
209    if (securityEnabled) {
210      host = Strings.domainNamePointerToHostName(
211        DNS.getDefaultHost(conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
212          conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
213      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
214
215      // Setup the SPNEGO user for HTTP if configured
216      String spnegoPrincipal = getSpengoPrincipal(conf, host);
217      String spnegoKeytab = getSpnegoKeytab(conf);
218      UserGroupInformation.setConfiguration(conf);
219      // login the SPNEGO principal using UGI to avoid polluting the login user
220      this.httpUGI =
221        UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal, spnegoKeytab);
222    }
223    this.serviceUGI = userProvider.getCurrent().getUGI();
224    if (httpUGI == null) {
225      this.httpUGI = serviceUGI;
226    }
227
228    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
229    this.metrics = createThriftMetrics(conf);
230    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
231    this.hbaseServiceHandler = createHandler(conf, userProvider);
232    this.hbaseServiceHandler.initMetrics(metrics);
233    this.processor = createProcessor();
234
235    httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
236    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
237    if (doAsEnabled && !httpEnabled) {
238      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
239    }
240
241    String strQop = conf.get(THRIFT_QOP_KEY);
242    if (strQop != null) {
243      this.qop = SaslUtil.getQop(strQop);
244    }
245    if (qop != null) {
246      if (
247        qop != SaslUtil.QualityOfProtection.AUTHENTICATION
248          && qop != SaslUtil.QualityOfProtection.INTEGRITY
249          && qop != SaslUtil.QualityOfProtection.PRIVACY
250      ) {
251        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
252          THRIFT_QOP_KEY, SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
253          SaslUtil.QualityOfProtection.INTEGRITY.name(),
254          SaslUtil.QualityOfProtection.PRIVACY.name()));
255      }
256      checkHttpSecurity(qop, conf);
257      if (!securityEnabled) {
258        throw new IOException("Thrift server must run in secure mode to support authentication");
259      }
260    }
261    registerFilters(conf);
262    pauseMonitor.start();
263  }
264
265  private String getSpengoPrincipal(Configuration conf, String host) throws IOException {
266    String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY);
267    if (principal == null) {
268      // We cannot use the Hadoop configuration deprecation handling here since
269      // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos
270      // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY
271      // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend
272      // Kerberos principal and SPNEGO principal.
273      LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.",
274        THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY);
275      principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
276    }
277    // Handle _HOST in principal value
278    return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host);
279  }
280
281  private String getSpnegoKeytab(Configuration conf) {
282    String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY);
283    if (keytab == null) {
284      // We cannot use the Hadoop configuration deprecation handling here since
285      // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos
286      // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY
287      // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend
288      // Kerberos keytab and SPNEGO keytab.
289      LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.",
290        THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY);
291      keytab = conf.get(THRIFT_KEYTAB_FILE_KEY);
292    }
293    return keytab;
294  }
295
296  protected void startInfoServer() throws IOException {
297    // Put up info server.
298    int port = conf.getInt(THRIFT_INFO_SERVER_PORT, THRIFT_INFO_SERVER_PORT_DEFAULT);
299
300    if (port >= 0) {
301      conf.setLong("startcode", EnvironmentEdgeManager.currentTime());
302      String a =
303        conf.get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
304      infoServer = new InfoServer("thrift", a, port, false, conf);
305      infoServer.setAttribute("hbase.conf", conf);
306      infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name());
307      infoServer.start();
308    }
309  }
310
311  protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
312    if (
313      qop == SaslUtil.QualityOfProtection.PRIVACY && conf.getBoolean(USE_HTTP_CONF_KEY, false)
314        && !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)
315    ) {
316      throw new IllegalArgumentException(
317        "Thrift HTTP Server's QoP is privacy, but " + THRIFT_SSL_ENABLED_KEY + " is false");
318    }
319  }
320
321  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
322    throws IOException {
323    return new ThriftHBaseServiceHandler(conf, userProvider);
324  }
325
326  protected TProcessor createProcessor() {
327    return new Hbase.Processor<>(
328      HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
329  }
330
331  /**
332   * the thrift server, not null means the server is started, for test only
333   * @return the tServer
334   */
335  @InterfaceAudience.Private
336  public TServer getTserver() {
337    return tserver;
338  }
339
340  /**
341   * the Jetty server, not null means the HTTP server is started, for test only
342   * @return the http server
343   */
344  @InterfaceAudience.Private
345  public Server getHttpServer() {
346    return httpServer;
347  }
348
349  protected void printUsageAndExit(Options options, int exitCode) throws ExitCodeException {
350    HelpFormatter formatter = new HelpFormatter();
351    formatter.printHelp("Thrift", null, options,
352      "To start the Thrift server run 'hbase-daemon.sh start thrift' or " + "'hbase thrift'\n"
353        + "To shutdown the thrift server run 'hbase-daemon.sh stop "
354        + "thrift' or send a kill signal to the thrift server pid",
355      true);
356    throw new ExitCodeException(exitCode, "");
357  }
358
359  /**
360   * Create a Servlet for the http server
361   * @param protocolFactory protocolFactory
362   * @return the servlet
363   */
364  protected TServlet createTServlet(TProtocolFactory protocolFactory) {
365    return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI,
366      hbaseServiceHandler, securityEnabled, doAsEnabled);
367  }
368
369  /**
370   * Setup an HTTP Server using Jetty to serve calls from THttpClient
371   * @throws IOException IOException
372   */
373  protected void setupHTTPServer() throws IOException {
374    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
375    TServlet thriftHttpServlet = createTServlet(protocolFactory);
376
377    // Set the default max thread number to 100 to limit
378    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
379    // Jetty set the default max thread number to 250, if we don't set it.
380    //
381    // Our default min thread number 2 is the same as that used by Jetty.
382    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, conf
383      .getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY, HTTP_MIN_THREADS_KEY_DEFAULT));
384    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, conf
385      .getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, HTTP_MAX_THREADS_KEY_DEFAULT));
386    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
387    threadPool.setMinThreads(minThreads);
388    httpServer = new Server(threadPool);
389
390    // Context handler
391    boolean isSecure = conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false);
392    ServletContextHandler ctxHandler =
393      new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS);
394    HttpServerUtil.addClickjackingPreventionFilter(ctxHandler, conf, PATH_SPEC_ANY);
395    HttpServerUtil.addSecurityHeadersFilter(ctxHandler, conf, isSecure, PATH_SPEC_ANY);
396    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), PATH_SPEC_ANY);
397    HttpServerUtil.constrainHttpMethods(ctxHandler,
398      conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
399
400    // set up Jetty and run the embedded server
401    HttpConfiguration httpConfig = new HttpConfiguration();
402    httpConfig.setSecureScheme("https");
403    httpConfig.setSecurePort(listenPort);
404    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
405    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
406    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
407    httpConfig.setSendServerVersion(false);
408    httpConfig.setSendDateHeader(false);
409
410    ServerConnector serverConnector;
411    if (isSecure) {
412      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
413      httpsConfig.addCustomizer(new SecureRequestCustomizer());
414
415      SslContextFactory.Server sslCtxFactory = new SslContextFactory.Server();
416      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
417      String password =
418        HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
419      String keyPassword =
420        HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
421      sslCtxFactory.setKeyStorePath(keystore);
422      sslCtxFactory.setKeyStorePassword(password);
423      sslCtxFactory.setKeyManagerPassword(keyPassword);
424      sslCtxFactory
425        .setKeyStoreType(conf.get(THRIFT_SSL_KEYSTORE_TYPE_KEY, THRIFT_SSL_KEYSTORE_TYPE_DEFAULT));
426
427      String[] excludeCiphers =
428        conf.getStrings(THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
429      if (excludeCiphers.length != 0) {
430        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
431      }
432      String[] includeCiphers =
433        conf.getStrings(THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
434      if (includeCiphers.length != 0) {
435        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
436      }
437
438      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
439      String[] excludeProtocols = conf.getStrings(THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
440      if (excludeProtocols.length != 0) {
441        sslCtxFactory.setExcludeProtocols(excludeProtocols);
442      }
443      String[] includeProtocols =
444        conf.getStrings(THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
445      if (includeProtocols.length != 0) {
446        sslCtxFactory.setIncludeProtocols(includeProtocols);
447      }
448
449      serverConnector = new ServerConnector(httpServer,
450        new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
451        new HttpConnectionFactory(httpsConfig));
452    } else {
453      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
454    }
455    serverConnector.setPort(listenPort);
456    serverConnector.setHost(getBindAddress(conf).getHostAddress());
457    httpServer.addConnector(serverConnector);
458    httpServer.setStopAtShutdown(true);
459
460    if (doAsEnabled) {
461      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
462    }
463    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
464  }
465
466  /**
467   * Setting up the thrift TServer
468   */
469  protected void setupServer() throws Exception {
470    // Construct correct ProtocolFactory
471    TProtocolFactory protocolFactory = getProtocolFactory();
472
473    ImplType implType = ImplType.getServerImpl(conf);
474    TProcessor processorToUse = processor;
475
476    // Construct correct TransportFactory
477    TTransportFactory transportFactory;
478    if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
479      if (qop != null) {
480        throw new RuntimeException(
481          "Thrift server authentication" + " doesn't work with framed transport yet");
482      }
483      transportFactory = new TFramedTransport.Factory(
484        conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
485      LOG.debug("Using framed transport");
486    } else if (qop == null) {
487      transportFactory = new TTransportFactory();
488    } else {
489      // Extract the name from the principal
490      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
491      if (thriftKerberosPrincipal == null) {
492        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
493      }
494      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
495      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
496      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
497      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
498        new SaslRpcServer.SaslGssCallbackHandler() {
499          @Override
500          public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
501            AuthorizeCallback ac = null;
502            for (Callback callback : callbacks) {
503              if (callback instanceof AuthorizeCallback) {
504                ac = (AuthorizeCallback) callback;
505              } else {
506                throw new UnsupportedCallbackException(callback,
507                  "Unrecognized SASL GSSAPI Callback");
508              }
509            }
510            if (ac != null) {
511              String authid = ac.getAuthenticationID();
512              String authzid = ac.getAuthorizationID();
513              if (!authid.equals(authzid)) {
514                ac.setAuthorized(false);
515              } else {
516                ac.setAuthorized(true);
517                String userName = SecurityUtil.getUserFromPrincipal(authzid);
518                LOG.info("Effective user: {}", userName);
519                ac.setAuthorizedID(userName);
520              }
521            }
522          }
523        });
524      transportFactory = saslFactory;
525
526      // Create a processor wrapper, to get the caller
527      processorToUse = (inProt, outProt) -> {
528        TSaslServerTransport saslServerTransport = (TSaslServerTransport) inProt.getTransport();
529        SaslServer saslServer = saslServerTransport.getSaslServer();
530        String principal = saslServer.getAuthorizationID();
531        hbaseServiceHandler.setEffectiveUser(principal);
532        processor.process(inProt, outProt);
533      };
534    }
535
536    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
537      LOG.error(
538        "Server types {} don't support IP address binding at the moment. See "
539          + "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
540        Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
541      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
542    }
543
544    InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
545    if (
546      implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING
547        || implType == ImplType.THREADED_SELECTOR
548    ) {
549      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
550      if (implType == ImplType.NONBLOCKING) {
551        tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
552          transportFactory, inetSocketAddress);
553      } else if (implType == ImplType.HS_HA) {
554        tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
555          inetSocketAddress);
556      } else { // THREADED_SELECTOR
557        tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
558          transportFactory, inetSocketAddress);
559      }
560      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
561        Integer.toString(listenPort));
562    } else if (implType == ImplType.THREAD_POOL) {
563      this.tserver =
564        getTThreadPoolServer(protocolFactory, processorToUse, transportFactory, inetSocketAddress);
565    } else {
566      throw new AssertionError(
567        "Unsupported Thrift server implementation: " + implType.simpleClassName());
568    }
569
570    // A sanity check that we instantiated the right type of server.
571    if (tserver.getClass() != implType.serverClass) {
572      throw new AssertionError("Expected to create Thrift server class "
573        + implType.serverClass.getName() + " but got " + tserver.getClass().getName());
574    }
575  }
576
577  protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
578    TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
579    InetSocketAddress inetSocketAddress) {
580    LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
581    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
582    serverArgs.processor(processor);
583    serverArgs.transportFactory(transportFactory);
584    serverArgs.protocolFactory(protocolFactory);
585    return new TNonblockingServer(serverArgs);
586  }
587
588  protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
589    TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
590    InetSocketAddress inetSocketAddress) {
591    LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
592    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
593    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
594      TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
595    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
596    int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
597      serverArgs.getMaxWorkerThreads());
598    ExecutorService executorService = createExecutor(callQueue, workerThread, workerThread);
599    serverArgs.executorService(executorService).processor(processor)
600      .transportFactory(transportFactory).protocolFactory(protocolFactory);
601    return new THsHaServer(serverArgs);
602  }
603
604  protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
605    TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
606    InetSocketAddress inetSocketAddress) {
607    LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
608    TThreadedSelectorServer.Args serverArgs =
609      new HThreadedSelectorServerArgs(serverTransport, conf);
610    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
611      TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
612    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
613    int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
614      serverArgs.getWorkerThreads());
615    int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
616    serverArgs.selectorThreads(selectorThreads);
617    ExecutorService executorService = createExecutor(callQueue, workerThreads, workerThreads);
618    serverArgs.executorService(executorService).processor(processor)
619      .transportFactory(transportFactory).protocolFactory(protocolFactory);
620    return new TThreadedSelectorServer(serverArgs);
621  }
622
623  protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
624    TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
625    LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
626    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
627    int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
628    int readTimeout =
629      conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
630    TServerTransport serverTransport =
631      new TServerSocket(new TServerSocket.ServerSocketTransportArgs().bindAddr(inetSocketAddress)
632        .backlog(backlog).clientTimeout(readTimeout));
633
634    TBoundedThreadPoolServer.Args serverArgs =
635      new TBoundedThreadPoolServer.Args(serverTransport, conf);
636    serverArgs.processor(processor).transportFactory(transportFactory)
637      .protocolFactory(protocolFactory);
638    return new TBoundedThreadPoolServer(serverArgs, metrics);
639  }
640
641  protected TProtocolFactory getProtocolFactory() {
642    TProtocolFactory protocolFactory;
643
644    if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
645      LOG.debug("Using compact protocol");
646      protocolFactory = new TCompactProtocol.Factory();
647    } else {
648      LOG.debug("Using binary protocol");
649      protocolFactory = new TBinaryProtocol.Factory();
650    }
651
652    return protocolFactory;
653  }
654
655  protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue, int minWorkers,
656    int maxWorkers) {
657    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
658    tfb.setDaemon(true);
659    tfb.setNameFormat("thrift-worker-%d");
660    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
661      Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
662    threadPool.allowCoreThreadTimeOut(true);
663    return threadPool;
664  }
665
666  protected InetAddress getBindAddress(Configuration conf) throws UnknownHostException {
667    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
668    return InetAddress.getByName(bindAddressStr);
669  }
670
671  public static void registerFilters(Configuration conf) {
672    String[] filters = conf.getStrings(THRIFT_FILTERS);
673    Splitter splitter = Splitter.on(':');
674    if (filters != null) {
675      for (String filterClass : filters) {
676        List<String> filterPart = splitter.splitToList(filterClass);
677        if (filterPart.size() != 2) {
678          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
679        } else {
680          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
681        }
682      }
683    }
684  }
685
686  /**
687   * Add options to command lines
688   * @param options options
689   */
690  protected void addOptions(Options options) {
691    options.addOption("b", BIND_OPTION, true,
692      "Address to bind " + "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
693    options.addOption("p", PORT_OPTION, true,
694      "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
695    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
696    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
697    options.addOption("h", "help", false, "Print help information");
698    options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
699    options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
700
701    options.addOption("m", MIN_WORKERS_OPTION, true,
702      "The minimum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName());
703
704    options.addOption("w", MAX_WORKERS_OPTION, true,
705      "The maximum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName());
706
707    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
708      "The maximum number of queued requests in " + ImplType.THREAD_POOL.simpleClassName());
709
710    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
711      "The amount of time in secods to keep a thread alive when idle in "
712        + ImplType.THREAD_POOL.simpleClassName());
713
714    options.addOption("t", READ_TIMEOUT_OPTION, true,
715      "Amount of time in milliseconds before a server thread will timeout "
716        + "waiting for client to send data on a connected socket. Currently, "
717        + "only applies to TBoundedThreadPoolServer");
718
719    options.addOptionGroup(ImplType.createOptionGroup());
720  }
721
722  protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
723    // Get port to bind to
724    try {
725      if (cmd.hasOption(PORT_OPTION)) {
726        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
727        conf.setInt(PORT_CONF_KEY, listenPort);
728      }
729    } catch (NumberFormatException e) {
730      LOG.error("Could not parse the value provided for the port option", e);
731      printUsageAndExit(options, -1);
732    }
733    // check for user-defined info server port setting, if so override the conf
734    try {
735      if (cmd.hasOption(INFOPORT_OPTION)) {
736        String val = cmd.getOptionValue(INFOPORT_OPTION);
737        conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
738        LOG.debug("Web UI port set to " + val);
739      }
740    } catch (NumberFormatException e) {
741      LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION + " option", e);
742      printUsageAndExit(options, -1);
743    }
744    // Make optional changes to the configuration based on command-line options
745    optionToConf(cmd, MIN_WORKERS_OPTION, conf,
746      TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
747    optionToConf(cmd, MAX_WORKERS_OPTION, conf,
748      TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
749    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, conf,
750      TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
751    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, conf,
752      TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
753    optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
754    optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
755
756    // Set general thrift server options
757    boolean compact = cmd.hasOption(COMPACT_OPTION) || conf.getBoolean(COMPACT_CONF_KEY, false);
758    conf.setBoolean(COMPACT_CONF_KEY, compact);
759    boolean framed = cmd.hasOption(FRAMED_OPTION) || conf.getBoolean(FRAMED_CONF_KEY, false);
760    conf.setBoolean(FRAMED_CONF_KEY, framed);
761
762    optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
763
764    ImplType.setServerImpl(cmd, conf);
765  }
766
767  /**
768   * Parse the command line options to set parameters the conf.
769   */
770  protected void processOptions(final String[] args) throws Exception {
771    if (args == null || args.length == 0) {
772      return;
773    }
774    Options options = new Options();
775    addOptions(options);
776
777    CommandLineParser parser = new DefaultParser();
778    CommandLine cmd = parser.parse(options, args);
779
780    if (cmd.hasOption("help")) {
781      printUsageAndExit(options, 1);
782    }
783    parseCommandLine(cmd, options);
784  }
785
786  public void stop() {
787    if (this.infoServer != null) {
788      LOG.info("Stopping infoServer");
789      try {
790        this.infoServer.stop();
791      } catch (Exception ex) {
792        LOG.error("Failed to stop infoServer", ex);
793      }
794    }
795    if (pauseMonitor != null) {
796      pauseMonitor.stop();
797    }
798    if (tserver != null) {
799      tserver.stop();
800      tserver = null;
801    }
802    if (httpServer != null) {
803      try {
804        httpServer.stop();
805        httpServer = null;
806      } catch (Exception e) {
807        LOG.error("Problem encountered in shutting down HTTP server", e);
808      }
809      httpServer = null;
810    }
811  }
812
813  protected static void optionToConf(CommandLine cmd, String option, Configuration conf,
814    String destConfKey) {
815    if (cmd.hasOption(option)) {
816      String value = cmd.getOptionValue(option);
817      LOG.info("Set configuration key:" + destConfKey + " value:" + value);
818      conf.set(destConfKey, value);
819    }
820  }
821
822  /**
823   * Run without any command line arguments
824   * @return exit code
825   * @throws Exception exception
826   */
827  public int run() throws Exception {
828    return run(null);
829  }
830
831  @Override
832  public int run(String[] strings) throws Exception {
833    processOptions(strings);
834    setupParamters();
835    if (httpEnabled) {
836      setupHTTPServer();
837    } else {
838      setupServer();
839    }
840    return serviceUGI.doAs(new PrivilegedAction<Integer>() {
841      @Override
842      public Integer run() {
843        try {
844          startInfoServer();
845          if (httpEnabled) {
846            httpServer.start();
847            httpServer.join();
848          } else {
849            tserver.serve();
850          }
851          return 0;
852        } catch (Exception e) {
853          LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e);
854          return -1;
855        }
856      }
857    });
858  }
859
860  public static void main(String[] args) throws Exception {
861    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
862    VersionInfo.logVersion();
863    final Configuration conf = HBaseConfiguration.create();
864    // for now, only time we return is on an argument error.
865    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
866    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
867    System.exit(status);
868  }
869}