001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
022import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
023import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
024import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
027import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
030import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
033import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
037import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
038import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
039import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
041import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
042import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
043import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
044import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
045import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
046import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
047import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
048import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
075import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
076
077import java.io.IOException;
078import java.net.InetAddress;
079import java.net.InetSocketAddress;
080import java.net.UnknownHostException;
081import java.util.List;
082import java.util.Map;
083import java.util.concurrent.BlockingQueue;
084import java.util.concurrent.ExecutorService;
085import java.util.concurrent.LinkedBlockingQueue;
086import java.util.concurrent.ThreadPoolExecutor;
087import java.util.concurrent.TimeUnit;
088import javax.security.auth.callback.Callback;
089import javax.security.auth.callback.UnsupportedCallbackException;
090import javax.security.sasl.AuthorizeCallback;
091import javax.security.sasl.SaslServer;
092import org.apache.commons.lang3.ArrayUtils;
093import org.apache.hadoop.conf.Configuration;
094import org.apache.hadoop.conf.Configured;
095import org.apache.hadoop.hbase.HBaseConfiguration;
096import org.apache.hadoop.hbase.HBaseInterfaceAudience;
097import org.apache.hadoop.hbase.filter.ParseFilter;
098import org.apache.hadoop.hbase.http.HttpServerUtil;
099import org.apache.hadoop.hbase.http.InfoServer;
100import org.apache.hadoop.hbase.security.SaslUtil;
101import org.apache.hadoop.hbase.security.SecurityUtil;
102import org.apache.hadoop.hbase.security.UserProvider;
103import org.apache.hadoop.hbase.thrift.generated.Hbase;
104import org.apache.hadoop.hbase.util.DNS;
105import org.apache.hadoop.hbase.util.JvmPauseMonitor;
106import org.apache.hadoop.hbase.util.Strings;
107import org.apache.hadoop.hbase.util.VersionInfo;
108import org.apache.hadoop.security.SaslRpcServer;
109import org.apache.hadoop.security.UserGroupInformation;
110import org.apache.hadoop.security.authorize.ProxyUsers;
111import org.apache.hadoop.util.Shell.ExitCodeException;
112import org.apache.hadoop.util.Tool;
113import org.apache.hadoop.util.ToolRunner;
114import org.apache.thrift.TProcessor;
115import org.apache.thrift.protocol.TBinaryProtocol;
116import org.apache.thrift.protocol.TCompactProtocol;
117import org.apache.thrift.protocol.TProtocolFactory;
118import org.apache.thrift.server.THsHaServer;
119import org.apache.thrift.server.TNonblockingServer;
120import org.apache.thrift.server.TServer;
121import org.apache.thrift.server.TServlet;
122import org.apache.thrift.server.TThreadedSelectorServer;
123import org.apache.thrift.transport.TFramedTransport;
124import org.apache.thrift.transport.TNonblockingServerSocket;
125import org.apache.thrift.transport.TNonblockingServerTransport;
126import org.apache.thrift.transport.TSaslServerTransport;
127import org.apache.thrift.transport.TServerSocket;
128import org.apache.thrift.transport.TServerTransport;
129import org.apache.thrift.transport.TTransportFactory;
130import org.apache.yetus.audience.InterfaceAudience;
131import org.slf4j.Logger;
132import org.slf4j.LoggerFactory;
133
134import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
135import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
136import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
137import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
138import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
139import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
140import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
141import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
142import org.apache.hbase.thirdparty.org.eclipse.jetty.http.HttpVersion;
143import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConfiguration;
144import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConnectionFactory;
145import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SecureRequestCustomizer;
146import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
147import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
148import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SslConnectionFactory;
149import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletContextHandler;
150import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
151import org.apache.hbase.thirdparty.org.eclipse.jetty.util.ssl.SslContextFactory;
152import org.apache.hbase.thirdparty.org.eclipse.jetty.util.thread.QueuedThreadPool;
153
154/**
155 * ThriftServer- this class starts up a Thrift server which implements the
156 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
157 * independent process.
158 */
159@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
160public class ThriftServer  extends Configured implements Tool {
161
162  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
163
164
165
166  protected Configuration conf;
167
168  protected InfoServer infoServer;
169
170  protected TProcessor processor;
171
172  protected ThriftMetrics metrics;
173  protected HBaseServiceHandler hbaseServiceHandler;
174  protected UserGroupInformation serviceUGI;
175  protected UserGroupInformation httpUGI;
176  protected boolean httpEnabled;
177
178  protected SaslUtil.QualityOfProtection qop;
179  protected String host;
180  protected int listenPort;
181
182
183  protected boolean securityEnabled;
184  protected boolean doAsEnabled;
185
186  protected JvmPauseMonitor pauseMonitor;
187
188  protected volatile TServer tserver;
189  protected volatile Server httpServer;
190
191
192  //
193  // Main program and support routines
194  //
195
196  public ThriftServer(Configuration conf) {
197    this.conf = HBaseConfiguration.create(conf);
198  }
199
200  protected ThriftMetrics createThriftMetrics(Configuration conf) {
201    return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
202  }
203
204  protected void setupParamters() throws IOException {
205    // login the server principal (if using secure Hadoop)
206    UserProvider userProvider = UserProvider.instantiate(conf);
207    securityEnabled = userProvider.isHadoopSecurityEnabled()
208        && userProvider.isHBaseSecurityEnabled();
209    if (securityEnabled) {
210      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
211          conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
212          conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
213      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
214
215      // Setup the SPNEGO user for HTTP if configured
216      String spnegoPrincipal = getSpengoPrincipal(conf, host);
217      String spnegoKeytab = getSpnegoKeytab(conf);
218      UserGroupInformation.setConfiguration(conf);
219      // login the SPNEGO principal using UGI to avoid polluting the login user
220      this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal,
221        spnegoKeytab);
222    }
223    this.serviceUGI = userProvider.getCurrent().getUGI();
224    if (httpUGI == null) {
225      this.httpUGI = serviceUGI;
226    }
227
228    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
229    this.metrics = createThriftMetrics(conf);
230    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
231    this.hbaseServiceHandler = createHandler(conf, userProvider);
232    this.hbaseServiceHandler.initMetrics(metrics);
233    this.processor = createProcessor();
234
235    httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
236    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
237    if (doAsEnabled && !httpEnabled) {
238      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
239    }
240
241    String strQop = conf.get(THRIFT_QOP_KEY);
242    if (strQop != null) {
243      this.qop = SaslUtil.getQop(strQop);
244    }
245    if (qop != null) {
246      if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
247          qop != SaslUtil.QualityOfProtection.INTEGRITY &&
248          qop != SaslUtil.QualityOfProtection.PRIVACY) {
249        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
250            THRIFT_QOP_KEY,
251            SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
252            SaslUtil.QualityOfProtection.INTEGRITY.name(),
253            SaslUtil.QualityOfProtection.PRIVACY.name()));
254      }
255      checkHttpSecurity(qop, conf);
256      if (!securityEnabled) {
257        throw new IOException("Thrift server must run in secure mode to support authentication");
258      }
259    }
260    registerFilters(conf);
261    pauseMonitor.start();
262  }
263
264  private String getSpengoPrincipal(Configuration conf, String host) throws IOException {
265    String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY);
266    if (principal == null) {
267      // We cannot use the Hadoop configuration deprecation handling here since
268      // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos
269      // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY
270      // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend
271      // Kerberos principal and SPNEGO principal.
272      LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.",
273        THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY);
274      principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
275    }
276    // Handle _HOST in principal value
277    return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host);
278  }
279
280  private String getSpnegoKeytab(Configuration conf) {
281    String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY);
282    if (keytab == null) {
283      // We cannot use the Hadoop configuration deprecation handling here since
284      // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos
285      // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY
286      // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend
287      // Kerberos keytab and SPNEGO keytab.
288      LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.",
289        THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY);
290      keytab = conf.get(THRIFT_KEYTAB_FILE_KEY);
291    }
292    return keytab;
293  }
294
295  protected void startInfoServer() throws IOException {
296    // Put up info server.
297    int port = conf.getInt(THRIFT_INFO_SERVER_PORT , THRIFT_INFO_SERVER_PORT_DEFAULT);
298
299    if (port >= 0) {
300      conf.setLong("startcode", System.currentTimeMillis());
301      String a = conf
302          .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
303      infoServer = new InfoServer("thrift", a, port, false, conf);
304      infoServer.setAttribute("hbase.conf", conf);
305      infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name());
306      infoServer.start();
307    }
308  }
309
310  protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
311    if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
312        conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
313        !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
314      throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
315          THRIFT_SSL_ENABLED_KEY + " is false");
316    }
317  }
318
319  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
320      throws IOException {
321    return new ThriftHBaseServiceHandler(conf, userProvider);
322  }
323
324  protected TProcessor createProcessor() {
325    return new Hbase.Processor<>(
326        HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
327  }
328
329  /**
330   * the thrift server, not null means the server is started, for test only
331   * @return the tServer
332   */
333  @InterfaceAudience.Private
334  public TServer getTserver() {
335    return tserver;
336  }
337
338  /**
339   * the Jetty server, not null means the HTTP server is started, for test only
340   * @return the http server
341   */
342  @InterfaceAudience.Private
343  public Server getHttpServer() {
344    return httpServer;
345  }
346
347  protected void printUsageAndExit(Options options, int exitCode)
348      throws ExitCodeException {
349    HelpFormatter formatter = new HelpFormatter();
350    formatter.printHelp("Thrift", null, options,
351        "To start the Thrift server run 'hbase-daemon.sh start thrift' or " +
352            "'hbase thrift'\n" +
353            "To shutdown the thrift server run 'hbase-daemon.sh stop " +
354            "thrift' or send a kill signal to the thrift server pid",
355        true);
356    throw new ExitCodeException(exitCode, "");
357  }
358
359  /**
360   * Create a Servlet for the http server
361   * @param protocolFactory protocolFactory
362   * @return the servlet
363   */
364  protected TServlet createTServlet(TProtocolFactory protocolFactory) {
365    return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI,
366        hbaseServiceHandler, securityEnabled, doAsEnabled);
367  }
368
369  /**
370   * Setup an HTTP Server using Jetty to serve calls from THttpClient
371   *
372   * @throws IOException IOException
373   */
374  protected void setupHTTPServer() throws IOException {
375    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
376    TServlet thriftHttpServlet = createTServlet(protocolFactory);
377
378    // Set the default max thread number to 100 to limit
379    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
380    // Jetty set the default max thread number to 250, if we don't set it.
381    //
382    // Our default min thread number 2 is the same as that used by Jetty.
383    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
384        conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
385            HTTP_MIN_THREADS_KEY_DEFAULT));
386    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
387        conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
388            HTTP_MAX_THREADS_KEY_DEFAULT));
389    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
390    threadPool.setMinThreads(minThreads);
391    httpServer = new Server(threadPool);
392
393    // Context handler
394    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
395        ServletContextHandler.SESSIONS);
396    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
397    HttpServerUtil.constrainHttpMethods(ctxHandler,
398        conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
399            THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
400
401    // set up Jetty and run the embedded server
402    HttpConfiguration httpConfig = new HttpConfiguration();
403    httpConfig.setSecureScheme("https");
404    httpConfig.setSecurePort(listenPort);
405    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
406    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
407    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
408    httpConfig.setSendServerVersion(false);
409    httpConfig.setSendDateHeader(false);
410
411    ServerConnector serverConnector;
412    if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
413      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
414      httpsConfig.addCustomizer(new SecureRequestCustomizer());
415
416      SslContextFactory sslCtxFactory = new SslContextFactory();
417      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
418      String password = HBaseConfiguration.getPassword(conf,
419          THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
420      String keyPassword = HBaseConfiguration.getPassword(conf,
421          THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
422      sslCtxFactory.setKeyStorePath(keystore);
423      sslCtxFactory.setKeyStorePassword(password);
424      sslCtxFactory.setKeyManagerPassword(keyPassword);
425
426      String[] excludeCiphers = conf.getStrings(
427          THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
428      if (excludeCiphers.length != 0) {
429        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
430      }
431      String[] includeCiphers = conf.getStrings(
432          THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
433      if (includeCiphers.length != 0) {
434        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
435      }
436
437      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
438      String[] excludeProtocols = conf.getStrings(
439          THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
440      if (excludeProtocols.length != 0) {
441        sslCtxFactory.setExcludeProtocols(excludeProtocols);
442      }
443      String[] includeProtocols = conf.getStrings(
444          THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
445      if (includeProtocols.length != 0) {
446        sslCtxFactory.setIncludeProtocols(includeProtocols);
447      }
448
449      serverConnector = new ServerConnector(httpServer,
450          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
451          new HttpConnectionFactory(httpsConfig));
452    } else {
453      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
454    }
455    serverConnector.setPort(listenPort);
456    serverConnector.setHost(getBindAddress(conf).getHostAddress());
457    httpServer.addConnector(serverConnector);
458    httpServer.setStopAtShutdown(true);
459
460    if (doAsEnabled) {
461      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
462    }
463    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
464  }
465
466  /**
467   * Setting up the thrift TServer
468   */
469  protected void setupServer() throws Exception {
470    // Construct correct ProtocolFactory
471    TProtocolFactory protocolFactory = getProtocolFactory();
472
473    ImplType implType = ImplType.getServerImpl(conf);
474    TProcessor processorToUse = processor;
475
476    // Construct correct TransportFactory
477    TTransportFactory transportFactory;
478    if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
479      if (qop != null) {
480        throw new RuntimeException("Thrift server authentication"
481            + " doesn't work with framed transport yet");
482      }
483      transportFactory = new TFramedTransport.Factory(
484          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
485      LOG.debug("Using framed transport");
486    } else if (qop == null) {
487      transportFactory = new TTransportFactory();
488    } else {
489      // Extract the name from the principal
490      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
491      if (thriftKerberosPrincipal == null) {
492        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
493      }
494      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
495      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
496      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
497      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
498          new SaslRpcServer.SaslGssCallbackHandler() {
499            @Override
500            public void handle(Callback[] callbacks)
501                throws UnsupportedCallbackException {
502              AuthorizeCallback ac = null;
503              for (Callback callback : callbacks) {
504                if (callback instanceof AuthorizeCallback) {
505                  ac = (AuthorizeCallback) callback;
506                } else {
507                  throw new UnsupportedCallbackException(callback,
508                      "Unrecognized SASL GSSAPI Callback");
509                }
510              }
511              if (ac != null) {
512                String authid = ac.getAuthenticationID();
513                String authzid = ac.getAuthorizationID();
514                if (!authid.equals(authzid)) {
515                  ac.setAuthorized(false);
516                } else {
517                  ac.setAuthorized(true);
518                  String userName = SecurityUtil.getUserFromPrincipal(authzid);
519                  LOG.info("Effective user: {}", userName);
520                  ac.setAuthorizedID(userName);
521                }
522              }
523            }
524          });
525      transportFactory = saslFactory;
526
527      // Create a processor wrapper, to get the caller
528      processorToUse = (inProt, outProt) -> {
529        TSaslServerTransport saslServerTransport =
530            (TSaslServerTransport)inProt.getTransport();
531        SaslServer saslServer = saslServerTransport.getSaslServer();
532        String principal = saslServer.getAuthorizationID();
533        hbaseServiceHandler.setEffectiveUser(principal);
534        processor.process(inProt, outProt);
535      };
536    }
537
538    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
539      LOG.error("Server types {} don't support IP address binding at the moment. See " +
540              "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
541          Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
542      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
543    }
544
545    InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
546    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
547        implType == ImplType.THREADED_SELECTOR) {
548      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
549      if (implType == ImplType.NONBLOCKING) {
550        tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
551            transportFactory, inetSocketAddress);
552      } else if (implType == ImplType.HS_HA) {
553        tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
554            inetSocketAddress);
555      } else { // THREADED_SELECTOR
556        tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
557            transportFactory, inetSocketAddress);
558      }
559      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
560          Integer.toString(listenPort));
561    } else if (implType == ImplType.THREAD_POOL) {
562      this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
563          inetSocketAddress);
564    } else {
565      throw new AssertionError("Unsupported Thrift server implementation: " +
566          implType.simpleClassName());
567    }
568
569    // A sanity check that we instantiated the right type of server.
570    if (tserver.getClass() != implType.serverClass) {
571      throw new AssertionError("Expected to create Thrift server class " +
572          implType.serverClass.getName() + " but got " +
573          tserver.getClass().getName());
574    }
575  }
576
577  protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
578      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
579      InetSocketAddress inetSocketAddress) {
580    LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
581    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
582    serverArgs.processor(processor);
583    serverArgs.transportFactory(transportFactory);
584    serverArgs.protocolFactory(protocolFactory);
585    return new TNonblockingServer(serverArgs);
586  }
587
588  protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
589      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
590      InetSocketAddress inetSocketAddress) {
591    LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
592    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
593    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
594        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
595    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
596    int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
597        serverArgs.getMaxWorkerThreads());
598    ExecutorService executorService = createExecutor(
599        callQueue, workerThread, workerThread);
600    serverArgs.executorService(executorService).processor(processor)
601        .transportFactory(transportFactory).protocolFactory(protocolFactory);
602    return new THsHaServer(serverArgs);
603  }
604
605  protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
606      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
607      InetSocketAddress inetSocketAddress) {
608    LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
609    TThreadedSelectorServer.Args serverArgs =
610        new HThreadedSelectorServerArgs(serverTransport, conf);
611    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
612        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
613    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
614    int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
615        serverArgs.getWorkerThreads());
616    int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
617    serverArgs.selectorThreads(selectorThreads);
618    ExecutorService executorService = createExecutor(
619        callQueue, workerThreads, workerThreads);
620    serverArgs.executorService(executorService).processor(processor)
621        .transportFactory(transportFactory).protocolFactory(protocolFactory);
622    return new TThreadedSelectorServer(serverArgs);
623  }
624
625  protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
626      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
627    LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
628    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
629    int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
630    int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
631        THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
632    TServerTransport serverTransport = new TServerSocket(
633        new TServerSocket.ServerSocketTransportArgs().
634            bindAddr(inetSocketAddress).backlog(backlog).
635            clientTimeout(readTimeout));
636
637    TBoundedThreadPoolServer.Args serverArgs =
638        new TBoundedThreadPoolServer.Args(serverTransport, conf);
639    serverArgs.processor(processor).transportFactory(transportFactory)
640        .protocolFactory(protocolFactory);
641    return new TBoundedThreadPoolServer(serverArgs, metrics);
642  }
643
644  protected TProtocolFactory getProtocolFactory() {
645    TProtocolFactory protocolFactory;
646
647    if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
648      LOG.debug("Using compact protocol");
649      protocolFactory = new TCompactProtocol.Factory();
650    } else {
651      LOG.debug("Using binary protocol");
652      protocolFactory = new TBinaryProtocol.Factory();
653    }
654
655    return protocolFactory;
656  }
657
658  protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
659      int minWorkers, int maxWorkers) {
660    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
661    tfb.setDaemon(true);
662    tfb.setNameFormat("thrift-worker-%d");
663    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
664        Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
665    threadPool.allowCoreThreadTimeOut(true);
666    return threadPool;
667  }
668
669  protected InetAddress getBindAddress(Configuration conf)
670      throws UnknownHostException {
671    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
672    return InetAddress.getByName(bindAddressStr);
673  }
674
675
676  public static void registerFilters(Configuration conf) {
677    String[] filters = conf.getStrings(THRIFT_FILTERS);
678    Splitter splitter = Splitter.on(':');
679    if(filters != null) {
680      for(String filterClass: filters) {
681        List<String> filterPart = splitter.splitToList(filterClass);
682        if(filterPart.size() != 2) {
683          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
684        } else {
685          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
686        }
687      }
688    }
689  }
690
691  /**
692   * Add options to command lines
693   * @param options options
694   */
695  protected void addOptions(Options options) {
696    options.addOption("b", BIND_OPTION, true, "Address to bind " +
697        "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
698    options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
699        DEFAULT_LISTEN_PORT + "]");
700    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
701    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
702    options.addOption("h", "help", false, "Print help information");
703    options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
704    options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
705
706    options.addOption("m", MIN_WORKERS_OPTION, true,
707        "The minimum number of worker threads for " +
708            ImplType.THREAD_POOL.simpleClassName());
709
710    options.addOption("w", MAX_WORKERS_OPTION, true,
711        "The maximum number of worker threads for " +
712            ImplType.THREAD_POOL.simpleClassName());
713
714    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
715        "The maximum number of queued requests in " +
716            ImplType.THREAD_POOL.simpleClassName());
717
718    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
719        "The amount of time in secods to keep a thread alive when idle in " +
720            ImplType.THREAD_POOL.simpleClassName());
721
722    options.addOption("t", READ_TIMEOUT_OPTION, true,
723        "Amount of time in milliseconds before a server thread will timeout " +
724            "waiting for client to send data on a connected socket. Currently, " +
725            "only applies to TBoundedThreadPoolServer");
726
727    options.addOptionGroup(ImplType.createOptionGroup());
728  }
729
730  protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
731    // Get port to bind to
732    try {
733      if (cmd.hasOption(PORT_OPTION)) {
734        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
735        conf.setInt(PORT_CONF_KEY, listenPort);
736      }
737    } catch (NumberFormatException e) {
738      LOG.error("Could not parse the value provided for the port option", e);
739      printUsageAndExit(options, -1);
740    }
741    // check for user-defined info server port setting, if so override the conf
742    try {
743      if (cmd.hasOption(INFOPORT_OPTION)) {
744        String val = cmd.getOptionValue(INFOPORT_OPTION);
745        conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
746        LOG.debug("Web UI port set to " + val);
747      }
748    } catch (NumberFormatException e) {
749      LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
750          " option", e);
751      printUsageAndExit(options, -1);
752    }
753    // Make optional changes to the configuration based on command-line options
754    optionToConf(cmd, MIN_WORKERS_OPTION,
755        conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
756    optionToConf(cmd, MAX_WORKERS_OPTION,
757        conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
758    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
759        conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
760    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
761        conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
762    optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
763    optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
764
765    // Set general thrift server options
766    boolean compact = cmd.hasOption(COMPACT_OPTION) ||
767        conf.getBoolean(COMPACT_CONF_KEY, false);
768    conf.setBoolean(COMPACT_CONF_KEY, compact);
769    boolean framed = cmd.hasOption(FRAMED_OPTION) ||
770        conf.getBoolean(FRAMED_CONF_KEY, false);
771    conf.setBoolean(FRAMED_CONF_KEY, framed);
772
773    optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
774
775
776    ImplType.setServerImpl(cmd, conf);
777  }
778
779  /**
780   * Parse the command line options to set parameters the conf.
781   */
782  protected void processOptions(final String[] args) throws Exception {
783    if (args == null || args.length == 0) {
784      return;
785    }
786    Options options = new Options();
787    addOptions(options);
788
789    CommandLineParser parser = new DefaultParser();
790    CommandLine cmd = parser.parse(options, args);
791
792    if (cmd.hasOption("help")) {
793      printUsageAndExit(options, 1);
794    }
795    parseCommandLine(cmd, options);
796  }
797
798  public void stop() {
799    if (this.infoServer != null) {
800      LOG.info("Stopping infoServer");
801      try {
802        this.infoServer.stop();
803      } catch (Exception ex) {
804        LOG.error("Failed to stop infoServer", ex);
805      }
806    }
807    if (pauseMonitor != null) {
808      pauseMonitor.stop();
809    }
810    if (tserver != null) {
811      tserver.stop();
812      tserver = null;
813    }
814    if (httpServer != null) {
815      try {
816        httpServer.stop();
817        httpServer = null;
818      } catch (Exception e) {
819        LOG.error("Problem encountered in shutting down HTTP server", e);
820      }
821      httpServer = null;
822    }
823  }
824
825  protected static void optionToConf(CommandLine cmd, String option,
826      Configuration conf, String destConfKey) {
827    if (cmd.hasOption(option)) {
828      String value = cmd.getOptionValue(option);
829      LOG.info("Set configuration key:" + destConfKey + " value:" + value);
830      conf.set(destConfKey, value);
831    }
832  }
833
834  /**
835   * Run without any command line arguments
836   * @return exit code
837   * @throws Exception exception
838   */
839  public int run() throws Exception {
840    return run(null);
841  }
842
843  @Override
844  public int run(String[] strings) throws Exception {
845    processOptions(strings);
846    setupParamters();
847    startInfoServer();
848    if (httpEnabled) {
849      setupHTTPServer();
850      httpServer.start();
851      httpServer.join();
852    } else {
853      setupServer();
854      tserver.serve();
855    }
856    return 0;
857  }
858
859  public static void main(String [] args) throws Exception {
860    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
861    VersionInfo.logVersion();
862    final Configuration conf = HBaseConfiguration.create();
863    // for now, only time we return is on an argument error.
864    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
865    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
866    System.exit(status);
867  }
868}