View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.thrift2;
21  
22  import java.io.IOException;
23  import java.net.InetAddress;
24  import java.net.InetSocketAddress;
25  import java.net.UnknownHostException;
26  import java.util.List;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.cli.CommandLine;
33  import org.apache.commons.cli.CommandLineParser;
34  import org.apache.commons.cli.HelpFormatter;
35  import org.apache.commons.cli.Option;
36  import org.apache.commons.cli.OptionGroup;
37  import org.apache.commons.cli.Options;
38  import org.apache.commons.cli.ParseException;
39  import org.apache.commons.cli.PosixParser;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.filter.ParseFilter;
45  import org.apache.hadoop.hbase.thrift.CallQueue;
46  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
47  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
48  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
49  import org.apache.hadoop.hbase.util.InfoServer;
50  import org.apache.hadoop.util.GenericOptionsParser;
51  import org.apache.thrift.protocol.TBinaryProtocol;
52  import org.apache.thrift.protocol.TCompactProtocol;
53  import org.apache.thrift.protocol.TProtocolFactory;
54  import org.apache.thrift.server.THsHaServer;
55  import org.apache.thrift.server.TNonblockingServer;
56  import org.apache.thrift.server.TServer;
57  import org.apache.thrift.server.TThreadPoolServer;
58  import org.apache.thrift.transport.TFramedTransport;
59  import org.apache.thrift.transport.TNonblockingServerSocket;
60  import org.apache.thrift.transport.TNonblockingServerTransport;
61  import org.apache.thrift.transport.TServerSocket;
62  import org.apache.thrift.transport.TServerTransport;
63  import org.apache.thrift.transport.TTransportException;
64  import org.apache.thrift.transport.TTransportFactory;
65  
66  import com.google.common.util.concurrent.ThreadFactoryBuilder;
67  
68  /**
69   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
70   * HbaseClient.thrift IDL file.
71   */
72  @SuppressWarnings({ "rawtypes", "unchecked" })
73  public class ThriftServer {
74    private static final Log log = LogFactory.getLog(ThriftServer.class);
75  
76    public static final String DEFAULT_LISTEN_PORT = "9090";
77    //The max length of a message frame in MB
78    static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
79  
80    public ThriftServer() {
81    }
82  
83    private static void printUsage() {
84      HelpFormatter formatter = new HelpFormatter();
85      formatter.printHelp("Thrift", null, getOptions(),
86          "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
87              "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
88              " send a kill signal to the thrift server pid",
89          true);
90    }
91  
92    private static Options getOptions() {
93      Options options = new Options();
94      options.addOption("b", "bind", true,
95          "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]");
96      options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
97      options.addOption("f", "framed", false, "Use framed transport");
98      options.addOption("c", "compact", false, "Use the compact protocol");
99      options.addOption("h", "help", false, "Print help information");
100     options.addOption(null, "infoport", true, "Port for web UI");
101 
102     OptionGroup servers = new OptionGroup();
103     servers.addOption(
104         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
105     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
106     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
107     options.addOptionGroup(servers);
108     return options;
109   }
110 
111   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
112       throws ParseException, IOException {
113     GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
114     String[] remainingArgs = genParser.getRemainingArgs();
115     CommandLineParser parser = new PosixParser();
116     return parser.parse(options, remainingArgs);
117   }
118 
119   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
120     if (isCompact) {
121       log.debug("Using compact protocol");
122       return new TCompactProtocol.Factory();
123     } else {
124       log.debug("Using binary protocol");
125       return new TBinaryProtocol.Factory();
126     }
127   }
128 
129   private static TTransportFactory getTTransportFactory(boolean framed, int maxFrameSize) {
130     if (framed) {
131       log.debug("Using framed transport");
132       return new TFramedTransport.Factory(maxFrameSize);
133     } else {
134       return new TTransportFactory();
135     }
136   }
137 
138   /*
139    * If bindValue is null, we don't bind. 
140    */
141   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
142       throws UnknownHostException {
143     try {
144       if (bindValue == null) {
145         return new InetSocketAddress(listenPort);
146       } else {
147         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
148       }
149     } catch (UnknownHostException e) {
150       throw new RuntimeException("Could not bind to provided ip address", e);
151     }
152   }
153 
154   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
155       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
156     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
157     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
158     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
159     serverArgs.processor(processor);
160     serverArgs.transportFactory(transportFactory);
161     serverArgs.protocolFactory(protocolFactory);
162     return new TNonblockingServer(serverArgs);
163   }
164 
165   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
166       THBaseService.Processor processor, TTransportFactory transportFactory,
167       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
168       throws TTransportException {
169     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
170     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
171     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
172     ExecutorService executorService = createExecutor(
173         serverArgs.getWorkerThreads(), metrics);
174     serverArgs.executorService(executorService);
175     serverArgs.processor(processor);
176     serverArgs.transportFactory(transportFactory);
177     serverArgs.protocolFactory(protocolFactory);
178     return new THsHaServer(serverArgs);
179   }
180 
181   private static ExecutorService createExecutor(
182       int workerThreads, ThriftMetrics metrics) {
183     CallQueue callQueue = new CallQueue(
184         new LinkedBlockingQueue<Call>(), metrics);
185     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
186     tfb.setDaemon(true);
187     tfb.setNameFormat("thrift2-worker-%d");
188     return new ThreadPoolExecutor(workerThreads, workerThreads,
189             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
190   }
191 
192   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
193       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
194     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
195     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
196     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
197     serverArgs.processor(processor);
198     serverArgs.transportFactory(transportFactory);
199     serverArgs.protocolFactory(protocolFactory);
200     return new TThreadPoolServer(serverArgs);
201   }
202 
203   /**
204    * Adds the option to pre-load filters at startup.
205    *
206    * @param conf  The current configuration instance.
207    */
208   protected static void registerFilters(Configuration conf) {
209     String[] filters = conf.getStrings("hbase.thrift.filters");
210     if(filters != null) {
211       for(String filterClass: filters) {
212         String[] filterPart = filterClass.split(":");
213         if(filterPart.length != 2) {
214           log.warn("Invalid filter specification " + filterClass + " - skipping");
215         } else {
216           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
217         }
218       }
219     }
220   }
221 
222   /**
223    * Start up the Thrift2 server.
224    * 
225    * @param args
226    */
227   public static void main(String[] args) throws Exception {
228     TServer server = null;
229     Options options = getOptions();
230     try {
231       Configuration conf = HBaseConfiguration.create();
232       CommandLine cmd = parseArguments(conf, options, args);
233 
234       /**
235        * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
236        * should print the help if no argument is provided
237        */
238       List<?> argList = cmd.getArgList();
239       if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
240         printUsage();
241         System.exit(1);
242       }
243 
244       // Get port to bind to
245       int listenPort = 0;
246       try {
247         listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
248       } catch (NumberFormatException e) {
249         throw new RuntimeException("Could not parse the value provided for the port option", e);
250       }
251 
252       boolean nonblocking = cmd.hasOption("nonblocking");
253       boolean hsha = cmd.hasOption("hsha");
254 
255       ThriftMetrics metrics = new ThriftMetrics(
256           listenPort, conf, THBaseService.Iface.class);
257 
258       String implType = "threadpool";
259       if (nonblocking) {
260         implType = "nonblocking";
261       } else if (hsha) {
262         implType = "hsha";
263       }
264 
265       conf.set("hbase.regionserver.thrift.server.type", implType);
266       conf.setInt("hbase.regionserver.thrift.port", listenPort);
267       registerFilters(conf);
268 
269       // Construct correct ProtocolFactory
270       boolean compact = cmd.hasOption("compact") ||
271         conf.getBoolean("hbase.regionserver.thrift.compact", false);
272       TProtocolFactory protocolFactory = getTProtocolFactory(compact);
273       THBaseService.Iface handler =
274           ThriftHBaseServiceHandler.newInstance(conf, metrics);
275       THBaseService.Processor processor = new THBaseService.Processor(handler);
276       conf.setBoolean("hbase.regionserver.thrift.compact", compact);
277 
278       boolean framed = cmd.hasOption("framed") ||
279         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
280       TTransportFactory transportFactory = getTTransportFactory(framed,
281         conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
282       conf.setBoolean("hbase.regionserver.thrift.framed", framed);
283 
284       // TODO: Remove once HBASE-2155 is resolved
285       if (cmd.hasOption("bind") && (nonblocking || hsha)) {
286         log.error("The Nonblocking and HsHaServer servers don't support IP address binding at the moment." +
287             " See https://issues.apache.org/jira/browse/HBASE-2155 for details.");
288         printUsage();
289         System.exit(1);
290       }
291 
292       // check for user-defined info server port setting, if so override the conf
293       try {
294         if (cmd.hasOption("infoport")) {
295           String val = cmd.getOptionValue("infoport");
296           conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
297           log.debug("Web UI port set to " + val);
298         }
299       } catch (NumberFormatException e) {
300         log.error("Could not parse the value provided for the infoport option", e);
301         printUsage();
302         System.exit(1);
303       }
304 
305       // Put up info server.
306       int port = conf.getInt("hbase.thrift.info.port", 9095);
307       if (port >= 0) {
308         conf.setLong("startcode", System.currentTimeMillis());
309         String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
310         InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
311         infoServer.setAttribute("hbase.conf", conf);
312         infoServer.start();
313       }
314 
315       InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
316 
317       if (nonblocking) {
318         server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
319       } else if (hsha) {
320         server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
321       } else {
322         server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
323       }
324     } catch (Exception e) {
325       log.error(e.getMessage(), e);
326       printUsage();
327       System.exit(1);
328     }
329     server.serve();
330   }
331 }