001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
022import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
023import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
024import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
027import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
030import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
033import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
037import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
038import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
039import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
041import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
042import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
043import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
044import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
045import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
046import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
047import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
048import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
075import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
076
077import java.io.IOException;
078import java.net.InetAddress;
079import java.net.InetSocketAddress;
080import java.net.UnknownHostException;
081import java.util.List;
082import java.util.Map;
083import java.util.concurrent.BlockingQueue;
084import java.util.concurrent.ExecutorService;
085import java.util.concurrent.LinkedBlockingQueue;
086import java.util.concurrent.ThreadPoolExecutor;
087import java.util.concurrent.TimeUnit;
088import javax.security.auth.callback.Callback;
089import javax.security.auth.callback.UnsupportedCallbackException;
090import javax.security.sasl.AuthorizeCallback;
091import javax.security.sasl.SaslServer;
092
093import org.apache.commons.lang3.ArrayUtils;
094import org.apache.hadoop.conf.Configuration;
095import org.apache.hadoop.conf.Configured;
096import org.apache.hadoop.hbase.HBaseConfiguration;
097import org.apache.hadoop.hbase.HBaseInterfaceAudience;
098import org.apache.hadoop.hbase.filter.ParseFilter;
099import org.apache.hadoop.hbase.http.HttpServerUtil;
100import org.apache.hadoop.hbase.http.InfoServer;
101import org.apache.hadoop.hbase.security.SaslUtil;
102import org.apache.hadoop.hbase.security.SecurityUtil;
103import org.apache.hadoop.hbase.security.UserProvider;
104import org.apache.hadoop.hbase.thrift.generated.Hbase;
105import org.apache.hadoop.hbase.util.DNS;
106import org.apache.hadoop.hbase.util.JvmPauseMonitor;
107import org.apache.hadoop.hbase.util.Strings;
108import org.apache.hadoop.hbase.util.VersionInfo;
109import org.apache.hadoop.security.SaslRpcServer;
110import org.apache.hadoop.security.UserGroupInformation;
111import org.apache.hadoop.security.authorize.ProxyUsers;
112import org.apache.hadoop.util.Shell.ExitCodeException;
113import org.apache.hadoop.util.Tool;
114import org.apache.hadoop.util.ToolRunner;
115import org.apache.thrift.TProcessor;
116import org.apache.thrift.protocol.TBinaryProtocol;
117import org.apache.thrift.protocol.TCompactProtocol;
118import org.apache.thrift.protocol.TProtocolFactory;
119import org.apache.thrift.server.THsHaServer;
120import org.apache.thrift.server.TNonblockingServer;
121import org.apache.thrift.server.TServer;
122import org.apache.thrift.server.TServlet;
123import org.apache.thrift.server.TThreadedSelectorServer;
124import org.apache.thrift.transport.TFramedTransport;
125import org.apache.thrift.transport.TNonblockingServerSocket;
126import org.apache.thrift.transport.TNonblockingServerTransport;
127import org.apache.thrift.transport.TSaslServerTransport;
128import org.apache.thrift.transport.TServerSocket;
129import org.apache.thrift.transport.TServerTransport;
130import org.apache.thrift.transport.TTransportFactory;
131import org.apache.yetus.audience.InterfaceAudience;
132import org.eclipse.jetty.http.HttpVersion;
133import org.eclipse.jetty.server.HttpConfiguration;
134import org.eclipse.jetty.server.HttpConnectionFactory;
135import org.eclipse.jetty.server.SecureRequestCustomizer;
136import org.eclipse.jetty.server.Server;
137import org.eclipse.jetty.server.ServerConnector;
138import org.eclipse.jetty.server.SslConnectionFactory;
139import org.eclipse.jetty.servlet.ServletContextHandler;
140import org.eclipse.jetty.servlet.ServletHolder;
141import org.eclipse.jetty.util.ssl.SslContextFactory;
142import org.eclipse.jetty.util.thread.QueuedThreadPool;
143import org.slf4j.Logger;
144import org.slf4j.LoggerFactory;
145
146import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
147import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
148import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
149import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
150import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
151import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
152import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
153import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
154import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
155
156/**
157 * ThriftServer- this class starts up a Thrift server which implements the
158 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
159 * independent process.
160 */
161@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
162public class ThriftServer  extends Configured implements Tool {
163
164  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
165
166
167
168  protected Configuration conf;
169
170  protected InfoServer infoServer;
171
172  protected TProcessor processor;
173
174  protected ThriftMetrics metrics;
175  protected HBaseServiceHandler hbaseServiceHandler;
176  protected UserGroupInformation serviceUGI;
177  protected UserGroupInformation httpUGI;
178  protected boolean httpEnabled;
179
180  protected SaslUtil.QualityOfProtection qop;
181  protected String host;
182  protected int listenPort;
183
184
185  protected boolean securityEnabled;
186  protected boolean doAsEnabled;
187
188  protected JvmPauseMonitor pauseMonitor;
189
190  protected volatile TServer tserver;
191  protected volatile Server httpServer;
192
193
194  //
195  // Main program and support routines
196  //
197
198  public ThriftServer(Configuration conf) {
199    this.conf = HBaseConfiguration.create(conf);
200  }
201
202  protected ThriftMetrics createThriftMetrics(Configuration conf) {
203    return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
204  }
205
206  protected void setupParamters() throws IOException {
207    // login the server principal (if using secure Hadoop)
208    UserProvider userProvider = UserProvider.instantiate(conf);
209    securityEnabled = userProvider.isHadoopSecurityEnabled()
210        && userProvider.isHBaseSecurityEnabled();
211    if (securityEnabled) {
212      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
213          conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
214          conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
215      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
216
217      // Setup the SPNEGO user for HTTP if configured
218      String spnegoPrincipal = getSpengoPrincipal(conf, host);
219      String spnegoKeytab = getSpnegoKeytab(conf);
220      UserGroupInformation.setConfiguration(conf);
221      // login the SPNEGO principal using UGI to avoid polluting the login user
222      this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal,
223        spnegoKeytab);
224    }
225    this.serviceUGI = userProvider.getCurrent().getUGI();
226    if (httpUGI == null) {
227      this.httpUGI = serviceUGI;
228    }
229
230    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
231    this.metrics = createThriftMetrics(conf);
232    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
233    this.hbaseServiceHandler = createHandler(conf, userProvider);
234    this.hbaseServiceHandler.initMetrics(metrics);
235    this.processor = createProcessor();
236
237    httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
238    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
239    if (doAsEnabled && !httpEnabled) {
240      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
241    }
242
243    String strQop = conf.get(THRIFT_QOP_KEY);
244    if (strQop != null) {
245      this.qop = SaslUtil.getQop(strQop);
246    }
247    if (qop != null) {
248      if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
249          qop != SaslUtil.QualityOfProtection.INTEGRITY &&
250          qop != SaslUtil.QualityOfProtection.PRIVACY) {
251        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
252            THRIFT_QOP_KEY,
253            SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
254            SaslUtil.QualityOfProtection.INTEGRITY.name(),
255            SaslUtil.QualityOfProtection.PRIVACY.name()));
256      }
257      checkHttpSecurity(qop, conf);
258      if (!securityEnabled) {
259        throw new IOException("Thrift server must run in secure mode to support authentication");
260      }
261    }
262    registerFilters(conf);
263    pauseMonitor.start();
264  }
265
266  private String getSpengoPrincipal(Configuration conf, String host) throws IOException {
267    String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY);
268    if (principal == null) {
269      // We cannot use the Hadoop configuration deprecation handling here since
270      // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos
271      // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY
272      // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend
273      // Kerberos principal and SPNEGO principal.
274      LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.",
275        THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY);
276      principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
277    }
278    // Handle _HOST in principal value
279    return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host);
280  }
281
282  private String getSpnegoKeytab(Configuration conf) {
283    String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY);
284    if (keytab == null) {
285      // We cannot use the Hadoop configuration deprecation handling here since
286      // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos
287      // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY
288      // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend
289      // Kerberos keytab and SPNEGO keytab.
290      LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.",
291        THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY);
292      keytab = conf.get(THRIFT_KEYTAB_FILE_KEY);
293    }
294    return keytab;
295  }
296
297  protected void startInfoServer() throws IOException {
298    // Put up info server.
299    int port = conf.getInt(THRIFT_INFO_SERVER_PORT , THRIFT_INFO_SERVER_PORT_DEFAULT);
300
301    if (port >= 0) {
302      conf.setLong("startcode", System.currentTimeMillis());
303      String a = conf
304          .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
305      infoServer = new InfoServer("thrift", a, port, false, conf);
306      infoServer.setAttribute("hbase.conf", conf);
307      infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name());
308      infoServer.start();
309    }
310  }
311
312  protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
313    if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
314        conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
315        !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
316      throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
317          THRIFT_SSL_ENABLED_KEY + " is false");
318    }
319  }
320
321  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
322      throws IOException {
323    return new ThriftHBaseServiceHandler(conf, userProvider);
324  }
325
326  protected TProcessor createProcessor() {
327    return new Hbase.Processor<>(
328        HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
329  }
330
331  /**
332   * the thrift server, not null means the server is started, for test only
333   * @return the tServer
334   */
335  @VisibleForTesting
336  public TServer getTserver() {
337    return tserver;
338  }
339
340  /**
341   * the Jetty server, not null means the HTTP server is started, for test only
342   * @return the http server
343   */
344  @VisibleForTesting
345  public Server getHttpServer() {
346    return httpServer;
347  }
348
349  protected void printUsageAndExit(Options options, int exitCode)
350      throws ExitCodeException {
351    HelpFormatter formatter = new HelpFormatter();
352    formatter.printHelp("Thrift", null, options,
353        "To start the Thrift server run 'hbase-daemon.sh start thrift' or " +
354            "'hbase thrift'\n" +
355            "To shutdown the thrift server run 'hbase-daemon.sh stop " +
356            "thrift' or send a kill signal to the thrift server pid",
357        true);
358    throw new ExitCodeException(exitCode, "");
359  }
360
361  /**
362   * Create a Servlet for the http server
363   * @param protocolFactory protocolFactory
364   * @return the servlet
365   */
366  protected TServlet createTServlet(TProtocolFactory protocolFactory) {
367    return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI,
368        hbaseServiceHandler, securityEnabled, doAsEnabled);
369  }
370
371  /**
372   * Setup an HTTP Server using Jetty to serve calls from THttpClient
373   *
374   * @throws IOException IOException
375   */
376  protected void setupHTTPServer() throws IOException {
377    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
378    TServlet thriftHttpServlet = createTServlet(protocolFactory);
379
380    // Set the default max thread number to 100 to limit
381    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
382    // Jetty set the default max thread number to 250, if we don't set it.
383    //
384    // Our default min thread number 2 is the same as that used by Jetty.
385    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
386        conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
387            HTTP_MIN_THREADS_KEY_DEFAULT));
388    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
389        conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
390            HTTP_MAX_THREADS_KEY_DEFAULT));
391    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
392    threadPool.setMinThreads(minThreads);
393    httpServer = new Server(threadPool);
394
395    // Context handler
396    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
397        ServletContextHandler.SESSIONS);
398    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
399    HttpServerUtil.constrainHttpMethods(ctxHandler,
400        conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
401            THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
402
403    // set up Jetty and run the embedded server
404    HttpConfiguration httpConfig = new HttpConfiguration();
405    httpConfig.setSecureScheme("https");
406    httpConfig.setSecurePort(listenPort);
407    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
408    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
409    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
410    httpConfig.setSendServerVersion(false);
411    httpConfig.setSendDateHeader(false);
412
413    ServerConnector serverConnector;
414    if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
415      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
416      httpsConfig.addCustomizer(new SecureRequestCustomizer());
417
418      SslContextFactory sslCtxFactory = new SslContextFactory();
419      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
420      String password = HBaseConfiguration.getPassword(conf,
421          THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
422      String keyPassword = HBaseConfiguration.getPassword(conf,
423          THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
424      sslCtxFactory.setKeyStorePath(keystore);
425      sslCtxFactory.setKeyStorePassword(password);
426      sslCtxFactory.setKeyManagerPassword(keyPassword);
427
428      String[] excludeCiphers = conf.getStrings(
429          THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
430      if (excludeCiphers.length != 0) {
431        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
432      }
433      String[] includeCiphers = conf.getStrings(
434          THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
435      if (includeCiphers.length != 0) {
436        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
437      }
438
439      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
440      String[] excludeProtocols = conf.getStrings(
441          THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
442      if (excludeProtocols.length != 0) {
443        sslCtxFactory.setExcludeProtocols(excludeProtocols);
444      }
445      String[] includeProtocols = conf.getStrings(
446          THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
447      if (includeProtocols.length != 0) {
448        sslCtxFactory.setIncludeProtocols(includeProtocols);
449      }
450
451      serverConnector = new ServerConnector(httpServer,
452          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
453          new HttpConnectionFactory(httpsConfig));
454    } else {
455      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
456    }
457    serverConnector.setPort(listenPort);
458    serverConnector.setHost(getBindAddress(conf).getHostAddress());
459    httpServer.addConnector(serverConnector);
460    httpServer.setStopAtShutdown(true);
461
462    if (doAsEnabled) {
463      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
464    }
465    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
466  }
467
468  /**
469   * Setting up the thrift TServer
470   */
471  protected void setupServer() throws Exception {
472    // Construct correct ProtocolFactory
473    TProtocolFactory protocolFactory = getProtocolFactory();
474
475    ImplType implType = ImplType.getServerImpl(conf);
476    TProcessor processorToUse = processor;
477
478    // Construct correct TransportFactory
479    TTransportFactory transportFactory;
480    if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
481      if (qop != null) {
482        throw new RuntimeException("Thrift server authentication"
483            + " doesn't work with framed transport yet");
484      }
485      transportFactory = new TFramedTransport.Factory(
486          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
487      LOG.debug("Using framed transport");
488    } else if (qop == null) {
489      transportFactory = new TTransportFactory();
490    } else {
491      // Extract the name from the principal
492      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
493      if (thriftKerberosPrincipal == null) {
494        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
495      }
496      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
497      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
498      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
499      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
500          new SaslRpcServer.SaslGssCallbackHandler() {
501            @Override
502            public void handle(Callback[] callbacks)
503                throws UnsupportedCallbackException {
504              AuthorizeCallback ac = null;
505              for (Callback callback : callbacks) {
506                if (callback instanceof AuthorizeCallback) {
507                  ac = (AuthorizeCallback) callback;
508                } else {
509                  throw new UnsupportedCallbackException(callback,
510                      "Unrecognized SASL GSSAPI Callback");
511                }
512              }
513              if (ac != null) {
514                String authid = ac.getAuthenticationID();
515                String authzid = ac.getAuthorizationID();
516                if (!authid.equals(authzid)) {
517                  ac.setAuthorized(false);
518                } else {
519                  ac.setAuthorized(true);
520                  String userName = SecurityUtil.getUserFromPrincipal(authzid);
521                  LOG.info("Effective user: {}", userName);
522                  ac.setAuthorizedID(userName);
523                }
524              }
525            }
526          });
527      transportFactory = saslFactory;
528
529      // Create a processor wrapper, to get the caller
530      processorToUse = (inProt, outProt) -> {
531        TSaslServerTransport saslServerTransport =
532            (TSaslServerTransport)inProt.getTransport();
533        SaslServer saslServer = saslServerTransport.getSaslServer();
534        String principal = saslServer.getAuthorizationID();
535        hbaseServiceHandler.setEffectiveUser(principal);
536        processor.process(inProt, outProt);
537      };
538    }
539
540    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
541      LOG.error("Server types {} don't support IP address binding at the moment. See " +
542              "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
543          Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
544      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
545    }
546
547    InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
548    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
549        implType == ImplType.THREADED_SELECTOR) {
550      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
551      if (implType == ImplType.NONBLOCKING) {
552        tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
553            transportFactory, inetSocketAddress);
554      } else if (implType == ImplType.HS_HA) {
555        tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
556            inetSocketAddress);
557      } else { // THREADED_SELECTOR
558        tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
559            transportFactory, inetSocketAddress);
560      }
561      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
562          Integer.toString(listenPort));
563    } else if (implType == ImplType.THREAD_POOL) {
564      this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
565          inetSocketAddress);
566    } else {
567      throw new AssertionError("Unsupported Thrift server implementation: " +
568          implType.simpleClassName());
569    }
570
571    // A sanity check that we instantiated the right type of server.
572    if (tserver.getClass() != implType.serverClass) {
573      throw new AssertionError("Expected to create Thrift server class " +
574          implType.serverClass.getName() + " but got " +
575          tserver.getClass().getName());
576    }
577  }
578
579  protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
580      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
581      InetSocketAddress inetSocketAddress) {
582    LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
583    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
584    serverArgs.processor(processor);
585    serverArgs.transportFactory(transportFactory);
586    serverArgs.protocolFactory(protocolFactory);
587    return new TNonblockingServer(serverArgs);
588  }
589
590  protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
591      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
592      InetSocketAddress inetSocketAddress) {
593    LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
594    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
595    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
596        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
597    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
598    int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
599        serverArgs.getMaxWorkerThreads());
600    ExecutorService executorService = createExecutor(
601        callQueue, workerThread, workerThread);
602    serverArgs.executorService(executorService).processor(processor)
603        .transportFactory(transportFactory).protocolFactory(protocolFactory);
604    return new THsHaServer(serverArgs);
605  }
606
607  protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
608      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
609      InetSocketAddress inetSocketAddress) {
610    LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
611    TThreadedSelectorServer.Args serverArgs =
612        new HThreadedSelectorServerArgs(serverTransport, conf);
613    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
614        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
615    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
616    int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
617        serverArgs.getWorkerThreads());
618    int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
619    serverArgs.selectorThreads(selectorThreads);
620    ExecutorService executorService = createExecutor(
621        callQueue, workerThreads, workerThreads);
622    serverArgs.executorService(executorService).processor(processor)
623        .transportFactory(transportFactory).protocolFactory(protocolFactory);
624    return new TThreadedSelectorServer(serverArgs);
625  }
626
627  protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
628      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
629    LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
630    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
631    int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
632    int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
633        THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
634    TServerTransport serverTransport = new TServerSocket(
635        new TServerSocket.ServerSocketTransportArgs().
636            bindAddr(inetSocketAddress).backlog(backlog).
637            clientTimeout(readTimeout));
638
639    TBoundedThreadPoolServer.Args serverArgs =
640        new TBoundedThreadPoolServer.Args(serverTransport, conf);
641    serverArgs.processor(processor).transportFactory(transportFactory)
642        .protocolFactory(protocolFactory);
643    return new TBoundedThreadPoolServer(serverArgs, metrics);
644  }
645
646  protected TProtocolFactory getProtocolFactory() {
647    TProtocolFactory protocolFactory;
648
649    if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
650      LOG.debug("Using compact protocol");
651      protocolFactory = new TCompactProtocol.Factory();
652    } else {
653      LOG.debug("Using binary protocol");
654      protocolFactory = new TBinaryProtocol.Factory();
655    }
656
657    return protocolFactory;
658  }
659
660  protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
661      int minWorkers, int maxWorkers) {
662    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
663    tfb.setDaemon(true);
664    tfb.setNameFormat("thrift-worker-%d");
665    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
666        Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
667    threadPool.allowCoreThreadTimeOut(true);
668    return threadPool;
669  }
670
671  protected InetAddress getBindAddress(Configuration conf)
672      throws UnknownHostException {
673    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
674    return InetAddress.getByName(bindAddressStr);
675  }
676
677
678  public static void registerFilters(Configuration conf) {
679    String[] filters = conf.getStrings(THRIFT_FILTERS);
680    Splitter splitter = Splitter.on(':');
681    if(filters != null) {
682      for(String filterClass: filters) {
683        List<String> filterPart = splitter.splitToList(filterClass);
684        if(filterPart.size() != 2) {
685          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
686        } else {
687          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
688        }
689      }
690    }
691  }
692
693  /**
694   * Add options to command lines
695   * @param options options
696   */
697  protected void addOptions(Options options) {
698    options.addOption("b", BIND_OPTION, true, "Address to bind " +
699        "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
700    options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
701        DEFAULT_LISTEN_PORT + "]");
702    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
703    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
704    options.addOption("h", "help", false, "Print help information");
705    options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
706    options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
707
708    options.addOption("m", MIN_WORKERS_OPTION, true,
709        "The minimum number of worker threads for " +
710            ImplType.THREAD_POOL.simpleClassName());
711
712    options.addOption("w", MAX_WORKERS_OPTION, true,
713        "The maximum number of worker threads for " +
714            ImplType.THREAD_POOL.simpleClassName());
715
716    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
717        "The maximum number of queued requests in " +
718            ImplType.THREAD_POOL.simpleClassName());
719
720    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
721        "The amount of time in secods to keep a thread alive when idle in " +
722            ImplType.THREAD_POOL.simpleClassName());
723
724    options.addOption("t", READ_TIMEOUT_OPTION, true,
725        "Amount of time in milliseconds before a server thread will timeout " +
726            "waiting for client to send data on a connected socket. Currently, " +
727            "only applies to TBoundedThreadPoolServer");
728
729    options.addOptionGroup(ImplType.createOptionGroup());
730  }
731
732  protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
733    // Get port to bind to
734    try {
735      if (cmd.hasOption(PORT_OPTION)) {
736        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
737        conf.setInt(PORT_CONF_KEY, listenPort);
738      }
739    } catch (NumberFormatException e) {
740      LOG.error("Could not parse the value provided for the port option", e);
741      printUsageAndExit(options, -1);
742    }
743    // check for user-defined info server port setting, if so override the conf
744    try {
745      if (cmd.hasOption(INFOPORT_OPTION)) {
746        String val = cmd.getOptionValue(INFOPORT_OPTION);
747        conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
748        LOG.debug("Web UI port set to " + val);
749      }
750    } catch (NumberFormatException e) {
751      LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
752          " option", e);
753      printUsageAndExit(options, -1);
754    }
755    // Make optional changes to the configuration based on command-line options
756    optionToConf(cmd, MIN_WORKERS_OPTION,
757        conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
758    optionToConf(cmd, MAX_WORKERS_OPTION,
759        conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
760    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
761        conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
762    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
763        conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
764    optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
765    optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
766
767    // Set general thrift server options
768    boolean compact = cmd.hasOption(COMPACT_OPTION) ||
769        conf.getBoolean(COMPACT_CONF_KEY, false);
770    conf.setBoolean(COMPACT_CONF_KEY, compact);
771    boolean framed = cmd.hasOption(FRAMED_OPTION) ||
772        conf.getBoolean(FRAMED_CONF_KEY, false);
773    conf.setBoolean(FRAMED_CONF_KEY, framed);
774
775    optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
776
777
778    ImplType.setServerImpl(cmd, conf);
779  }
780
781  /**
782   * Parse the command line options to set parameters the conf.
783   */
784  protected void processOptions(final String[] args) throws Exception {
785    if (args == null || args.length == 0) {
786      return;
787    }
788    Options options = new Options();
789    addOptions(options);
790
791    CommandLineParser parser = new DefaultParser();
792    CommandLine cmd = parser.parse(options, args);
793
794    if (cmd.hasOption("help")) {
795      printUsageAndExit(options, 1);
796    }
797    parseCommandLine(cmd, options);
798  }
799
800  public void stop() {
801    if (this.infoServer != null) {
802      LOG.info("Stopping infoServer");
803      try {
804        this.infoServer.stop();
805      } catch (Exception ex) {
806        LOG.error("Failed to stop infoServer", ex);
807      }
808    }
809    if (pauseMonitor != null) {
810      pauseMonitor.stop();
811    }
812    if (tserver != null) {
813      tserver.stop();
814      tserver = null;
815    }
816    if (httpServer != null) {
817      try {
818        httpServer.stop();
819        httpServer = null;
820      } catch (Exception e) {
821        LOG.error("Problem encountered in shutting down HTTP server", e);
822      }
823      httpServer = null;
824    }
825  }
826
827  protected static void optionToConf(CommandLine cmd, String option,
828      Configuration conf, String destConfKey) {
829    if (cmd.hasOption(option)) {
830      String value = cmd.getOptionValue(option);
831      LOG.info("Set configuration key:" + destConfKey + " value:" + value);
832      conf.set(destConfKey, value);
833    }
834  }
835
836  /**
837   * Run without any command line arguments
838   * @return exit code
839   * @throws Exception exception
840   */
841  public int run() throws Exception {
842    return run(null);
843  }
844
845  @Override
846  public int run(String[] strings) throws Exception {
847    processOptions(strings);
848    setupParamters();
849    startInfoServer();
850    if (httpEnabled) {
851      setupHTTPServer();
852      httpServer.start();
853      httpServer.join();
854    } else {
855      setupServer();
856      tserver.serve();
857    }
858    return 0;
859  }
860
861  public static void main(String [] args) throws Exception {
862    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
863    VersionInfo.logVersion();
864    final Configuration conf = HBaseConfiguration.create();
865    // for now, only time we return is on an argument error.
866    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
867    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
868    System.exit(status);
869  }
870}