View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.util.List;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.cli.CommandLine;
32  import org.apache.commons.cli.CommandLineParser;
33  import org.apache.commons.cli.HelpFormatter;
34  import org.apache.commons.cli.Option;
35  import org.apache.commons.cli.OptionGroup;
36  import org.apache.commons.cli.Options;
37  import org.apache.commons.cli.ParseException;
38  import org.apache.commons.cli.PosixParser;
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.classification.InterfaceAudience;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.filter.ParseFilter;
45  import org.apache.hadoop.hbase.thrift.CallQueue;
46  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
47  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
48  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
49  import org.apache.hadoop.hbase.util.InfoServer;
50  import org.apache.hadoop.util.GenericOptionsParser;
51  import org.apache.thrift.protocol.TBinaryProtocol;
52  import org.apache.thrift.protocol.TCompactProtocol;
53  import org.apache.thrift.protocol.TProtocolFactory;
54  import org.apache.thrift.server.THsHaServer;
55  import org.apache.thrift.server.TNonblockingServer;
56  import org.apache.thrift.server.TServer;
57  import org.apache.thrift.server.TThreadPoolServer;
58  import org.apache.thrift.transport.TFramedTransport;
59  import org.apache.thrift.transport.TNonblockingServerSocket;
60  import org.apache.thrift.transport.TNonblockingServerTransport;
61  import org.apache.thrift.transport.TServerSocket;
62  import org.apache.thrift.transport.TServerTransport;
63  import org.apache.thrift.transport.TTransportException;
64  import org.apache.thrift.transport.TTransportFactory;
65  
66  import com.google.common.util.concurrent.ThreadFactoryBuilder;
67  
68  /**
69   * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
70   * HbaseClient.thrift IDL file.
71   */
72  @InterfaceAudience.Private
73  @SuppressWarnings({ "rawtypes", "unchecked" })
74  public class ThriftServer {
75    private static final Log log = LogFactory.getLog(ThriftServer.class);
76  
77    public static final String DEFAULT_LISTEN_PORT = "9090";
78  
79    public ThriftServer() {
80    }
81  
82    private static void printUsage() {
83      HelpFormatter formatter = new HelpFormatter();
84      formatter.printHelp("Thrift", null, getOptions(),
85          "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
86              "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
87              " send a kill signal to the thrift server pid",
88          true);
89    }
90  
91    private static Options getOptions() {
92      Options options = new Options();
93      options.addOption("b", "bind", true,
94          "Address to bind the Thrift server to. [default: 0.0.0.0]");
95      options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
96      options.addOption("f", "framed", false, "Use framed transport");
97      options.addOption("c", "compact", false, "Use the compact protocol");
98      options.addOption("h", "help", false, "Print help information");
99      options.addOption(null, "infoport", true, "Port for web UI");
100 
101     OptionGroup servers = new OptionGroup();
102     servers.addOption(
103         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
104     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
105     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
106     options.addOptionGroup(servers);
107     return options;
108   }
109 
110   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
111       throws ParseException, IOException {
112     GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
113     String[] remainingArgs = genParser.getRemainingArgs();
114     CommandLineParser parser = new PosixParser();
115     return parser.parse(options, remainingArgs);
116   }
117 
118   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
119     if (isCompact) {
120       log.debug("Using compact protocol");
121       return new TCompactProtocol.Factory();
122     } else {
123       log.debug("Using binary protocol");
124       return new TBinaryProtocol.Factory();
125     }
126   }
127 
128   private static TTransportFactory getTTransportFactory(boolean framed) {
129     if (framed) {
130       log.debug("Using framed transport");
131       return new TFramedTransport.Factory();
132     } else {
133       return new TTransportFactory();
134     }
135   }
136 
137   /*
138    * If bindValue is null, we don't bind.
139    */
140   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
141       throws UnknownHostException {
142     try {
143       if (bindValue == null) {
144         return new InetSocketAddress(listenPort);
145       } else {
146         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
147       }
148     } catch (UnknownHostException e) {
149       throw new RuntimeException("Could not bind to provided ip address", e);
150     }
151   }
152 
153   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
154       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
155     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
156     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
157     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
158     serverArgs.processor(processor);
159     serverArgs.transportFactory(transportFactory);
160     serverArgs.protocolFactory(protocolFactory);
161     return new TNonblockingServer(serverArgs);
162   }
163 
164   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
165       THBaseService.Processor processor, TTransportFactory transportFactory,
166       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
167       throws TTransportException {
168     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
169     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
170     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
171     ExecutorService executorService = createExecutor(
172         serverArgs.getWorkerThreads(), metrics);
173     serverArgs.executorService(executorService);
174     serverArgs.processor(processor);
175     serverArgs.transportFactory(transportFactory);
176     serverArgs.protocolFactory(protocolFactory);
177     return new THsHaServer(serverArgs);
178   }
179 
180   private static ExecutorService createExecutor(
181       int workerThreads, ThriftMetrics metrics) {
182     CallQueue callQueue = new CallQueue(
183         new LinkedBlockingQueue<Call>(), metrics);
184     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
185     tfb.setDaemon(true);
186     tfb.setNameFormat("thrift2-worker-%d");
187     return new ThreadPoolExecutor(workerThreads, workerThreads,
188             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
189   }
190 
191   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
192       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
193     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
194     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
195     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
196     serverArgs.processor(processor);
197     serverArgs.transportFactory(transportFactory);
198     serverArgs.protocolFactory(protocolFactory);
199     return new TThreadPoolServer(serverArgs);
200   }
201 
202   /**
203    * Adds the option to pre-load filters at startup.
204    *
205    * @param conf  The current configuration instance.
206    */
207   protected static void registerFilters(Configuration conf) {
208     String[] filters = conf.getStrings("hbase.thrift.filters");
209     if(filters != null) {
210       for(String filterClass: filters) {
211         String[] filterPart = filterClass.split(":");
212         if(filterPart.length != 2) {
213           log.warn("Invalid filter specification " + filterClass + " - skipping");
214         } else {
215           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
216         }
217       }
218     }
219   }
220 
221   /**
222    * Start up the Thrift2 server.
223    *
224    * @param args
225    */
226   public static void main(String[] args) throws Exception {
227     TServer server = null;
228     Options options = getOptions();
229     Configuration conf = HBaseConfiguration.create();
230     CommandLine cmd = parseArguments(conf, options, args);
231 
232     /**
233      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
234      * should print the help if no argument is provided
235      */
236     List<?> argList = cmd.getArgList();
237     if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
238       printUsage();
239       System.exit(1);
240     }
241 
242     // Get port to bind to
243     int listenPort = 0;
244     try {
245       listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
246     } catch (NumberFormatException e) {
247       throw new RuntimeException("Could not parse the value provided for the port option", e);
248     }
249 
250     boolean nonblocking = cmd.hasOption("nonblocking");
251     boolean hsha = cmd.hasOption("hsha");
252 
253     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
254 
255     String implType = "threadpool";
256     if (nonblocking) {
257       implType = "nonblocking";
258     } else if (hsha) {
259       implType = "hsha";
260     }
261 
262     conf.set("hbase.regionserver.thrift.server.type", implType);
263     conf.setInt("hbase.regionserver.thrift.port", listenPort);
264     registerFilters(conf);
265 
266     // Construct correct ProtocolFactory
267     boolean compact = cmd.hasOption("compact") ||
268         conf.getBoolean("hbase.regionserver.thrift.compact", false);
269     TProtocolFactory protocolFactory = getTProtocolFactory(compact);
270     THBaseService.Iface handler =
271         ThriftHBaseServiceHandler.newInstance(conf, metrics);
272     THBaseService.Processor processor = new THBaseService.Processor(handler);
273     conf.setBoolean("hbase.regionserver.thrift.compact", compact);
274 
275     boolean framed = cmd.hasOption("framed") ||
276         conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
277     TTransportFactory transportFactory = getTTransportFactory(framed);
278     InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
279     conf.setBoolean("hbase.regionserver.thrift.framed", framed);
280 
281     // check for user-defined info server port setting, if so override the conf
282     try {
283       if (cmd.hasOption("infoport")) {
284         String val = cmd.getOptionValue("infoport");
285         conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
286         log.debug("Web UI port set to " + val);
287       }
288     } catch (NumberFormatException e) {
289       log.error("Could not parse the value provided for the infoport option", e);
290       printUsage();
291       System.exit(1);
292     }
293 
294     // Put up info server.
295     int port = conf.getInt("hbase.thrift.info.port", 9095);
296     if (port >= 0) {
297       conf.setLong("startcode", System.currentTimeMillis());
298       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
299       InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
300       infoServer.setAttribute("hbase.conf", conf);
301       infoServer.start();
302     }
303 
304     if (nonblocking) {
305       server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
306     } else if (hsha) {
307       server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
308     } else {
309       server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
310     }
311     server.serve();
312   }
313 }