View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.security.PrivilegedAction;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import javax.security.auth.callback.Callback;
35  import javax.security.auth.callback.UnsupportedCallbackException;
36  import javax.security.sasl.AuthorizeCallback;
37  import javax.security.sasl.Sasl;
38  import javax.security.sasl.SaslServer;
39  
40  import org.apache.commons.cli.CommandLine;
41  import org.apache.commons.cli.CommandLineParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionGroup;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.commons.cli.PosixParser;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54  import org.apache.hadoop.hbase.filter.ParseFilter;
55  import org.apache.hadoop.hbase.http.InfoServer;
56  import org.apache.hadoop.hbase.security.SaslUtil;
57  import org.apache.hadoop.hbase.security.SecurityUtil;
58  import org.apache.hadoop.hbase.security.UserProvider;
59  import org.apache.hadoop.hbase.thrift.CallQueue;
60  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
61  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
62  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
63  import org.apache.hadoop.hbase.util.DNS;
64  import org.apache.hadoop.hbase.util.Strings;
65  import org.apache.hadoop.security.UserGroupInformation;
66  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
67  import org.apache.hadoop.util.GenericOptionsParser;
68  import org.apache.thrift.TException;
69  import org.apache.thrift.TProcessor;
70  import org.apache.thrift.protocol.TBinaryProtocol;
71  import org.apache.thrift.protocol.TCompactProtocol;
72  import org.apache.thrift.protocol.TProtocol;
73  import org.apache.thrift.protocol.TProtocolFactory;
74  import org.apache.thrift.server.THsHaServer;
75  import org.apache.thrift.server.TNonblockingServer;
76  import org.apache.thrift.server.TServer;
77  import org.apache.thrift.server.TThreadPoolServer;
78  import org.apache.thrift.transport.TFramedTransport;
79  import org.apache.thrift.transport.TNonblockingServerSocket;
80  import org.apache.thrift.transport.TNonblockingServerTransport;
81  import org.apache.thrift.transport.TSaslServerTransport;
82  import org.apache.thrift.transport.TServerSocket;
83  import org.apache.thrift.transport.TServerTransport;
84  import org.apache.thrift.transport.TTransportException;
85  import org.apache.thrift.transport.TTransportFactory;
86  
87  import com.google.common.util.concurrent.ThreadFactoryBuilder;
88  
89  /**
90   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
91   * HbaseClient.thrift IDL file.
92   */
93  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
94  @SuppressWarnings({ "rawtypes", "unchecked" })
95  public class ThriftServer {
96    private static final Log log = LogFactory.getLog(ThriftServer.class);
97  
98    /**
99     * Thrift quality of protection configuration key. Valid values can be:
100    * privacy: authentication, integrity and confidentiality checking
101    * integrity: authentication and integrity checking
102    * authentication: authentication only
103    *
104    * This is used to authenticate the callers and support impersonation.
105    * The thrift server and the HBase cluster must run in secure mode.
106    */
107   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
108 
109   public static final int DEFAULT_LISTEN_PORT = 9090;
110 
111 
112   public ThriftServer() {
113   }
114 
115   private static void printUsage() {
116     HelpFormatter formatter = new HelpFormatter();
117     formatter.printHelp("Thrift", null, getOptions(),
118         "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
119             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
120             " send a kill signal to the thrift server pid",
121         true);
122   }
123 
124   private static Options getOptions() {
125     Options options = new Options();
126     options.addOption("b", "bind", true,
127         "Address to bind the Thrift server to. [default: 0.0.0.0]");
128     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
129     options.addOption("f", "framed", false, "Use framed transport");
130     options.addOption("c", "compact", false, "Use the compact protocol");
131     options.addOption("h", "help", false, "Print help information");
132     options.addOption(null, "infoport", true, "Port for web UI");
133 
134     OptionGroup servers = new OptionGroup();
135     servers.addOption(
136         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
137     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
138     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
139     options.addOptionGroup(servers);
140     return options;
141   }
142 
143   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
144       throws ParseException, IOException {
145     GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
146     String[] remainingArgs = genParser.getRemainingArgs();
147     CommandLineParser parser = new PosixParser();
148     return parser.parse(options, remainingArgs);
149   }
150 
151   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
152     if (isCompact) {
153       log.debug("Using compact protocol");
154       return new TCompactProtocol.Factory();
155     } else {
156       log.debug("Using binary protocol");
157       return new TBinaryProtocol.Factory();
158     }
159   }
160 
161   private static TTransportFactory getTTransportFactory(
162       SaslUtil.QualityOfProtection qop, String name, String host,
163       boolean framed, int frameSize) {
164     if (framed) {
165       if (qop != null) {
166         throw new RuntimeException("Thrift server authentication"
167           + " doesn't work with framed transport yet");
168       }
169       log.debug("Using framed transport");
170       return new TFramedTransport.Factory(frameSize);
171     } else if (qop == null) {
172       return new TTransportFactory();
173     } else {
174       Map<String, String> saslProperties = new HashMap<String, String>();
175       saslProperties.put(Sasl.QOP, qop.getSaslQop());
176       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
177       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
178         new SaslGssCallbackHandler() {
179           @Override
180           public void handle(Callback[] callbacks)
181               throws UnsupportedCallbackException {
182             AuthorizeCallback ac = null;
183             for (Callback callback : callbacks) {
184               if (callback instanceof AuthorizeCallback) {
185                 ac = (AuthorizeCallback) callback;
186               } else {
187                 throw new UnsupportedCallbackException(callback,
188                     "Unrecognized SASL GSSAPI Callback");
189               }
190             }
191             if (ac != null) {
192               String authid = ac.getAuthenticationID();
193               String authzid = ac.getAuthorizationID();
194               if (!authid.equals(authzid)) {
195                 ac.setAuthorized(false);
196               } else {
197                 ac.setAuthorized(true);
198                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
199                 log.info("Effective user: " + userName);
200                 ac.setAuthorizedID(userName);
201               }
202             }
203           }
204         });
205       return saslFactory;
206     }
207   }
208 
209   /*
210    * If bindValue is null, we don't bind.
211    */
212   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
213       throws UnknownHostException {
214     try {
215       if (bindValue == null) {
216         return new InetSocketAddress(listenPort);
217       } else {
218         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
219       }
220     } catch (UnknownHostException e) {
221       throw new RuntimeException("Could not bind to provided ip address", e);
222     }
223   }
224 
225   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
226       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
227     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
228     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
229     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
230     serverArgs.processor(processor);
231     serverArgs.transportFactory(transportFactory);
232     serverArgs.protocolFactory(protocolFactory);
233     return new TNonblockingServer(serverArgs);
234   }
235 
236   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
237       TProcessor processor, TTransportFactory transportFactory,
238       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
239       throws TTransportException {
240     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
241     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
242     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
243     ExecutorService executorService = createExecutor(
244         serverArgs.getWorkerThreads(), metrics);
245     serverArgs.executorService(executorService);
246     serverArgs.processor(processor);
247     serverArgs.transportFactory(transportFactory);
248     serverArgs.protocolFactory(protocolFactory);
249     return new THsHaServer(serverArgs);
250   }
251 
252   private static ExecutorService createExecutor(
253       int workerThreads, ThriftMetrics metrics) {
254     CallQueue callQueue = new CallQueue(
255         new LinkedBlockingQueue<Call>(), metrics);
256     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
257     tfb.setDaemon(true);
258     tfb.setNameFormat("thrift2-worker-%d");
259     return new ThreadPoolExecutor(workerThreads, workerThreads,
260             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
261   }
262 
263   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
264       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
265     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
266     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
267     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
268     serverArgs.processor(processor);
269     serverArgs.transportFactory(transportFactory);
270     serverArgs.protocolFactory(protocolFactory);
271     return new TThreadPoolServer(serverArgs);
272   }
273 
274   /**
275    * Adds the option to pre-load filters at startup.
276    *
277    * @param conf  The current configuration instance.
278    */
279   protected static void registerFilters(Configuration conf) {
280     String[] filters = conf.getStrings("hbase.thrift.filters");
281     if(filters != null) {
282       for(String filterClass: filters) {
283         String[] filterPart = filterClass.split(":");
284         if(filterPart.length != 2) {
285           log.warn("Invalid filter specification " + filterClass + " - skipping");
286         } else {
287           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
288         }
289       }
290     }
291   }
292 
293   /**
294    * Start up the Thrift2 server.
295    *
296    * @param args
297    */
298   public static void main(String[] args) throws Exception {
299     TServer server = null;
300     Options options = getOptions();
301     Configuration conf = HBaseConfiguration.create();
302     CommandLine cmd = parseArguments(conf, options, args);
303 
304     /**
305      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
306      * should print the help if no argument is provided
307      */
308     List<?> argList = cmd.getArgList();
309     if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
310       printUsage();
311       System.exit(1);
312     }
313 
314     // Get address to bind
315     String bindAddress;
316     if (cmd.hasOption("bind")) {
317       bindAddress = cmd.getOptionValue("bind");
318       conf.set("hbase.thrift.info.bindAddress", bindAddress);
319     } else {
320       bindAddress = conf.get("hbase.thrift.info.bindAddress");
321     }
322 
323     // Get port to bind to
324     int listenPort = 0;
325     try {
326       if (cmd.hasOption("port")) {
327         listenPort = Integer.parseInt(cmd.getOptionValue("port"));
328       } else {
329         listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
330       }
331     } catch (NumberFormatException e) {
332       throw new RuntimeException("Could not parse the value provided for the port option", e);
333     }
334 
335     // Local hostname and user name,
336     // used only if QOP is configured.
337     String host = null;
338     String name = null;
339 
340     UserProvider userProvider = UserProvider.instantiate(conf);
341     // login the server principal (if using secure Hadoop)
342     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
343       && userProvider.isHBaseSecurityEnabled();
344     if (securityEnabled) {
345       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
346         conf.get("hbase.thrift.dns.interface", "default"),
347         conf.get("hbase.thrift.dns.nameserver", "default")));
348       userProvider.login("hbase.thrift.keytab.file",
349         "hbase.thrift.kerberos.principal", host);
350     }
351 
352     UserGroupInformation realUser = userProvider.getCurrent().getUGI();
353     String stringQop = conf.get(THRIFT_QOP_KEY);
354     SaslUtil.QualityOfProtection qop = null;
355     if (stringQop != null) {
356       qop = SaslUtil.getQop(stringQop);
357       if (!securityEnabled) {
358         throw new IOException("Thrift server must"
359           + " run in secure mode to support authentication");
360       }
361       // Extract the name from the principal
362       name = SecurityUtil.getUserFromPrincipal(
363         conf.get("hbase.thrift.kerberos.principal"));
364     }
365 
366     boolean nonblocking = cmd.hasOption("nonblocking");
367     boolean hsha = cmd.hasOption("hsha");
368 
369     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
370 
371     String implType = "threadpool";
372     if (nonblocking) {
373       implType = "nonblocking";
374     } else if (hsha) {
375       implType = "hsha";
376     }
377 
378     conf.set("hbase.regionserver.thrift.server.type", implType);
379     conf.setInt("hbase.regionserver.thrift.port", listenPort);
380     registerFilters(conf);
381 
382     // Construct correct ProtocolFactory
383     boolean compact = cmd.hasOption("compact") ||
384         conf.getBoolean("hbase.regionserver.thrift.compact", false);
385     TProtocolFactory protocolFactory = getTProtocolFactory(compact);
386     final ThriftHBaseServiceHandler hbaseHandler =
387       new ThriftHBaseServiceHandler(conf, userProvider);
388     THBaseService.Iface handler =
389       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
390     final THBaseService.Processor p = new THBaseService.Processor(handler);
391     conf.setBoolean("hbase.regionserver.thrift.compact", compact);
392     TProcessor processor = p;
393 
394     boolean framed = cmd.hasOption("framed") ||
395         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
396     TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
397         conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
398     InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
399     conf.setBoolean("hbase.regionserver.thrift.framed", framed);
400     if (qop != null) {
401       // Create a processor wrapper, to get the caller
402       processor = new TProcessor() {
403         @Override
404         public boolean process(TProtocol inProt,
405             TProtocol outProt) throws TException {
406           TSaslServerTransport saslServerTransport =
407             (TSaslServerTransport)inProt.getTransport();
408           SaslServer saslServer = saslServerTransport.getSaslServer();
409           String principal = saslServer.getAuthorizationID();
410           hbaseHandler.setEffectiveUser(principal);
411           return p.process(inProt, outProt);
412         }
413       };
414     }
415 
416     // check for user-defined info server port setting, if so override the conf
417     try {
418       if (cmd.hasOption("infoport")) {
419         String val = cmd.getOptionValue("infoport");
420         conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
421         log.debug("Web UI port set to " + val);
422       }
423     } catch (NumberFormatException e) {
424       log.error("Could not parse the value provided for the infoport option", e);
425       printUsage();
426       System.exit(1);
427     }
428 
429     // Put up info server.
430     int port = conf.getInt("hbase.thrift.info.port", 9095);
431     if (port >= 0) {
432       conf.setLong("startcode", System.currentTimeMillis());
433       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
434       InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
435       infoServer.setAttribute("hbase.conf", conf);
436       infoServer.start();
437     }
438 
439     if (nonblocking) {
440       server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
441     } else if (hsha) {
442       server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
443     } else {
444       server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
445     }
446 
447     final TServer tserver = server;
448     realUser.doAs(
449       new PrivilegedAction<Object>() {
450         @Override
451         public Object run() {
452           tserver.serve();
453           return null;
454         }
455       });
456   }
457 }