View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.security.PrivilegedAction;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import javax.security.auth.callback.Callback;
35  import javax.security.auth.callback.UnsupportedCallbackException;
36  import javax.security.sasl.AuthorizeCallback;
37  import javax.security.sasl.Sasl;
38  import javax.security.sasl.SaslServer;
39  
40  import org.apache.commons.cli.CommandLine;
41  import org.apache.commons.cli.CommandLineParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionGroup;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.commons.cli.PosixParser;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.conf.Configured;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
55  import org.apache.hadoop.hbase.filter.ParseFilter;
56  import org.apache.hadoop.hbase.http.InfoServer;
57  import org.apache.hadoop.hbase.security.SaslUtil;
58  import org.apache.hadoop.hbase.security.SecurityUtil;
59  import org.apache.hadoop.hbase.security.UserProvider;
60  import org.apache.hadoop.hbase.thrift.CallQueue;
61  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
62  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
63  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
64  import org.apache.hadoop.hbase.util.DNS;
65  import org.apache.hadoop.hbase.util.Strings;
66  import org.apache.hadoop.security.UserGroupInformation;
67  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
68  import org.apache.hadoop.util.Tool;
69  import org.apache.hadoop.util.ToolRunner;
70  import org.apache.thrift.TException;
71  import org.apache.thrift.TProcessor;
72  import org.apache.thrift.protocol.TBinaryProtocol;
73  import org.apache.thrift.protocol.TCompactProtocol;
74  import org.apache.thrift.protocol.TProtocol;
75  import org.apache.thrift.protocol.TProtocolFactory;
76  import org.apache.thrift.server.THsHaServer;
77  import org.apache.thrift.server.TNonblockingServer;
78  import org.apache.thrift.server.TServer;
79  import org.apache.thrift.server.TThreadPoolServer;
80  import org.apache.thrift.transport.TFramedTransport;
81  import org.apache.thrift.transport.TNonblockingServerSocket;
82  import org.apache.thrift.transport.TNonblockingServerTransport;
83  import org.apache.thrift.transport.TSaslServerTransport;
84  import org.apache.thrift.transport.TServerSocket;
85  import org.apache.thrift.transport.TServerTransport;
86  import org.apache.thrift.transport.TTransportException;
87  import org.apache.thrift.transport.TTransportFactory;
88  
89  import com.google.common.util.concurrent.ThreadFactoryBuilder;
90  
91  /**
92   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
93   * HbaseClient.thrift IDL file.
94   */
95  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
96  @SuppressWarnings({ "rawtypes", "unchecked" })
97  public class ThriftServer extends Configured implements Tool {
98    private static final Log log = LogFactory.getLog(ThriftServer.class);
99  
100   /**
101    * Thrift quality of protection configuration key. Valid values can be:
102    * privacy: authentication, integrity and confidentiality checking
103    * integrity: authentication and integrity checking
104    * authentication: authentication only
105    *
106    * This is used to authenticate the callers and support impersonation.
107    * The thrift server and the HBase cluster must run in secure mode.
108    */
109   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
110 
111   static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
112 
113   public static final int DEFAULT_LISTEN_PORT = 9090;
114 
115   private static final String READ_TIMEOUT_OPTION = "readTimeout";
116 
117   /**
118    * Amount of time in milliseconds before a server thread will timeout
119    * waiting for client to send data on a connected socket. Currently,
120    * applies only to TBoundedThreadPoolServer
121    */
122   public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
123     "hbase.thrift.server.socket.read.timeout";
124   public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
125 
126   public ThriftServer() {
127   }
128 
129   private static void printUsage() {
130     HelpFormatter formatter = new HelpFormatter();
131     formatter.printHelp("Thrift", null, getOptions(),
132         "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
133             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
134             " send a kill signal to the thrift server pid",
135         true);
136   }
137 
138   private static Options getOptions() {
139     Options options = new Options();
140     options.addOption("b", "bind", true,
141         "Address to bind the Thrift server to. [default: 0.0.0.0]");
142     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
143     options.addOption("f", "framed", false, "Use framed transport");
144     options.addOption("c", "compact", false, "Use the compact protocol");
145     options.addOption("w", "workers", true, "How many worker threads to use.");
146     options.addOption("h", "help", false, "Print help information");
147     options.addOption(null, "infoport", true, "Port for web UI");
148     options.addOption("t", READ_TIMEOUT_OPTION, true,
149       "Amount of time in milliseconds before a server thread will timeout " +
150       "waiting for client to send data on a connected socket. Currently, " +
151       "only applies to TBoundedThreadPoolServer");
152     OptionGroup servers = new OptionGroup();
153     servers.addOption(
154         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
155     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
156     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
157     options.addOptionGroup(servers);
158     return options;
159   }
160 
161   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
162       throws ParseException, IOException {
163     CommandLineParser parser = new PosixParser();
164     return parser.parse(options, args);
165   }
166 
167   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
168     if (isCompact) {
169       log.debug("Using compact protocol");
170       return new TCompactProtocol.Factory();
171     } else {
172       log.debug("Using binary protocol");
173       return new TBinaryProtocol.Factory();
174     }
175   }
176 
177   private static TTransportFactory getTTransportFactory(
178       SaslUtil.QualityOfProtection qop, String name, String host,
179       boolean framed, int frameSize) {
180     if (framed) {
181       if (qop != null) {
182         throw new RuntimeException("Thrift server authentication"
183           + " doesn't work with framed transport yet");
184       }
185       log.debug("Using framed transport");
186       return new TFramedTransport.Factory(frameSize);
187     } else if (qop == null) {
188       return new TTransportFactory();
189     } else {
190       Map<String, String> saslProperties = new HashMap<String, String>();
191       saslProperties.put(Sasl.QOP, qop.getSaslQop());
192       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
193       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
194         new SaslGssCallbackHandler() {
195           @Override
196           public void handle(Callback[] callbacks)
197               throws UnsupportedCallbackException {
198             AuthorizeCallback ac = null;
199             for (Callback callback : callbacks) {
200               if (callback instanceof AuthorizeCallback) {
201                 ac = (AuthorizeCallback) callback;
202               } else {
203                 throw new UnsupportedCallbackException(callback,
204                     "Unrecognized SASL GSSAPI Callback");
205               }
206             }
207             if (ac != null) {
208               String authid = ac.getAuthenticationID();
209               String authzid = ac.getAuthorizationID();
210               if (!authid.equals(authzid)) {
211                 ac.setAuthorized(false);
212               } else {
213                 ac.setAuthorized(true);
214                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
215                 log.info("Effective user: " + userName);
216                 ac.setAuthorizedID(userName);
217               }
218             }
219           }
220         });
221       return saslFactory;
222     }
223   }
224 
225   /*
226    * If bindValue is null, we don't bind.
227    */
228   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
229       throws UnknownHostException {
230     try {
231       if (bindValue == null) {
232         return new InetSocketAddress(listenPort);
233       } else {
234         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
235       }
236     } catch (UnknownHostException e) {
237       throw new RuntimeException("Could not bind to provided ip address", e);
238     }
239   }
240 
241   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
242       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
243     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
244     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
245     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
246     serverArgs.processor(processor);
247     serverArgs.transportFactory(transportFactory);
248     serverArgs.protocolFactory(protocolFactory);
249     return new TNonblockingServer(serverArgs);
250   }
251 
252   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
253       TProcessor processor, TTransportFactory transportFactory,
254       int workerThreads,
255       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
256       throws TTransportException {
257     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
258     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
259     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
260     if (workerThreads > 0) {
261       // Could support the min & max threads, avoiding to preserve existing functionality.
262       serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
263     }
264     ExecutorService executorService = createExecutor(
265         workerThreads, metrics);
266     serverArgs.executorService(executorService);
267     serverArgs.processor(processor);
268     serverArgs.transportFactory(transportFactory);
269     serverArgs.protocolFactory(protocolFactory);
270     return new THsHaServer(serverArgs);
271   }
272 
273   private static ExecutorService createExecutor(
274       int workerThreads, ThriftMetrics metrics) {
275     CallQueue callQueue = new CallQueue(
276         new LinkedBlockingQueue<Call>(), metrics);
277     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
278     tfb.setDaemon(true);
279     tfb.setNameFormat("thrift2-worker-%d");
280     ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
281             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
282     pool.prestartAllCoreThreads();
283     return pool;
284   }
285 
286   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
287                                               TProcessor processor,
288                                               TTransportFactory transportFactory,
289                                               int workerThreads,
290                                               InetSocketAddress inetSocketAddress,
291                                               int backlog,
292                                               int clientTimeout)
293       throws TTransportException {
294     TServerTransport serverTransport = new TServerSocket(
295                                            new TServerSocket.ServerSocketTransportArgs().
296                                                bindAddr(inetSocketAddress).backlog(backlog).
297                                                clientTimeout(clientTimeout));
298     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
299     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
300     serverArgs.processor(processor);
301     serverArgs.transportFactory(transportFactory);
302     serverArgs.protocolFactory(protocolFactory);
303     if (workerThreads > 0) {
304       serverArgs.maxWorkerThreads(workerThreads);
305     }
306     return new TThreadPoolServer(serverArgs);
307   }
308 
309   /**
310    * Adds the option to pre-load filters at startup.
311    *
312    * @param conf  The current configuration instance.
313    */
314   protected static void registerFilters(Configuration conf) {
315     String[] filters = conf.getStrings("hbase.thrift.filters");
316     if(filters != null) {
317       for(String filterClass: filters) {
318         String[] filterPart = filterClass.split(":");
319         if(filterPart.length != 2) {
320           log.warn("Invalid filter specification " + filterClass + " - skipping");
321         } else {
322           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
323         }
324       }
325     }
326   }
327 
328   /**
329    * Start up the Thrift2 server.
330    */
331   public static void main(String[] args) throws Exception {
332     final Configuration conf = HBaseConfiguration.create();
333     // for now, only time we return is on an argument error.
334     final int status = ToolRunner.run(conf, new ThriftServer(), args);
335     System.exit(status);
336   }
337 
338   @Override
339   public int run(String[] args) throws Exception {
340     final Configuration conf = getConf();
341     TServer server = null;
342     Options options = getOptions();
343     CommandLine cmd = parseArguments(conf, options, args);
344     int workerThreads = 0;
345 
346     /**
347      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
348      * should print the help if no argument is provided
349      */
350     List<?> argList = cmd.getArgList();
351     if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
352       printUsage();
353       return 1;
354     }
355 
356     // Get address to bind
357     String bindAddress;
358     if (cmd.hasOption("bind")) {
359       bindAddress = cmd.getOptionValue("bind");
360       conf.set("hbase.thrift.info.bindAddress", bindAddress);
361     } else {
362       bindAddress = conf.get("hbase.thrift.info.bindAddress");
363     }
364 
365     // Get read timeout
366     int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
367     if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
368       try {
369         readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
370       } catch (NumberFormatException e) {
371         throw new RuntimeException("Could not parse the value provided for the timeout option", e);
372       }
373     } else {
374       readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
375         THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
376     }
377 
378     // Get port to bind to
379     int listenPort = 0;
380     try {
381       if (cmd.hasOption("port")) {
382         listenPort = Integer.parseInt(cmd.getOptionValue("port"));
383       } else {
384         listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
385       }
386     } catch (NumberFormatException e) {
387       throw new RuntimeException("Could not parse the value provided for the port option", e);
388     }
389 
390     // Thrift's implementation uses '0' as a placeholder for 'use the default.'
391     int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
392 
393     // Local hostname and user name,
394     // used only if QOP is configured.
395     String host = null;
396     String name = null;
397 
398     UserProvider userProvider = UserProvider.instantiate(conf);
399     // login the server principal (if using secure Hadoop)
400     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
401       && userProvider.isHBaseSecurityEnabled();
402     if (securityEnabled) {
403       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
404         conf.get("hbase.thrift.dns.interface", "default"),
405         conf.get("hbase.thrift.dns.nameserver", "default")));
406       userProvider.login("hbase.thrift.keytab.file",
407         "hbase.thrift.kerberos.principal", host);
408     }
409 
410     UserGroupInformation realUser = userProvider.getCurrent().getUGI();
411     String stringQop = conf.get(THRIFT_QOP_KEY);
412     SaslUtil.QualityOfProtection qop = null;
413     if (stringQop != null) {
414       qop = SaslUtil.getQop(stringQop);
415       if (!securityEnabled) {
416         throw new IOException("Thrift server must"
417           + " run in secure mode to support authentication");
418       }
419       // Extract the name from the principal
420       name = SecurityUtil.getUserFromPrincipal(
421         conf.get("hbase.thrift.kerberos.principal"));
422     }
423 
424     boolean nonblocking = cmd.hasOption("nonblocking");
425     boolean hsha = cmd.hasOption("hsha");
426 
427     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
428 
429     String implType = "threadpool";
430     if (nonblocking) {
431       implType = "nonblocking";
432     } else if (hsha) {
433       implType = "hsha";
434     }
435 
436     conf.set("hbase.regionserver.thrift.server.type", implType);
437     conf.setInt("hbase.regionserver.thrift.port", listenPort);
438     registerFilters(conf);
439 
440     // Construct correct ProtocolFactory
441     boolean compact = cmd.hasOption("compact") ||
442         conf.getBoolean("hbase.regionserver.thrift.compact", false);
443     TProtocolFactory protocolFactory = getTProtocolFactory(compact);
444     final ThriftHBaseServiceHandler hbaseHandler =
445       new ThriftHBaseServiceHandler(conf, userProvider);
446     THBaseService.Iface handler =
447       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
448     final THBaseService.Processor p = new THBaseService.Processor(handler);
449     conf.setBoolean("hbase.regionserver.thrift.compact", compact);
450     TProcessor processor = p;
451 
452     boolean framed = cmd.hasOption("framed") ||
453         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
454     TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
455         conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
456     InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
457     conf.setBoolean("hbase.regionserver.thrift.framed", framed);
458     if (qop != null) {
459       // Create a processor wrapper, to get the caller
460       processor = new TProcessor() {
461         @Override
462         public boolean process(TProtocol inProt,
463             TProtocol outProt) throws TException {
464           TSaslServerTransport saslServerTransport =
465             (TSaslServerTransport)inProt.getTransport();
466           SaslServer saslServer = saslServerTransport.getSaslServer();
467           String principal = saslServer.getAuthorizationID();
468           hbaseHandler.setEffectiveUser(principal);
469           return p.process(inProt, outProt);
470         }
471       };
472     }
473 
474     if (cmd.hasOption("w")) {
475       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
476     }
477 
478     // check for user-defined info server port setting, if so override the conf
479     try {
480       if (cmd.hasOption("infoport")) {
481         String val = cmd.getOptionValue("infoport");
482         conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
483         log.debug("Web UI port set to " + val);
484       }
485     } catch (NumberFormatException e) {
486       log.error("Could not parse the value provided for the infoport option", e);
487       printUsage();
488       System.exit(1);
489     }
490 
491     // Put up info server.
492     int port = conf.getInt("hbase.thrift.info.port", 9095);
493     if (port >= 0) {
494       conf.setLong("startcode", System.currentTimeMillis());
495       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
496       InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
497       infoServer.setAttribute("hbase.conf", conf);
498       infoServer.start();
499     }
500 
501     if (nonblocking) {
502       server = getTNonBlockingServer(protocolFactory,
503           processor,
504           transportFactory,
505           inetSocketAddress);
506     } else if (hsha) {
507       server = getTHsHaServer(protocolFactory,
508           processor,
509           transportFactory,
510           workerThreads,
511           inetSocketAddress,
512           metrics);
513     } else {
514       server = getTThreadPoolServer(protocolFactory,
515           processor,
516           transportFactory,
517           workerThreads,
518           inetSocketAddress,
519           backlog,
520           readTimeout);
521     }
522 
523     final TServer tserver = server;
524     realUser.doAs(
525       new PrivilegedAction<Object>() {
526         @Override
527         public Object run() {
528           tserver.serve();
529           return null;
530         }
531       });
532     // when tserver.stop eventually happens we'll get here.
533     return 0;
534   }
535 }