001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.HBaseConfiguration;
023import org.apache.hadoop.hbase.HBaseInterfaceAudience;
024import org.apache.hadoop.hbase.http.InfoServer;
025import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
026import org.apache.hadoop.hbase.util.VersionInfo;
027import org.apache.hadoop.util.Shell.ExitCodeException;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
032import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
033import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
034import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
035import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
036
037/**
038 * ThriftServer- this class starts up a Thrift server which implements the
039 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
040 * independent process.
041 */
042@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
043public class ThriftServer {
044
045  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
046
047  private static final String MIN_WORKERS_OPTION = "minWorkers";
048  private static final String MAX_WORKERS_OPTION = "workers";
049  private static final String MAX_QUEUE_SIZE_OPTION = "queue";
050  private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
051  static final String BIND_OPTION = "bind";
052  static final String COMPACT_OPTION = "compact";
053  static final String FRAMED_OPTION = "framed";
054  static final String PORT_OPTION = "port";
055
056  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
057  private static final int DEFAULT_LISTEN_PORT = 9090;
058
059  private Configuration conf;
060  ThriftServerRunner serverRunner;
061
062  private InfoServer infoServer;
063
064  private static final String READ_TIMEOUT_OPTION = "readTimeout";
065
066  //
067  // Main program and support routines
068  //
069
070  public ThriftServer(Configuration conf) {
071    this.conf = HBaseConfiguration.create(conf);
072  }
073
074  private static void printUsageAndExit(Options options, int exitCode)
075      throws ExitCodeException {
076    HelpFormatter formatter = new HelpFormatter();
077    formatter.printHelp("Thrift", null, options,
078        "To start the Thrift server run 'hbase-daemon.sh start thrift' or " +
079        "'hbase thrift'\n" +
080        "To shutdown the thrift server run 'hbase-daemon.sh stop " +
081        "thrift' or send a kill signal to the thrift server pid",
082        true);
083    throw new ExitCodeException(exitCode, "");
084  }
085
086  /**
087   * Start up or shuts down the Thrift server, depending on the arguments.
088   * @param args
089   */
090   void doMain(final String[] args) throws Exception {
091     processOptions(args);
092
093     serverRunner = new ThriftServerRunner(conf);
094
095     // Put up info server.
096     int port = conf.getInt("hbase.thrift.info.port", 9095);
097     if (port >= 0) {
098       conf.setLong("startcode", System.currentTimeMillis());
099       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
100       infoServer = new InfoServer("thrift", a, port, false, conf);
101       infoServer.setAttribute("hbase.conf", conf);
102       infoServer.start();
103     }
104     serverRunner.run();
105  }
106
107  /**
108   * Parse the command line options to set parameters the conf.
109   */
110  private void processOptions(final String[] args) throws Exception {
111    Options options = new Options();
112    options.addOption("b", BIND_OPTION, true, "Address to bind " +
113        "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
114    options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
115        DEFAULT_LISTEN_PORT + "]");
116    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
117    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
118    options.addOption("h", "help", false, "Print help information");
119    options.addOption(null, "infoport", true, "Port for web UI");
120
121    options.addOption("m", MIN_WORKERS_OPTION, true,
122        "The minimum number of worker threads for " +
123        ImplType.THREAD_POOL.simpleClassName());
124
125    options.addOption("w", MAX_WORKERS_OPTION, true,
126        "The maximum number of worker threads for " +
127        ImplType.THREAD_POOL.simpleClassName());
128
129    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
130        "The maximum number of queued requests in " +
131        ImplType.THREAD_POOL.simpleClassName());
132
133    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
134        "The amount of time in secods to keep a thread alive when idle in " +
135        ImplType.THREAD_POOL.simpleClassName());
136
137    options.addOption("t", READ_TIMEOUT_OPTION, true,
138        "Amount of time in milliseconds before a server thread will timeout " +
139        "waiting for client to send data on a connected socket. Currently, " +
140        "only applies to TBoundedThreadPoolServer");
141
142    options.addOptionGroup(ImplType.createOptionGroup());
143
144    CommandLineParser parser = new DefaultParser();
145    CommandLine cmd = parser.parse(options, args);
146
147    if (cmd.hasOption("help")) {
148      printUsageAndExit(options, 1);
149    }
150
151    // Get port to bind to
152    try {
153      if (cmd.hasOption(PORT_OPTION)) {
154        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
155        conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
156      }
157    } catch (NumberFormatException e) {
158      LOG.error("Could not parse the value provided for the port option", e);
159      printUsageAndExit(options, -1);
160    }
161
162    // check for user-defined info server port setting, if so override the conf
163    try {
164      if (cmd.hasOption("infoport")) {
165        String val = cmd.getOptionValue("infoport");
166        conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
167        LOG.debug("Web UI port set to " + val);
168      }
169    } catch (NumberFormatException e) {
170      LOG.error("Could not parse the value provided for the infoport option", e);
171      printUsageAndExit(options, -1);
172    }
173
174    // Make optional changes to the configuration based on command-line options
175    optionToConf(cmd, MIN_WORKERS_OPTION,
176        conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
177    optionToConf(cmd, MAX_WORKERS_OPTION,
178        conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
179    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
180        conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
181    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
182        conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
183    optionToConf(cmd, READ_TIMEOUT_OPTION, conf,
184        ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
185
186    // Set general thrift server options
187    boolean compact = cmd.hasOption(COMPACT_OPTION) ||
188      conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false);
189    conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact);
190    boolean framed = cmd.hasOption(FRAMED_OPTION) ||
191      conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false);
192    conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed);
193    if (cmd.hasOption(BIND_OPTION)) {
194      conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
195    }
196
197    ImplType.setServerImpl(cmd, conf);
198  }
199
200  public void stop() {
201    if (this.infoServer != null) {
202      LOG.info("Stopping infoServer");
203      try {
204        this.infoServer.stop();
205      } catch (Exception ex) {
206        ex.printStackTrace();
207      }
208    }
209    serverRunner.shutdown();
210  }
211
212  private static void optionToConf(CommandLine cmd, String option,
213      Configuration conf, String destConfKey) {
214    if (cmd.hasOption(option)) {
215      String value = cmd.getOptionValue(option);
216      LOG.info("Set configuration key:" + destConfKey + " value:" + value);
217      conf.set(destConfKey, value);
218    }
219  }
220
221  /**
222   * @param args
223   * @throws Exception
224   */
225  public static void main(String [] args) throws Exception {
226    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
227    VersionInfo.logVersion();
228    int exitCode = 0;
229    try {
230      new ThriftServer(HBaseConfiguration.create()).doMain(args);
231    } catch (ExitCodeException ex) {
232      exitCode = ex.getExitCode();
233    }
234    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
235    System.exit(exitCode);
236  }
237}