View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.thrift2;
21  
22  import java.io.IOException;
23  import java.net.InetAddress;
24  import java.net.InetSocketAddress;
25  import java.net.UnknownHostException;
26  import java.util.List;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.cli.CommandLine;
33  import org.apache.commons.cli.CommandLineParser;
34  import org.apache.commons.cli.HelpFormatter;
35  import org.apache.commons.cli.Option;
36  import org.apache.commons.cli.OptionGroup;
37  import org.apache.commons.cli.Options;
38  import org.apache.commons.cli.ParseException;
39  import org.apache.commons.cli.PosixParser;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.filter.ParseFilter;
45  import org.apache.hadoop.hbase.thrift.CallQueue;
46  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
47  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
48  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
49  import org.apache.hadoop.hbase.util.InfoServer;
50  import org.apache.hadoop.util.GenericOptionsParser;
51  import org.apache.thrift.protocol.TBinaryProtocol;
52  import org.apache.thrift.protocol.TCompactProtocol;
53  import org.apache.thrift.protocol.TProtocolFactory;
54  import org.apache.thrift.server.THsHaServer;
55  import org.apache.thrift.server.TNonblockingServer;
56  import org.apache.thrift.server.TServer;
57  import org.apache.thrift.server.TThreadPoolServer;
58  import org.apache.thrift.transport.TFramedTransport;
59  import org.apache.thrift.transport.TNonblockingServerSocket;
60  import org.apache.thrift.transport.TNonblockingServerTransport;
61  import org.apache.thrift.transport.TServerSocket;
62  import org.apache.thrift.transport.TServerTransport;
63  import org.apache.thrift.transport.TTransportException;
64  import org.apache.thrift.transport.TTransportFactory;
65  
66  import com.google.common.util.concurrent.ThreadFactoryBuilder;
67  
68  /**
69   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
70   * HbaseClient.thrift IDL file.
71   */
72  @SuppressWarnings({ "rawtypes", "unchecked" })
73  public class ThriftServer {
74    private static final Log log = LogFactory.getLog(ThriftServer.class);
75  
76    public static final String DEFAULT_LISTEN_PORT = "9090";
77  
78    public ThriftServer() {
79    }
80  
81    private static void printUsage() {
82      HelpFormatter formatter = new HelpFormatter();
83      formatter.printHelp("Thrift", null, getOptions(),
84          "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
85              "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
86              " send a kill signal to the thrift server pid",
87          true);
88    }
89  
90    private static Options getOptions() {
91      Options options = new Options();
92      options.addOption("b", "bind", true,
93          "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]");
94      options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
95      options.addOption("f", "framed", false, "Use framed transport");
96      options.addOption("c", "compact", false, "Use the compact protocol");
97      options.addOption("h", "help", false, "Print help information");
98      options.addOption(null, "infoport", true, "Port for web UI");
99  
100     OptionGroup servers = new OptionGroup();
101     servers.addOption(
102         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
103     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
104     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
105     options.addOptionGroup(servers);
106     return options;
107   }
108 
109   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
110       throws ParseException, IOException {
111     GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
112     String[] remainingArgs = genParser.getRemainingArgs();
113     CommandLineParser parser = new PosixParser();
114     return parser.parse(options, remainingArgs);
115   }
116 
117   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
118     if (isCompact) {
119       log.debug("Using compact protocol");
120       return new TCompactProtocol.Factory();
121     } else {
122       log.debug("Using binary protocol");
123       return new TBinaryProtocol.Factory();
124     }
125   }
126 
127   private static TTransportFactory getTTransportFactory(boolean framed) {
128     if (framed) {
129       log.debug("Using framed transport");
130       return new TFramedTransport.Factory();
131     } else {
132       return new TTransportFactory();
133     }
134   }
135 
136   /*
137    * If bindValue is null, we don't bind. 
138    */
139   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
140       throws UnknownHostException {
141     try {
142       if (bindValue == null) {
143         return new InetSocketAddress(listenPort);
144       } else {
145         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
146       }
147     } catch (UnknownHostException e) {
148       throw new RuntimeException("Could not bind to provided ip address", e);
149     }
150   }
151 
152   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
153       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
154     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
155     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
156     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
157     serverArgs.processor(processor);
158     serverArgs.transportFactory(transportFactory);
159     serverArgs.protocolFactory(protocolFactory);
160     return new TNonblockingServer(serverArgs);
161   }
162 
163   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
164       THBaseService.Processor processor, TTransportFactory transportFactory,
165       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
166       throws TTransportException {
167     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
168     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
169     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
170     ExecutorService executorService = createExecutor(
171         serverArgs.getWorkerThreads(), metrics);
172     serverArgs.executorService(executorService);
173     serverArgs.processor(processor);
174     serverArgs.transportFactory(transportFactory);
175     serverArgs.protocolFactory(protocolFactory);
176     return new THsHaServer(serverArgs);
177   }
178 
179   private static ExecutorService createExecutor(
180       int workerThreads, ThriftMetrics metrics) {
181     CallQueue callQueue = new CallQueue(
182         new LinkedBlockingQueue<Call>(), metrics);
183     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
184     tfb.setDaemon(true);
185     tfb.setNameFormat("thrift2-worker-%d");
186     return new ThreadPoolExecutor(workerThreads, workerThreads,
187             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
188   }
189 
190   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
191       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
192     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
193     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
194     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
195     serverArgs.processor(processor);
196     serverArgs.transportFactory(transportFactory);
197     serverArgs.protocolFactory(protocolFactory);
198     return new TThreadPoolServer(serverArgs);
199   }
200 
201   /**
202    * Adds the option to pre-load filters at startup.
203    *
204    * @param conf  The current configuration instance.
205    */
206   protected static void registerFilters(Configuration conf) {
207     String[] filters = conf.getStrings("hbase.thrift.filters");
208     if(filters != null) {
209       for(String filterClass: filters) {
210         String[] filterPart = filterClass.split(":");
211         if(filterPart.length != 2) {
212           log.warn("Invalid filter specification " + filterClass + " - skipping");
213         } else {
214           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
215         }
216       }
217     }
218   }
219 
220   /**
221    * Start up the Thrift2 server.
222    * 
223    * @param args
224    */
225   public static void main(String[] args) throws Exception {
226     TServer server = null;
227     Options options = getOptions();
228     try {
229       Configuration conf = HBaseConfiguration.create();
230       CommandLine cmd = parseArguments(conf, options, args);
231 
232       /**
233        * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
234        * should print the help if no argument is provided
235        */
236       List<?> argList = cmd.getArgList();
237       if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
238         printUsage();
239         System.exit(1);
240       }
241 
242       // Get port to bind to
243       int listenPort = 0;
244       try {
245         listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
246       } catch (NumberFormatException e) {
247         throw new RuntimeException("Could not parse the value provided for the port option", e);
248       }
249 
250       boolean nonblocking = cmd.hasOption("nonblocking");
251       boolean hsha = cmd.hasOption("hsha");
252 
253       ThriftMetrics metrics = new ThriftMetrics(
254           listenPort, conf, THBaseService.Iface.class);
255 
256       String implType = "threadpool";
257       if (nonblocking) {
258         implType = "nonblocking";
259       } else if (hsha) {
260         implType = "hsha";
261       }
262 
263       conf.set("hbase.regionserver.thrift.server.type", implType);
264       conf.setInt("hbase.regionserver.thrift.port", listenPort);
265       registerFilters(conf);
266 
267       // Construct correct ProtocolFactory
268       boolean compact = cmd.hasOption("compact") ||
269         conf.getBoolean("hbase.regionserver.thrift.compact", false);
270       TProtocolFactory protocolFactory = getTProtocolFactory(compact);
271       THBaseService.Iface handler =
272           ThriftHBaseServiceHandler.newInstance(conf, metrics);
273       THBaseService.Processor processor = new THBaseService.Processor(handler);
274       conf.setBoolean("hbase.regionserver.thrift.compact", compact);
275 
276       boolean framed = cmd.hasOption("framed") ||
277         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
278       TTransportFactory transportFactory = getTTransportFactory(framed);
279       conf.setBoolean("hbase.regionserver.thrift.framed", framed);
280 
281       // TODO: Remove once HBASE-2155 is resolved
282       if (cmd.hasOption("bind") && (nonblocking || hsha)) {
283         log.error("The Nonblocking and HsHaServer servers don't support IP address binding at the moment." +
284             " See https://issues.apache.org/jira/browse/HBASE-2155 for details.");
285         printUsage();
286         System.exit(1);
287       }
288 
289       // check for user-defined info server port setting, if so override the conf
290       try {
291         if (cmd.hasOption("infoport")) {
292           String val = cmd.getOptionValue("infoport");
293           conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
294           log.debug("Web UI port set to " + val);
295         }
296       } catch (NumberFormatException e) {
297         log.error("Could not parse the value provided for the infoport option", e);
298         printUsage();
299         System.exit(1);
300       }
301 
302       // Put up info server.
303       int port = conf.getInt("hbase.thrift.info.port", 9095);
304       if (port >= 0) {
305         conf.setLong("startcode", System.currentTimeMillis());
306         String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
307         InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
308         infoServer.setAttribute("hbase.conf", conf);
309         infoServer.start();
310       }
311 
312       InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
313 
314       if (nonblocking) {
315         server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
316       } else if (hsha) {
317         server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
318       } else {
319         server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
320       }
321     } catch (Exception e) {
322       log.error(e.getMessage(), e);
323       printUsage();
324       System.exit(1);
325     }
326     server.serve();
327   }
328 }