View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.security.PrivilegedAction;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import javax.security.auth.callback.Callback;
35  import javax.security.auth.callback.UnsupportedCallbackException;
36  import javax.security.sasl.AuthorizeCallback;
37  import javax.security.sasl.Sasl;
38  import javax.security.sasl.SaslServer;
39  
40  import org.apache.commons.cli.CommandLine;
41  import org.apache.commons.cli.CommandLineParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionGroup;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.commons.cli.PosixParser;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.filter.ParseFilter;
54  import org.apache.hadoop.hbase.http.InfoServer;
55  import org.apache.hadoop.hbase.security.SecurityUtil;
56  import org.apache.hadoop.hbase.security.UserProvider;
57  import org.apache.hadoop.hbase.thrift.CallQueue;
58  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
59  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
60  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
61  import org.apache.hadoop.hbase.util.Strings;
62  import org.apache.hadoop.net.DNS;
63  import org.apache.hadoop.security.UserGroupInformation;
64  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
65  import org.apache.hadoop.util.GenericOptionsParser;
66  import org.apache.thrift.TException;
67  import org.apache.thrift.TProcessor;
68  import org.apache.thrift.protocol.TBinaryProtocol;
69  import org.apache.thrift.protocol.TCompactProtocol;
70  import org.apache.thrift.protocol.TProtocol;
71  import org.apache.thrift.protocol.TProtocolFactory;
72  import org.apache.thrift.server.THsHaServer;
73  import org.apache.thrift.server.TNonblockingServer;
74  import org.apache.thrift.server.TServer;
75  import org.apache.thrift.server.TThreadPoolServer;
76  import org.apache.thrift.transport.TFramedTransport;
77  import org.apache.thrift.transport.TNonblockingServerSocket;
78  import org.apache.thrift.transport.TNonblockingServerTransport;
79  import org.apache.thrift.transport.TSaslServerTransport;
80  import org.apache.thrift.transport.TServerSocket;
81  import org.apache.thrift.transport.TServerTransport;
82  import org.apache.thrift.transport.TTransportException;
83  import org.apache.thrift.transport.TTransportFactory;
84  
85  import com.google.common.util.concurrent.ThreadFactoryBuilder;
86  
87  /**
88   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
89   * HbaseClient.thrift IDL file.
90   */
91  @InterfaceAudience.Private
92  @SuppressWarnings({ "rawtypes", "unchecked" })
93  public class ThriftServer {
94    private static final Log log = LogFactory.getLog(ThriftServer.class);
95  
96    /**
97     * Thrift quality of protection configuration key. Valid values can be:
98     * auth-conf: authentication, integrity and confidentiality checking
99     * auth-int: authentication and integrity checking
100    * auth: authentication only
101    *
102    * This is used to authenticate the callers and support impersonation.
103    * The thrift server and the HBase cluster must run in secure mode.
104    */
105   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
106 
107   public static final int DEFAULT_LISTEN_PORT = 9090;
108 
109   
110   public ThriftServer() {
111   }
112 
113   private static void printUsage() {
114     HelpFormatter formatter = new HelpFormatter();
115     formatter.printHelp("Thrift", null, getOptions(),
116         "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
117             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
118             " send a kill signal to the thrift server pid",
119         true);
120   }
121 
122   private static Options getOptions() {
123     Options options = new Options();
124     options.addOption("b", "bind", true,
125         "Address to bind the Thrift server to. [default: 0.0.0.0]");
126     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
127     options.addOption("f", "framed", false, "Use framed transport");
128     options.addOption("c", "compact", false, "Use the compact protocol");
129     options.addOption("h", "help", false, "Print help information");
130     options.addOption(null, "infoport", true, "Port for web UI");
131 
132     OptionGroup servers = new OptionGroup();
133     servers.addOption(
134         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
135     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
136     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
137     options.addOptionGroup(servers);
138     return options;
139   }
140 
141   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
142       throws ParseException, IOException {
143     GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
144     String[] remainingArgs = genParser.getRemainingArgs();
145     CommandLineParser parser = new PosixParser();
146     return parser.parse(options, remainingArgs);
147   }
148 
149   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
150     if (isCompact) {
151       log.debug("Using compact protocol");
152       return new TCompactProtocol.Factory();
153     } else {
154       log.debug("Using binary protocol");
155       return new TBinaryProtocol.Factory();
156     }
157   }
158 
159   private static TTransportFactory getTTransportFactory(
160       String qop, String name, String host, boolean framed, int frameSize) {
161     if (framed) {
162       if (qop != null) {
163         throw new RuntimeException("Thrift server authentication"
164           + " doesn't work with framed transport yet");
165       }
166       log.debug("Using framed transport");
167       return new TFramedTransport.Factory(frameSize);
168     } else if (qop == null) {
169       return new TTransportFactory();
170     } else {
171       Map<String, String> saslProperties = new HashMap<String, String>();
172       saslProperties.put(Sasl.QOP, qop);
173       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
174       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
175         new SaslGssCallbackHandler() {
176           @Override
177           public void handle(Callback[] callbacks)
178               throws UnsupportedCallbackException {
179             AuthorizeCallback ac = null;
180             for (Callback callback : callbacks) {
181               if (callback instanceof AuthorizeCallback) {
182                 ac = (AuthorizeCallback) callback;
183               } else {
184                 throw new UnsupportedCallbackException(callback,
185                     "Unrecognized SASL GSSAPI Callback");
186               }
187             }
188             if (ac != null) {
189               String authid = ac.getAuthenticationID();
190               String authzid = ac.getAuthorizationID();
191               if (!authid.equals(authzid)) {
192                 ac.setAuthorized(false);
193               } else {
194                 ac.setAuthorized(true);
195                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
196                 log.info("Effective user: " + userName);
197                 ac.setAuthorizedID(userName);
198               }
199             }
200           }
201         });
202       return saslFactory;
203     }
204   }
205 
206   /*
207    * If bindValue is null, we don't bind.
208    */
209   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
210       throws UnknownHostException {
211     try {
212       if (bindValue == null) {
213         return new InetSocketAddress(listenPort);
214       } else {
215         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
216       }
217     } catch (UnknownHostException e) {
218       throw new RuntimeException("Could not bind to provided ip address", e);
219     }
220   }
221 
222   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
223       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
224     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
225     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
226     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
227     serverArgs.processor(processor);
228     serverArgs.transportFactory(transportFactory);
229     serverArgs.protocolFactory(protocolFactory);
230     return new TNonblockingServer(serverArgs);
231   }
232 
233   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
234       TProcessor processor, TTransportFactory transportFactory,
235       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
236       throws TTransportException {
237     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
238     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
239     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
240     ExecutorService executorService = createExecutor(
241         serverArgs.getWorkerThreads(), metrics);
242     serverArgs.executorService(executorService);
243     serverArgs.processor(processor);
244     serverArgs.transportFactory(transportFactory);
245     serverArgs.protocolFactory(protocolFactory);
246     return new THsHaServer(serverArgs);
247   }
248 
249   private static ExecutorService createExecutor(
250       int workerThreads, ThriftMetrics metrics) {
251     CallQueue callQueue = new CallQueue(
252         new LinkedBlockingQueue<Call>(), metrics);
253     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
254     tfb.setDaemon(true);
255     tfb.setNameFormat("thrift2-worker-%d");
256     return new ThreadPoolExecutor(workerThreads, workerThreads,
257             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
258   }
259 
260   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
261       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
262     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
263     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
264     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
265     serverArgs.processor(processor);
266     serverArgs.transportFactory(transportFactory);
267     serverArgs.protocolFactory(protocolFactory);
268     return new TThreadPoolServer(serverArgs);
269   }
270 
271   /**
272    * Adds the option to pre-load filters at startup.
273    *
274    * @param conf  The current configuration instance.
275    */
276   protected static void registerFilters(Configuration conf) {
277     String[] filters = conf.getStrings("hbase.thrift.filters");
278     if(filters != null) {
279       for(String filterClass: filters) {
280         String[] filterPart = filterClass.split(":");
281         if(filterPart.length != 2) {
282           log.warn("Invalid filter specification " + filterClass + " - skipping");
283         } else {
284           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
285         }
286       }
287     }
288   }
289 
290   /**
291    * Start up the Thrift2 server.
292    *
293    * @param args
294    */
295   public static void main(String[] args) throws Exception {
296     TServer server = null;
297     Options options = getOptions();
298     Configuration conf = HBaseConfiguration.create();
299     CommandLine cmd = parseArguments(conf, options, args);
300 
301     /**
302      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
303      * should print the help if no argument is provided
304      */
305     List<?> argList = cmd.getArgList();
306     if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
307       printUsage();
308       System.exit(1);
309     }
310 
311     // Get port to bind to
312     int listenPort = 0;
313     try {
314       if (cmd.hasOption("port")) {
315         listenPort = Integer.parseInt(cmd.getOptionValue("port"));
316       } else {
317         listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
318       }
319     } catch (NumberFormatException e) {
320       throw new RuntimeException("Could not parse the value provided for the port option", e);
321     }
322 
323     // Local hostname and user name,
324     // used only if QOP is configured.
325     String host = null;
326     String name = null;
327 
328     UserProvider userProvider = UserProvider.instantiate(conf);
329     // login the server principal (if using secure Hadoop)
330     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
331       && userProvider.isHBaseSecurityEnabled();
332     if (securityEnabled) {
333       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
334         conf.get("hbase.thrift.dns.interface", "default"),
335         conf.get("hbase.thrift.dns.nameserver", "default")));
336       userProvider.login("hbase.thrift.keytab.file",
337         "hbase.thrift.kerberos.principal", host);
338     }
339 
340     UserGroupInformation realUser = userProvider.getCurrent().getUGI();
341     String qop = conf.get(THRIFT_QOP_KEY);
342     if (qop != null) {
343       if (!qop.equals("auth") && !qop.equals("auth-int")
344           && !qop.equals("auth-conf")) {
345         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
346           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
347       }
348       if (!securityEnabled) {
349         throw new IOException("Thrift server must"
350           + " run in secure mode to support authentication");
351       }
352       // Extract the name from the principal
353       name = SecurityUtil.getUserFromPrincipal(
354         conf.get("hbase.thrift.kerberos.principal"));
355     }
356 
357     boolean nonblocking = cmd.hasOption("nonblocking");
358     boolean hsha = cmd.hasOption("hsha");
359 
360     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
361 
362     String implType = "threadpool";
363     if (nonblocking) {
364       implType = "nonblocking";
365     } else if (hsha) {
366       implType = "hsha";
367     }
368 
369     conf.set("hbase.regionserver.thrift.server.type", implType);
370     conf.setInt("hbase.regionserver.thrift.port", listenPort);
371     registerFilters(conf);
372 
373     // Construct correct ProtocolFactory
374     boolean compact = cmd.hasOption("compact") ||
375         conf.getBoolean("hbase.regionserver.thrift.compact", false);
376     TProtocolFactory protocolFactory = getTProtocolFactory(compact);
377     final ThriftHBaseServiceHandler hbaseHandler =
378       new ThriftHBaseServiceHandler(conf, userProvider);
379     THBaseService.Iface handler =
380       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
381     final THBaseService.Processor p = new THBaseService.Processor(handler);
382     conf.setBoolean("hbase.regionserver.thrift.compact", compact);
383     TProcessor processor = p;
384 
385     boolean framed = cmd.hasOption("framed") ||
386         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
387     TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
388         conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
389     InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
390     conf.setBoolean("hbase.regionserver.thrift.framed", framed);
391     if (qop != null) {
392       // Create a processor wrapper, to get the caller
393       processor = new TProcessor() {
394         @Override
395         public boolean process(TProtocol inProt,
396             TProtocol outProt) throws TException {
397           TSaslServerTransport saslServerTransport =
398             (TSaslServerTransport)inProt.getTransport();
399           SaslServer saslServer = saslServerTransport.getSaslServer();
400           String principal = saslServer.getAuthorizationID();
401           hbaseHandler.setEffectiveUser(principal);
402           return p.process(inProt, outProt);
403         }
404       };
405     }
406 
407     // check for user-defined info server port setting, if so override the conf
408     try {
409       if (cmd.hasOption("infoport")) {
410         String val = cmd.getOptionValue("infoport");
411         conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
412         log.debug("Web UI port set to " + val);
413       }
414     } catch (NumberFormatException e) {
415       log.error("Could not parse the value provided for the infoport option", e);
416       printUsage();
417       System.exit(1);
418     }
419 
420     // Put up info server.
421     int port = conf.getInt("hbase.thrift.info.port", 9095);
422     if (port >= 0) {
423       conf.setLong("startcode", System.currentTimeMillis());
424       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
425       InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
426       infoServer.setAttribute("hbase.conf", conf);
427       infoServer.start();
428     }
429 
430     if (nonblocking) {
431       server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
432     } else if (hsha) {
433       server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
434     } else {
435       server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
436     }
437 
438     final TServer tserver = server;
439     realUser.doAs(
440       new PrivilegedAction<Object>() {
441         @Override
442         public Object run() {
443           tserver.serve();
444           return null;
445         }
446       });
447   }
448 }