View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.security.PrivilegedAction;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import javax.security.auth.callback.Callback;
35  import javax.security.auth.callback.UnsupportedCallbackException;
36  import javax.security.sasl.AuthorizeCallback;
37  import javax.security.sasl.Sasl;
38  import javax.security.sasl.SaslServer;
39  
40  import org.apache.commons.cli.CommandLine;
41  import org.apache.commons.cli.CommandLineParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionGroup;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.commons.cli.PosixParser;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54  import org.apache.hadoop.hbase.filter.ParseFilter;
55  import org.apache.hadoop.hbase.http.InfoServer;
56  import org.apache.hadoop.hbase.security.SaslUtil;
57  import org.apache.hadoop.hbase.security.SecurityUtil;
58  import org.apache.hadoop.hbase.security.UserProvider;
59  import org.apache.hadoop.hbase.thrift.CallQueue;
60  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
61  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
62  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
63  import org.apache.hadoop.hbase.util.DNS;
64  import org.apache.hadoop.hbase.util.Strings;
65  import org.apache.hadoop.security.UserGroupInformation;
66  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
67  import org.apache.hadoop.util.GenericOptionsParser;
68  import org.apache.thrift.TException;
69  import org.apache.thrift.TProcessor;
70  import org.apache.thrift.protocol.TBinaryProtocol;
71  import org.apache.thrift.protocol.TCompactProtocol;
72  import org.apache.thrift.protocol.TProtocol;
73  import org.apache.thrift.protocol.TProtocolFactory;
74  import org.apache.thrift.server.THsHaServer;
75  import org.apache.thrift.server.TNonblockingServer;
76  import org.apache.thrift.server.TServer;
77  import org.apache.thrift.server.TThreadPoolServer;
78  import org.apache.thrift.server.TThreadedSelectorServer;
79  import org.apache.thrift.transport.TFramedTransport;
80  import org.apache.thrift.transport.TNonblockingServerSocket;
81  import org.apache.thrift.transport.TNonblockingServerTransport;
82  import org.apache.thrift.transport.TSaslServerTransport;
83  import org.apache.thrift.transport.TServerSocket;
84  import org.apache.thrift.transport.TServerTransport;
85  import org.apache.thrift.transport.TTransportException;
86  import org.apache.thrift.transport.TTransportFactory;
87  
88  import com.google.common.util.concurrent.ThreadFactoryBuilder;
89  
90  /**
91   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
92   * HbaseClient.thrift IDL file.
93   */
94  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
95  @SuppressWarnings({ "rawtypes", "unchecked" })
96  public class ThriftServer {
97    private static final Log log = LogFactory.getLog(ThriftServer.class);
98  
99    /**
100    * Thrift quality of protection configuration key. Valid values can be:
101    * privacy: authentication, integrity and confidentiality checking
102    * integrity: authentication and integrity checking
103    * authentication: authentication only
104    *
105    * This is used to authenticate the callers and support impersonation.
106    * The thrift server and the HBase cluster must run in secure mode.
107    */
108   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
109 
110   public static final int DEFAULT_LISTEN_PORT = 9090;
111 
112   private static final String READ_TIMEOUT_OPTION = "readTimeout";
113 
114   static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
115 
116   /**
117    * Amount of time in milliseconds before a server thread will time out
118    * waiting for the client to send data on a connected socket. In this class
119    * the value is only used as the client timeout of the TThreadPoolServer socket.
120    */
121   public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
122     "hbase.thrift.server.socket.read.timeout";
123   public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
124 
125   public ThriftServer() {
126   }
127 
128   private static void printUsage() {
129     HelpFormatter formatter = new HelpFormatter();
130     formatter.printHelp("Thrift", null, getOptions(),
131         "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
132             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
133             " send a kill signal to the thrift server pid",
134         true);
135   }
136 
137   private static Options getOptions() {
138     Options options = new Options();
139     options.addOption("b", "bind", true,
140         "Address to bind the Thrift server to. [default: 0.0.0.0]");
141     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
142     options.addOption("f", "framed", false, "Use framed transport");
143     options.addOption("c", "compact", false, "Use the compact protocol");
144     options.addOption("w", "workers", true, "How many worker threads to use.");
145     options.addOption("s", "selectors", true, "How many selector threads to use.");
146     options.addOption("h", "help", false, "Print help information");
147     options.addOption(null, "infoport", true, "Port for web UI");
148     options.addOption("t", READ_TIMEOUT_OPTION, true,
149       "Amount of time in milliseconds before a server thread will timeout " +
150       "waiting for client to send data on a connected socket. Currently, " +
151       "only applies to TBoundedThreadPoolServer");
152     OptionGroup servers = new OptionGroup();
153     servers.addOption(
154         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
155     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
156     servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
157     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
158     options.addOptionGroup(servers);
159     return options;
160   }
161 
162   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
163       throws ParseException, IOException {
164     GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
165     String[] remainingArgs = genParser.getRemainingArgs();
166     CommandLineParser parser = new PosixParser();
167     return parser.parse(options, remainingArgs);
168   }
169 
170   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
171     if (isCompact) {
172       log.debug("Using compact protocol");
173       return new TCompactProtocol.Factory();
174     } else {
175       log.debug("Using binary protocol");
176       return new TBinaryProtocol.Factory();
177     }
178   }
179 
180   private static TTransportFactory getTTransportFactory(
181       SaslUtil.QualityOfProtection qop, String name, String host,
182       boolean framed, int frameSize) {
183     if (framed) {
184       if (qop != null) {
185         throw new RuntimeException("Thrift server authentication"
186           + " doesn't work with framed transport yet");
187       }
188       log.debug("Using framed transport");
189       return new TFramedTransport.Factory(frameSize);
190     } else if (qop == null) {
191       return new TTransportFactory();
192     } else {
193       Map<String, String> saslProperties = new HashMap<String, String>();
194       saslProperties.put(Sasl.QOP, qop.getSaslQop());
195       saslProperties.put(Sasl.SERVER_AUTH, "true");
196       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
197       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
198         new SaslGssCallbackHandler() {
199           @Override
200           public void handle(Callback[] callbacks)
201               throws UnsupportedCallbackException {
202             AuthorizeCallback ac = null;
203             for (Callback callback : callbacks) {
204               if (callback instanceof AuthorizeCallback) {
205                 ac = (AuthorizeCallback) callback;
206               } else {
207                 throw new UnsupportedCallbackException(callback,
208                     "Unrecognized SASL GSSAPI Callback");
209               }
210             }
211             if (ac != null) {
212               String authid = ac.getAuthenticationID();
213               String authzid = ac.getAuthorizationID();
214               if (!authid.equals(authzid)) {
215                 ac.setAuthorized(false);
216               } else {
217                 ac.setAuthorized(true);
218                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
219                 log.info("Effective user: " + userName);
220                 ac.setAuthorizedID(userName);
221               }
222             }
223           }
224         });
225       return saslFactory;
226     }
227   }
228 
229   /*
230    * If bindValue is null, we don't bind.
231    */
232   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
233       throws UnknownHostException {
234     try {
235       if (bindValue == null) {
236         return new InetSocketAddress(listenPort);
237       } else {
238         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
239       }
240     } catch (UnknownHostException e) {
241       throw new RuntimeException("Could not bind to provided ip address", e);
242     }
243   }
244 
245   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
246       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
247     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
248     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
249     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
250     serverArgs.processor(processor);
251     serverArgs.transportFactory(transportFactory);
252     serverArgs.protocolFactory(protocolFactory);
253     return new TNonblockingServer(serverArgs);
254   }
255 
256   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
257       TProcessor processor, TTransportFactory transportFactory,
258       int workerThreads,
259       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
260       throws TTransportException {
261     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
262     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
263     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
264     if (workerThreads > 0) {
265       // Could support the min & max threads, avoiding to preserve existing functionality.
266       serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
267     }
268     ExecutorService executorService = createExecutor(
269         workerThreads, metrics);
270     serverArgs.executorService(executorService);
271     serverArgs.processor(processor);
272     serverArgs.transportFactory(transportFactory);
273     serverArgs.protocolFactory(protocolFactory);
274     return new THsHaServer(serverArgs);
275   }
276 
277   private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
278           TProcessor processor, TTransportFactory transportFactory,
279           int workerThreads, int selectorThreads,
280           InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
281           throws TTransportException {
282         TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
283         log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
284         TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
285         if (workerThreads > 0) {
286             serverArgs.workerThreads(workerThreads);
287         }
288         if (selectorThreads > 0) {
289             serverArgs.selectorThreads(selectorThreads);
290         }
291 
292         ExecutorService executorService = createExecutor(
293             workerThreads, metrics);
294         serverArgs.executorService(executorService);
295         serverArgs.processor(processor);
296         serverArgs.transportFactory(transportFactory);
297         serverArgs.protocolFactory(protocolFactory);
298         return new TThreadedSelectorServer(serverArgs);
299       }
300 
301   private static ExecutorService createExecutor(
302       int workerThreads, ThriftMetrics metrics) {
303     CallQueue callQueue = new CallQueue(
304         new LinkedBlockingQueue<Call>(), metrics);
305     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
306     tfb.setDaemon(true);
307     tfb.setNameFormat("thrift2-worker-%d");
308     ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
309             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
310     pool.prestartAllCoreThreads();
311     return pool;
312   }
313 
314   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
315                                               TProcessor processor,
316                                               TTransportFactory transportFactory,
317                                               int workerThreads,
318                                               InetSocketAddress inetSocketAddress,
319                                               int backlog,
320                                               int clientTimeout)
321       throws TTransportException {
322     TServerTransport serverTransport = new TServerSocket(
323                                            new TServerSocket.ServerSocketTransportArgs().
324                                                bindAddr(inetSocketAddress).backlog(backlog).
325                                                clientTimeout(clientTimeout));
326     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
327     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
328     serverArgs.processor(processor);
329     serverArgs.transportFactory(transportFactory);
330     serverArgs.protocolFactory(protocolFactory);
331     if (workerThreads > 0) {
332       serverArgs.maxWorkerThreads(workerThreads);
333     }
334     return new TThreadPoolServer(serverArgs);
335   }
336 
337   /**
338    * Adds the option to pre-load filters at startup.
339    *
340    * @param conf  The current configuration instance.
341    */
342   protected static void registerFilters(Configuration conf) {
343     String[] filters = conf.getStrings("hbase.thrift.filters");
344     if(filters != null) {
345       for(String filterClass: filters) {
346         String[] filterPart = filterClass.split(":");
347         if(filterPart.length != 2) {
348           log.warn("Invalid filter specification " + filterClass + " - skipping");
349         } else {
350           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
351         }
352       }
353     }
354   }
355 
356   /**
357    * Start up the Thrift2 server.
358    *
359    * @param args
360    */
361   public static void main(String[] args) throws Exception {
362     TServer server = null;
363     Options options = getOptions();
364     Configuration conf = HBaseConfiguration.create();
365     CommandLine cmd = parseArguments(conf, options, args);
366     int workerThreads = 0;
367     int selectorThreads = 0;
368 
369     /**
370      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
371      * should print the help if no argument is provided
372      */
373     List<?> argList = cmd.getArgList();
374     if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
375       printUsage();
376       System.exit(1);
377     }
378 
379     // Get address to bind
380     String bindAddress;
381     if (cmd.hasOption("bind")) {
382       bindAddress = cmd.getOptionValue("bind");
383       conf.set("hbase.thrift.info.bindAddress", bindAddress);
384     } else {
385       bindAddress = conf.get("hbase.thrift.info.bindAddress");
386     }
387 
388     // Get read timeout
389     int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
390     if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
391       try {
392         readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
393       } catch (NumberFormatException e) {
394         throw new RuntimeException("Could not parse the value provided for the timeout option", e);
395       }
396     } else {
397       readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
398         THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
399     }
400 
401     // Get port to bind to
402     int listenPort = 0;
403     try {
404       if (cmd.hasOption("port")) {
405         listenPort = Integer.parseInt(cmd.getOptionValue("port"));
406       } else {
407         listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
408       }
409     } catch (NumberFormatException e) {
410       throw new RuntimeException("Could not parse the value provided for the port option", e);
411     }
412 
413     // Thrift's implementation uses '0' as a placeholder for 'use the default.'
414     int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
415 
416     // Local hostname and user name,
417     // used only if QOP is configured.
418     String host = null;
419     String name = null;
420 
421     UserProvider userProvider = UserProvider.instantiate(conf);
422     // login the server principal (if using secure Hadoop)
423     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
424       && userProvider.isHBaseSecurityEnabled();
425     if (securityEnabled) {
426       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
427         conf.get("hbase.thrift.dns.interface", "default"),
428         conf.get("hbase.thrift.dns.nameserver", "default")));
429       userProvider.login("hbase.thrift.keytab.file",
430         "hbase.thrift.kerberos.principal", host);
431     }
432 
433     UserGroupInformation realUser = userProvider.getCurrent().getUGI();
434     String stringQop = conf.get(THRIFT_QOP_KEY);
435     SaslUtil.QualityOfProtection qop = null;
436     if (stringQop != null) {
437       qop = SaslUtil.getQop(stringQop);
438       if (!securityEnabled) {
439         throw new IOException("Thrift server must"
440           + " run in secure mode to support authentication");
441       }
442       // Extract the name from the principal
443       name = SecurityUtil.getUserFromPrincipal(
444         conf.get("hbase.thrift.kerberos.principal"));
445     }
446 
447     boolean nonblocking = cmd.hasOption("nonblocking");
448     boolean hsha = cmd.hasOption("hsha");
449     boolean selector = cmd.hasOption("selector");
450 
451     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
452 
453     String implType = "threadpool";
454     if (nonblocking) {
455       implType = "nonblocking";
456     } else if (hsha) {
457       implType = "hsha";
458     } else if (selector) {
459       implType = "selector";
460     }
461 
462     conf.set("hbase.regionserver.thrift.server.type", implType);
463     conf.setInt("hbase.regionserver.thrift.port", listenPort);
464     registerFilters(conf);
465 
466     // Construct correct ProtocolFactory
467     boolean compact = cmd.hasOption("compact") ||
468         conf.getBoolean("hbase.regionserver.thrift.compact", false);
469     TProtocolFactory protocolFactory = getTProtocolFactory(compact);
470     final ThriftHBaseServiceHandler hbaseHandler =
471       new ThriftHBaseServiceHandler(conf, userProvider);
472     THBaseService.Iface handler =
473       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
474     final THBaseService.Processor p = new THBaseService.Processor(handler);
475     conf.setBoolean("hbase.regionserver.thrift.compact", compact);
476     TProcessor processor = p;
477 
478     boolean framed = cmd.hasOption("framed") ||
479         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
480     TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
481         conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
482     InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
483     conf.setBoolean("hbase.regionserver.thrift.framed", framed);
484     if (qop != null) {
485       // Create a processor wrapper, to get the caller
486       processor = new TProcessor() {
487         @Override
488         public boolean process(TProtocol inProt,
489             TProtocol outProt) throws TException {
490           TSaslServerTransport saslServerTransport =
491             (TSaslServerTransport)inProt.getTransport();
492           SaslServer saslServer = saslServerTransport.getSaslServer();
493           String principal = saslServer.getAuthorizationID();
494           hbaseHandler.setEffectiveUser(principal);
495           return p.process(inProt, outProt);
496         }
497       };
498     }
499 
500     if (cmd.hasOption("w")) {
501       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
502     }
503     if (cmd.hasOption("s")) {
504       selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
505     }
506 
507     // check for user-defined info server port setting, if so override the conf
508     try {
509       if (cmd.hasOption("infoport")) {
510         String val = cmd.getOptionValue("infoport");
511         conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
512         log.debug("Web UI port set to " + val);
513       }
514     } catch (NumberFormatException e) {
515       log.error("Could not parse the value provided for the infoport option", e);
516       printUsage();
517       System.exit(1);
518     }
519 
520     // Put up info server.
521     int port = conf.getInt("hbase.thrift.info.port", 9095);
522     if (port >= 0) {
523       conf.setLong("startcode", System.currentTimeMillis());
524       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
525       InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
526       infoServer.setAttribute("hbase.conf", conf);
527       infoServer.start();
528     }
529 
530     if (nonblocking) {
531       server = getTNonBlockingServer(protocolFactory,
532           processor,
533           transportFactory,
534           inetSocketAddress);
535     } else if (hsha) {
536       server = getTHsHaServer(protocolFactory,
537           processor,
538           transportFactory,
539           workerThreads,
540           inetSocketAddress,
541           metrics);
542     } else if (selector) {
543           server = getTThreadedSelectorServer(protocolFactory,
544           processor,
545           transportFactory,
546           workerThreads, selectorThreads,
547           inetSocketAddress,
548           metrics);
549     } else {
550       server = getTThreadPoolServer(protocolFactory,
551           processor,
552           transportFactory,
553           workerThreads,
554           inetSocketAddress,
555           backlog,
556           readTimeout);
557     }
558 
559     final TServer tserver = server;
560     realUser.doAs(
561       new PrivilegedAction<Object>() {
562         @Override
563         public Object run() {
564           tserver.serve();
565           return null;
566         }
567       });
568   }
569 }