View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.security.PrivilegedAction;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import javax.security.auth.callback.Callback;
35  import javax.security.auth.callback.UnsupportedCallbackException;
36  import javax.security.sasl.AuthorizeCallback;
37  import javax.security.sasl.Sasl;
38  import javax.security.sasl.SaslServer;
39  
40  import org.apache.commons.cli.CommandLine;
41  import org.apache.commons.cli.CommandLineParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionGroup;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.commons.cli.PosixParser;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.conf.Configured;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
55  import org.apache.hadoop.hbase.filter.ParseFilter;
56  import org.apache.hadoop.hbase.http.InfoServer;
57  import org.apache.hadoop.hbase.security.SecurityUtil;
58  import org.apache.hadoop.hbase.security.UserProvider;
59  import org.apache.hadoop.hbase.thrift.CallQueue;
60  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
61  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
62  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
63  import org.apache.hadoop.hbase.util.Strings;
64  import org.apache.hadoop.net.DNS;
65  import org.apache.hadoop.security.UserGroupInformation;
66  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
67  import org.apache.hadoop.util.Tool;
68  import org.apache.hadoop.util.ToolRunner;
69  import org.apache.thrift.TException;
70  import org.apache.thrift.TProcessor;
71  import org.apache.thrift.protocol.TBinaryProtocol;
72  import org.apache.thrift.protocol.TCompactProtocol;
73  import org.apache.thrift.protocol.TProtocol;
74  import org.apache.thrift.protocol.TProtocolFactory;
75  import org.apache.thrift.server.THsHaServer;
76  import org.apache.thrift.server.TNonblockingServer;
77  import org.apache.thrift.server.TServer;
78  import org.apache.thrift.server.TThreadPoolServer;
79  import org.apache.thrift.transport.TFramedTransport;
80  import org.apache.thrift.transport.TNonblockingServerSocket;
81  import org.apache.thrift.transport.TNonblockingServerTransport;
82  import org.apache.thrift.transport.TSaslServerTransport;
83  import org.apache.thrift.transport.TServerSocket;
84  import org.apache.thrift.transport.TServerTransport;
85  import org.apache.thrift.transport.TTransportException;
86  import org.apache.thrift.transport.TTransportFactory;
87  
88  import com.google.common.util.concurrent.ThreadFactoryBuilder;
89  
90  /**
91   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
92   * HbaseClient.thrift IDL file.
93   */
94  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
95  @SuppressWarnings({ "rawtypes", "unchecked" })
96  public class ThriftServer extends Configured implements Tool {
  /** Logger shared by all static helpers and the server instance in this class. */
  private static final Log log = LogFactory.getLog(ThriftServer.class);

  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * auth-conf: authentication, integrity and confidentiality checking
   * auth-int: authentication and integrity checking
   * auth: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   * Setting this key together with the framed transport is rejected when the
   * transport factory is built.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

  /**
   * Listen port used when neither the "port" command-line option nor the
   * "hbase.regionserver.thrift.port" configuration property supplies one.
   */
  public static final int DEFAULT_LISTEN_PORT = 9090;
111 
112 
  /**
   * Default constructor; performs no initialization of its own.
   */
  public ThriftServer() {
  }
115 
116   private static void printUsage() {
117     HelpFormatter formatter = new HelpFormatter();
118     formatter.printHelp("Thrift", null, getOptions(),
119         "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
120             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
121             " send a kill signal to the thrift server pid",
122         true);
123   }
124 
125   private static Options getOptions() {
126     Options options = new Options();
127     options.addOption("b", "bind", true,
128         "Address to bind the Thrift server to. [default: 0.0.0.0]");
129     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
130     options.addOption("f", "framed", false, "Use framed transport");
131     options.addOption("c", "compact", false, "Use the compact protocol");
132     options.addOption("w", "workers", true, "How many worker threads to use.");
133     options.addOption("h", "help", false, "Print help information");
134     options.addOption(null, "infoport", true, "Port for web UI");
135 
136     OptionGroup servers = new OptionGroup();
137     servers.addOption(
138         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
139     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
140     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
141     options.addOptionGroup(servers);
142     return options;
143   }
144 
145   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
146       throws ParseException, IOException {
147     CommandLineParser parser = new PosixParser();
148     return parser.parse(options, args);
149   }
150 
151   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
152     if (isCompact) {
153       log.debug("Using compact protocol");
154       return new TCompactProtocol.Factory();
155     } else {
156       log.debug("Using binary protocol");
157       return new TBinaryProtocol.Factory();
158     }
159   }
160 
161   private static TTransportFactory getTTransportFactory(
162       String qop, String name, String host, boolean framed, int frameSize) {
163     if (framed) {
164       if (qop != null) {
165         throw new RuntimeException("Thrift server authentication"
166           + " doesn't work with framed transport yet");
167       }
168       log.debug("Using framed transport");
169       return new TFramedTransport.Factory(frameSize);
170     } else if (qop == null) {
171       return new TTransportFactory();
172     } else {
173       Map<String, String> saslProperties = new HashMap<String, String>();
174       saslProperties.put(Sasl.QOP, qop);
175       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
176       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
177         new SaslGssCallbackHandler() {
178           @Override
179           public void handle(Callback[] callbacks)
180               throws UnsupportedCallbackException {
181             AuthorizeCallback ac = null;
182             for (Callback callback : callbacks) {
183               if (callback instanceof AuthorizeCallback) {
184                 ac = (AuthorizeCallback) callback;
185               } else {
186                 throw new UnsupportedCallbackException(callback,
187                     "Unrecognized SASL GSSAPI Callback");
188               }
189             }
190             if (ac != null) {
191               String authid = ac.getAuthenticationID();
192               String authzid = ac.getAuthorizationID();
193               if (!authid.equals(authzid)) {
194                 ac.setAuthorized(false);
195               } else {
196                 ac.setAuthorized(true);
197                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
198                 log.info("Effective user: " + userName);
199                 ac.setAuthorizedID(userName);
200               }
201             }
202           }
203         });
204       return saslFactory;
205     }
206   }
207 
208   /*
209    * If bindValue is null, we don't bind.
210    */
211   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
212       throws UnknownHostException {
213     try {
214       if (bindValue == null) {
215         return new InetSocketAddress(listenPort);
216       } else {
217         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
218       }
219     } catch (UnknownHostException e) {
220       throw new RuntimeException("Could not bind to provided ip address", e);
221     }
222   }
223 
224   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
225       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
226     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
227     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
228     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
229     serverArgs.processor(processor);
230     serverArgs.transportFactory(transportFactory);
231     serverArgs.protocolFactory(protocolFactory);
232     return new TNonblockingServer(serverArgs);
233   }
234 
235   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
236       TProcessor processor, TTransportFactory transportFactory,
237       int workerThreads,
238       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
239       throws TTransportException {
240     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
241     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
242     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
243     if (workerThreads > 0) {
244       serverArgs.workerThreads(workerThreads);
245     }
246     ExecutorService executorService = createExecutor(
247         serverArgs.getWorkerThreads(), metrics);
248     serverArgs.executorService(executorService);
249     serverArgs.processor(processor);
250     serverArgs.transportFactory(transportFactory);
251     serverArgs.protocolFactory(protocolFactory);
252     return new THsHaServer(serverArgs);
253   }
254 
255   private static ExecutorService createExecutor(
256       int workerThreads, ThriftMetrics metrics) {
257     CallQueue callQueue = new CallQueue(
258         new LinkedBlockingQueue<Call>(), metrics);
259     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
260     tfb.setDaemon(true);
261     tfb.setNameFormat("thrift2-worker-%d");
262     ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
263             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
264     pool.prestartAllCoreThreads();
265     return pool;
266   }
267 
268   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
269                                               TProcessor processor,
270                                               TTransportFactory transportFactory,
271                                               int workerThreads,
272                                               InetSocketAddress inetSocketAddress)
273       throws TTransportException {
274     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
275     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
276     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
277     serverArgs.processor(processor);
278     serverArgs.transportFactory(transportFactory);
279     serverArgs.protocolFactory(protocolFactory);
280     if (workerThreads > 0) {
281       serverArgs.maxWorkerThreads(workerThreads);
282     }
283     return new TThreadPoolServer(serverArgs);
284   }
285 
286   /**
287    * Adds the option to pre-load filters at startup.
288    *
289    * @param conf  The current configuration instance.
290    */
291   protected static void registerFilters(Configuration conf) {
292     String[] filters = conf.getStrings("hbase.thrift.filters");
293     if(filters != null) {
294       for(String filterClass: filters) {
295         String[] filterPart = filterClass.split(":");
296         if(filterPart.length != 2) {
297           log.warn("Invalid filter specification " + filterClass + " - skipping");
298         } else {
299           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
300         }
301       }
302     }
303   }
304 
305   /**
306    * Start up the Thrift2 server.
307    */
308   public static void main(String[] args) throws Exception {
309     final Configuration conf = HBaseConfiguration.create();
310     // for now, only time we return is on an argument error.
311     final int status = ToolRunner.run(conf, new ThriftServer(), args);
312     System.exit(status);
313   }
314 
315   @Override
316   public int run(String[] args) throws Exception {
317     final Configuration conf = getConf();
318     TServer server = null;
319     Options options = getOptions();
320     CommandLine cmd = parseArguments(conf, options, args);
321     int workerThreads = 0;
322 
323     /**
324      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
325      * should print the help if no argument is provided
326      */
327     List<?> argList = cmd.getArgList();
328     if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
329       printUsage();
330       return 1;
331     }
332 
333     // Get address to bind
334     String bindAddress;
335     if (cmd.hasOption("bind")) {
336       bindAddress = cmd.getOptionValue("bind");
337       conf.set("hbase.thrift.info.bindAddress", bindAddress);
338     } else {
339       bindAddress = conf.get("hbase.thrift.info.bindAddress");
340     }
341 
342     // Get port to bind to
343     int listenPort = 0;
344     try {
345       if (cmd.hasOption("port")) {
346         listenPort = Integer.parseInt(cmd.getOptionValue("port"));
347       } else {
348         listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
349       }
350     } catch (NumberFormatException e) {
351       throw new RuntimeException("Could not parse the value provided for the port option", e);
352     }
353 
354     // Local hostname and user name,
355     // used only if QOP is configured.
356     String host = null;
357     String name = null;
358 
359     UserProvider userProvider = UserProvider.instantiate(conf);
360     // login the server principal (if using secure Hadoop)
361     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
362       && userProvider.isHBaseSecurityEnabled();
363     if (securityEnabled) {
364       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
365         conf.get("hbase.thrift.dns.interface", "default"),
366         conf.get("hbase.thrift.dns.nameserver", "default")));
367       userProvider.login("hbase.thrift.keytab.file",
368         "hbase.thrift.kerberos.principal", host);
369     }
370 
371     UserGroupInformation realUser = userProvider.getCurrent().getUGI();
372     String qop = conf.get(THRIFT_QOP_KEY);
373     if (qop != null) {
374       if (!qop.equals("auth") && !qop.equals("auth-int")
375           && !qop.equals("auth-conf")) {
376         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
377           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
378       }
379       if (!securityEnabled) {
380         throw new IOException("Thrift server must"
381           + " run in secure mode to support authentication");
382       }
383       // Extract the name from the principal
384       name = SecurityUtil.getUserFromPrincipal(
385         conf.get("hbase.thrift.kerberos.principal"));
386     }
387 
388     boolean nonblocking = cmd.hasOption("nonblocking");
389     boolean hsha = cmd.hasOption("hsha");
390 
391     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
392 
393     String implType = "threadpool";
394     if (nonblocking) {
395       implType = "nonblocking";
396     } else if (hsha) {
397       implType = "hsha";
398     }
399 
400     conf.set("hbase.regionserver.thrift.server.type", implType);
401     conf.setInt("hbase.regionserver.thrift.port", listenPort);
402     registerFilters(conf);
403 
404     // Construct correct ProtocolFactory
405     boolean compact = cmd.hasOption("compact") ||
406         conf.getBoolean("hbase.regionserver.thrift.compact", false);
407     TProtocolFactory protocolFactory = getTProtocolFactory(compact);
408     final ThriftHBaseServiceHandler hbaseHandler =
409       new ThriftHBaseServiceHandler(conf, userProvider);
410     THBaseService.Iface handler =
411       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
412     final THBaseService.Processor p = new THBaseService.Processor(handler);
413     conf.setBoolean("hbase.regionserver.thrift.compact", compact);
414     TProcessor processor = p;
415 
416     boolean framed = cmd.hasOption("framed") ||
417         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
418     TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
419         conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
420     InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
421     conf.setBoolean("hbase.regionserver.thrift.framed", framed);
422     if (qop != null) {
423       // Create a processor wrapper, to get the caller
424       processor = new TProcessor() {
425         @Override
426         public boolean process(TProtocol inProt,
427             TProtocol outProt) throws TException {
428           TSaslServerTransport saslServerTransport =
429             (TSaslServerTransport)inProt.getTransport();
430           SaslServer saslServer = saslServerTransport.getSaslServer();
431           String principal = saslServer.getAuthorizationID();
432           hbaseHandler.setEffectiveUser(principal);
433           return p.process(inProt, outProt);
434         }
435       };
436     }
437 
438     if (cmd.hasOption("w")) {
439       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
440     }
441 
442     // check for user-defined info server port setting, if so override the conf
443     try {
444       if (cmd.hasOption("infoport")) {
445         String val = cmd.getOptionValue("infoport");
446         conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
447         log.debug("Web UI port set to " + val);
448       }
449     } catch (NumberFormatException e) {
450       log.error("Could not parse the value provided for the infoport option", e);
451       printUsage();
452       System.exit(1);
453     }
454 
455     // Put up info server.
456     int port = conf.getInt("hbase.thrift.info.port", 9095);
457     if (port >= 0) {
458       conf.setLong("startcode", System.currentTimeMillis());
459       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
460       InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
461       infoServer.setAttribute("hbase.conf", conf);
462       infoServer.start();
463     }
464 
465     if (nonblocking) {
466       server = getTNonBlockingServer(protocolFactory,
467           processor,
468           transportFactory,
469           inetSocketAddress);
470     } else if (hsha) {
471       server = getTHsHaServer(protocolFactory,
472           processor,
473           transportFactory,
474           workerThreads,
475           inetSocketAddress,
476           metrics);
477     } else {
478       server = getTThreadPoolServer(protocolFactory,
479           processor,
480           transportFactory,
481           workerThreads,
482           inetSocketAddress);
483     }
484 
485     final TServer tserver = server;
486     realUser.doAs(
487       new PrivilegedAction<Object>() {
488         @Override
489         public Object run() {
490           tserver.serve();
491           return null;
492         }
493       });
494     // when tserver.stop eventually happens we'll get here.
495     return 0;
496   }
497 }