View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.security.PrivilegedAction;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import javax.security.auth.callback.Callback;
35  import javax.security.auth.callback.UnsupportedCallbackException;
36  import javax.security.sasl.AuthorizeCallback;
37  import javax.security.sasl.Sasl;
38  import javax.security.sasl.SaslServer;
39  
40  import org.apache.commons.cli.CommandLine;
41  import org.apache.commons.cli.CommandLineParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionGroup;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.commons.cli.PosixParser;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.conf.Configured;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
55  import org.apache.hadoop.hbase.filter.ParseFilter;
56  import org.apache.hadoop.hbase.http.InfoServer;
57  import org.apache.hadoop.hbase.security.SaslUtil;
58  import org.apache.hadoop.hbase.security.SecurityUtil;
59  import org.apache.hadoop.hbase.security.UserProvider;
60  import org.apache.hadoop.hbase.thrift.CallQueue;
61  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
62  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
63  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
64  import org.apache.hadoop.hbase.util.DNS;
65  import org.apache.hadoop.hbase.util.JvmPauseMonitor;
66  import org.apache.hadoop.hbase.util.Strings;
67  import org.apache.hadoop.security.UserGroupInformation;
68  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
69  import org.apache.hadoop.util.Tool;
70  import org.apache.hadoop.util.ToolRunner;
71  import org.apache.thrift.TException;
72  import org.apache.thrift.TProcessor;
73  import org.apache.thrift.protocol.TBinaryProtocol;
74  import org.apache.thrift.protocol.TCompactProtocol;
75  import org.apache.thrift.protocol.TProtocol;
76  import org.apache.thrift.protocol.TProtocolFactory;
77  import org.apache.thrift.server.THsHaServer;
78  import org.apache.thrift.server.TNonblockingServer;
79  import org.apache.thrift.server.TServer;
80  import org.apache.thrift.server.TThreadPoolServer;
81  import org.apache.thrift.transport.TFramedTransport;
82  import org.apache.thrift.transport.TNonblockingServerSocket;
83  import org.apache.thrift.transport.TNonblockingServerTransport;
84  import org.apache.thrift.transport.TSaslServerTransport;
85  import org.apache.thrift.transport.TServerSocket;
86  import org.apache.thrift.transport.TServerTransport;
87  import org.apache.thrift.transport.TTransportException;
88  import org.apache.thrift.transport.TTransportFactory;
89  
90  import com.google.common.util.concurrent.ThreadFactoryBuilder;
91  
92  /**
93   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
94   * HbaseClient.thrift IDL file.
95   */
96  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
97  @SuppressWarnings({ "rawtypes", "unchecked" })
98  public class ThriftServer extends Configured implements Tool {
99    private static final Log log = LogFactory.getLog(ThriftServer.class);
100 
101   /**
102    * Thrift quality of protection configuration key. Valid values can be:
103    * privacy: authentication, integrity and confidentiality checking
104    * integrity: authentication and integrity checking
105    * authentication: authentication only
106    *
107    * This is used to authenticate the callers and support impersonation.
108    * The thrift server and the HBase cluster must run in secure mode.
109    */
110   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
111 
112   static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
113 
114   public static final int DEFAULT_LISTEN_PORT = 9090;
115 
116   private static final String READ_TIMEOUT_OPTION = "readTimeout";
117 
118   /**
119    * Amount of time in milliseconds before a server thread will timeout
120    * waiting for client to send data on a connected socket. Currently,
121    * applies only to TBoundedThreadPoolServer
122    */
123   public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
124     "hbase.thrift.server.socket.read.timeout";
125   public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
126 
127   public ThriftServer() {
128   }
129 
130   private static void printUsage() {
131     HelpFormatter formatter = new HelpFormatter();
132     formatter.printHelp("Thrift", null, getOptions(),
133         "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
134             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
135             " send a kill signal to the thrift server pid",
136         true);
137   }
138 
139   private static Options getOptions() {
140     Options options = new Options();
141     options.addOption("b", "bind", true,
142         "Address to bind the Thrift server to. [default: 0.0.0.0]");
143     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
144     options.addOption("f", "framed", false, "Use framed transport");
145     options.addOption("c", "compact", false, "Use the compact protocol");
146     options.addOption("w", "workers", true, "How many worker threads to use.");
147     options.addOption("q", "callQueueSize", true,
148       "Max size of request queue (unbounded by default)");
149     options.addOption("h", "help", false, "Print help information");
150     options.addOption(null, "infoport", true, "Port for web UI");
151     options.addOption("t", READ_TIMEOUT_OPTION, true,
152       "Amount of time in milliseconds before a server thread will timeout " +
153       "waiting for client to send data on a connected socket. Currently, " +
154       "only applies to TBoundedThreadPoolServer");
155     OptionGroup servers = new OptionGroup();
156     servers.addOption(
157         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
158     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
159     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
160     options.addOptionGroup(servers);
161     return options;
162   }
163 
164   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
165       throws ParseException, IOException {
166     CommandLineParser parser = new PosixParser();
167     return parser.parse(options, args);
168   }
169 
170   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
171     if (isCompact) {
172       log.debug("Using compact protocol");
173       return new TCompactProtocol.Factory();
174     } else {
175       log.debug("Using binary protocol");
176       return new TBinaryProtocol.Factory();
177     }
178   }
179 
180   private static TTransportFactory getTTransportFactory(
181       SaslUtil.QualityOfProtection qop, String name, String host,
182       boolean framed, int frameSize) {
183     if (framed) {
184       if (qop != null) {
185         throw new RuntimeException("Thrift server authentication"
186           + " doesn't work with framed transport yet");
187       }
188       log.debug("Using framed transport");
189       return new TFramedTransport.Factory(frameSize);
190     } else if (qop == null) {
191       return new TTransportFactory();
192     } else {
193       Map<String, String> saslProperties = new HashMap<String, String>();
194       saslProperties.put(Sasl.QOP, qop.getSaslQop());
195       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
196       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
197         new SaslGssCallbackHandler() {
198           @Override
199           public void handle(Callback[] callbacks)
200               throws UnsupportedCallbackException {
201             AuthorizeCallback ac = null;
202             for (Callback callback : callbacks) {
203               if (callback instanceof AuthorizeCallback) {
204                 ac = (AuthorizeCallback) callback;
205               } else {
206                 throw new UnsupportedCallbackException(callback,
207                     "Unrecognized SASL GSSAPI Callback");
208               }
209             }
210             if (ac != null) {
211               String authid = ac.getAuthenticationID();
212               String authzid = ac.getAuthorizationID();
213               if (!authid.equals(authzid)) {
214                 ac.setAuthorized(false);
215               } else {
216                 ac.setAuthorized(true);
217                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
218                 log.info("Effective user: " + userName);
219                 ac.setAuthorizedID(userName);
220               }
221             }
222           }
223         });
224       return saslFactory;
225     }
226   }
227 
228   /*
229    * If bindValue is null, we don't bind.
230    */
231   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
232       throws UnknownHostException {
233     try {
234       if (bindValue == null) {
235         return new InetSocketAddress(listenPort);
236       } else {
237         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
238       }
239     } catch (UnknownHostException e) {
240       throw new RuntimeException("Could not bind to provided ip address", e);
241     }
242   }
243 
244   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
245       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
246     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
247     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
248     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
249     serverArgs.processor(processor);
250     serverArgs.transportFactory(transportFactory);
251     serverArgs.protocolFactory(protocolFactory);
252     return new TNonblockingServer(serverArgs);
253   }
254 
255   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
256       TProcessor processor, TTransportFactory transportFactory,
257       int workerThreads, int maxCallQueueSize,
258       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
259       throws TTransportException {
260     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
261     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
262     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
263     if (workerThreads > 0) {
264       // Could support the min & max threads, avoiding to preserve existing functionality.
265       serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
266     }
267     ExecutorService executorService = createExecutor(
268         workerThreads, maxCallQueueSize, metrics);
269     serverArgs.executorService(executorService);
270     serverArgs.processor(processor);
271     serverArgs.transportFactory(transportFactory);
272     serverArgs.protocolFactory(protocolFactory);
273     return new THsHaServer(serverArgs);
274   }
275 
276   private static ExecutorService createExecutor(
277       int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
278     CallQueue callQueue;
279     if (maxCallQueueSize > 0) {
280       callQueue = new CallQueue(new LinkedBlockingQueue<Call>(maxCallQueueSize), metrics);
281     } else {
282       callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
283     }
284 
285     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
286     tfb.setDaemon(true);
287     tfb.setNameFormat("thrift2-worker-%d");
288     ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
289             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
290     pool.prestartAllCoreThreads();
291     return pool;
292   }
293 
294   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
295                                               TProcessor processor,
296                                               TTransportFactory transportFactory,
297                                               int workerThreads,
298                                               InetSocketAddress inetSocketAddress,
299                                               int backlog,
300                                               int clientTimeout)
301       throws TTransportException {
302     TServerTransport serverTransport = new TServerSocket(
303                                            new TServerSocket.ServerSocketTransportArgs().
304                                                bindAddr(inetSocketAddress).backlog(backlog).
305                                                clientTimeout(clientTimeout));
306     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
307     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
308     serverArgs.processor(processor);
309     serverArgs.transportFactory(transportFactory);
310     serverArgs.protocolFactory(protocolFactory);
311     if (workerThreads > 0) {
312       serverArgs.maxWorkerThreads(workerThreads);
313     }
314     return new TThreadPoolServer(serverArgs);
315   }
316 
317   /**
318    * Adds the option to pre-load filters at startup.
319    *
320    * @param conf  The current configuration instance.
321    */
322   protected static void registerFilters(Configuration conf) {
323     String[] filters = conf.getStrings("hbase.thrift.filters");
324     if(filters != null) {
325       for(String filterClass: filters) {
326         String[] filterPart = filterClass.split(":");
327         if(filterPart.length != 2) {
328           log.warn("Invalid filter specification " + filterClass + " - skipping");
329         } else {
330           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
331         }
332       }
333     }
334   }
335 
336   /**
337    * Start up the Thrift2 server.
338    */
339   public static void main(String[] args) throws Exception {
340     final Configuration conf = HBaseConfiguration.create();
341     // for now, only time we return is on an argument error.
342     final int status = ToolRunner.run(conf, new ThriftServer(), args);
343     System.exit(status);
344   }
345 
346   @Override
347   public int run(String[] args) throws Exception {
348     final Configuration conf = getConf();
349     TServer server = null;
350     Options options = getOptions();
351     CommandLine cmd = parseArguments(conf, options, args);
352     int workerThreads = 0;
353     int maxCallQueueSize = -1; // use unbounded queue by default
354 
355     /**
356      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
357      * should print the help if no argument is provided
358      */
359     List<?> argList = cmd.getArgList();
360     if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
361       printUsage();
362       return 1;
363     }
364 
365     // Get address to bind
366     String bindAddress;
367     if (cmd.hasOption("bind")) {
368       bindAddress = cmd.getOptionValue("bind");
369       conf.set("hbase.thrift.info.bindAddress", bindAddress);
370     } else {
371       bindAddress = conf.get("hbase.thrift.info.bindAddress");
372     }
373 
374     // Get read timeout
375     int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
376     if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
377       try {
378         readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
379       } catch (NumberFormatException e) {
380         throw new RuntimeException("Could not parse the value provided for the timeout option", e);
381       }
382     } else {
383       readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
384         THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
385     }
386 
387     // Get port to bind to
388     int listenPort = 0;
389     try {
390       if (cmd.hasOption("port")) {
391         listenPort = Integer.parseInt(cmd.getOptionValue("port"));
392       } else {
393         listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
394       }
395     } catch (NumberFormatException e) {
396       throw new RuntimeException("Could not parse the value provided for the port option", e);
397     }
398 
399     // Thrift's implementation uses '0' as a placeholder for 'use the default.'
400     int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
401 
402     // Local hostname and user name,
403     // used only if QOP is configured.
404     String host = null;
405     String name = null;
406 
407     UserProvider userProvider = UserProvider.instantiate(conf);
408     // login the server principal (if using secure Hadoop)
409     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
410       && userProvider.isHBaseSecurityEnabled();
411     if (securityEnabled) {
412       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
413         conf.get("hbase.thrift.dns.interface", "default"),
414         conf.get("hbase.thrift.dns.nameserver", "default")));
415       userProvider.login("hbase.thrift.keytab.file",
416         "hbase.thrift.kerberos.principal", host);
417     }
418 
419     UserGroupInformation realUser = userProvider.getCurrent().getUGI();
420     String stringQop = conf.get(THRIFT_QOP_KEY);
421     SaslUtil.QualityOfProtection qop = null;
422     if (stringQop != null) {
423       qop = SaslUtil.getQop(stringQop);
424       if (!securityEnabled) {
425         throw new IOException("Thrift server must"
426           + " run in secure mode to support authentication");
427       }
428       // Extract the name from the principal
429       name = SecurityUtil.getUserFromPrincipal(
430         conf.get("hbase.thrift.kerberos.principal"));
431     }
432 
433     boolean nonblocking = cmd.hasOption("nonblocking");
434     boolean hsha = cmd.hasOption("hsha");
435 
436     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
437     final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
438 
439     String implType = "threadpool";
440     if (nonblocking) {
441       implType = "nonblocking";
442     } else if (hsha) {
443       implType = "hsha";
444     }
445 
446     conf.set("hbase.regionserver.thrift.server.type", implType);
447     conf.setInt("hbase.regionserver.thrift.port", listenPort);
448     registerFilters(conf);
449 
450     // Construct correct ProtocolFactory
451     boolean compact = cmd.hasOption("compact") ||
452         conf.getBoolean("hbase.regionserver.thrift.compact", false);
453     TProtocolFactory protocolFactory = getTProtocolFactory(compact);
454     final ThriftHBaseServiceHandler hbaseHandler =
455       new ThriftHBaseServiceHandler(conf, userProvider);
456     THBaseService.Iface handler =
457       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
458     final THBaseService.Processor p = new THBaseService.Processor(handler);
459     conf.setBoolean("hbase.regionserver.thrift.compact", compact);
460     TProcessor processor = p;
461 
462     boolean framed = cmd.hasOption("framed") ||
463         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
464     TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
465         conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
466     InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
467     conf.setBoolean("hbase.regionserver.thrift.framed", framed);
468     if (qop != null) {
469       // Create a processor wrapper, to get the caller
470       processor = new TProcessor() {
471         @Override
472         public boolean process(TProtocol inProt,
473             TProtocol outProt) throws TException {
474           TSaslServerTransport saslServerTransport =
475             (TSaslServerTransport)inProt.getTransport();
476           SaslServer saslServer = saslServerTransport.getSaslServer();
477           String principal = saslServer.getAuthorizationID();
478           hbaseHandler.setEffectiveUser(principal);
479           return p.process(inProt, outProt);
480         }
481       };
482     }
483 
484     if (cmd.hasOption("w")) {
485       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
486     }
487 
488     if (cmd.hasOption("q")) {
489       maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
490     }
491 
492     // check for user-defined info server port setting, if so override the conf
493     try {
494       if (cmd.hasOption("infoport")) {
495         String val = cmd.getOptionValue("infoport");
496         conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
497         log.debug("Web UI port set to " + val);
498       }
499     } catch (NumberFormatException e) {
500       log.error("Could not parse the value provided for the infoport option", e);
501       printUsage();
502       System.exit(1);
503     }
504 
505     // Put up info server.
506     int port = conf.getInt("hbase.thrift.info.port", 9095);
507     if (port >= 0) {
508       conf.setLong("startcode", System.currentTimeMillis());
509       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
510       InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
511       infoServer.setAttribute("hbase.conf", conf);
512       infoServer.start();
513     }
514 
515     if (nonblocking) {
516       server = getTNonBlockingServer(protocolFactory,
517           processor,
518           transportFactory,
519           inetSocketAddress);
520     } else if (hsha) {
521       server = getTHsHaServer(protocolFactory,
522           processor,
523           transportFactory,
524           workerThreads,
525           maxCallQueueSize,
526           inetSocketAddress,
527           metrics);
528     } else {
529       server = getTThreadPoolServer(protocolFactory,
530           processor,
531           transportFactory,
532           workerThreads,
533           inetSocketAddress,
534           backlog,
535           readTimeout);
536     }
537 
538     final TServer tserver = server;
539     realUser.doAs(
540       new PrivilegedAction<Object>() {
541         @Override
542         public Object run() {
543           pauseMonitor.start();
544           try {
545             tserver.serve();
546             return null;
547           } finally {
548             pauseMonitor.stop();
549           }
550         }
551       });
552     // when tserver.stop eventually happens we'll get here.
553     return 0;
554   }
555 }