001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.HBaseConfiguration;
023import org.apache.hadoop.hbase.HBaseInterfaceAudience;
024import org.apache.hadoop.hbase.http.InfoServer;
025import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
026import org.apache.hadoop.hbase.util.VersionInfo;
027import org.apache.hadoop.util.Shell.ExitCodeException;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
032import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
033import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
034import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
035import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
036
037/**
038 * ThriftServer- this class starts up a Thrift server which implements the
039 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
040 * independent process.
041 */
042@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
043public class ThriftServer {
044
045  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
046
047  private static final String MIN_WORKERS_OPTION = "minWorkers";
048  private static final String MAX_WORKERS_OPTION = "workers";
049  private static final String MAX_QUEUE_SIZE_OPTION = "queue";
050  private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
051  static final String BIND_OPTION = "bind";
052  static final String COMPACT_OPTION = "compact";
053  static final String FRAMED_OPTION = "framed";
054  static final String PORT_OPTION = "port";
055  static final String INFOPORT_OPTION = "infoport";
056
057  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
058  private static final int DEFAULT_LISTEN_PORT = 9090;
059
060  private Configuration conf;
061  ThriftServerRunner serverRunner;
062
063  private InfoServer infoServer;
064
065  private static final String READ_TIMEOUT_OPTION = "readTimeout";
066
067  //
068  // Main program and support routines
069  //
070
071  public ThriftServer(Configuration conf) {
072    this.conf = HBaseConfiguration.create(conf);
073  }
074
075  private static void printUsageAndExit(Options options, int exitCode)
076      throws ExitCodeException {
077    HelpFormatter formatter = new HelpFormatter();
078    formatter.printHelp("Thrift", null, options,
079        "To start the Thrift server run 'hbase-daemon.sh start thrift' or " +
080        "'hbase thrift'\n" +
081        "To shutdown the thrift server run 'hbase-daemon.sh stop " +
082        "thrift' or send a kill signal to the thrift server pid",
083        true);
084    throw new ExitCodeException(exitCode, "");
085  }
086
087  /**
088   * Start up or shuts down the Thrift server, depending on the arguments.
089   * @param args
090   */
091   void doMain(final String[] args) throws Exception {
092     processOptions(args);
093
094     serverRunner = new ThriftServerRunner(conf);
095
096     // Put up info server.
097     int port = conf.getInt("hbase.thrift.info.port", 9095);
098     if (port >= 0) {
099       conf.setLong("startcode", System.currentTimeMillis());
100       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
101       infoServer = new InfoServer("thrift", a, port, false, conf);
102       infoServer.setAttribute("hbase.conf", conf);
103       infoServer.start();
104     }
105     serverRunner.run();
106  }
107
108  /**
109   * Parse the command line options to set parameters the conf.
110   */
111  private void processOptions(final String[] args) throws Exception {
112    Options options = new Options();
113    options.addOption("b", BIND_OPTION, true, "Address to bind " +
114        "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
115    options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
116        DEFAULT_LISTEN_PORT + "]");
117    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
118    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
119    options.addOption("h", "help", false, "Print help information");
120    options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
121
122    options.addOption("m", MIN_WORKERS_OPTION, true,
123        "The minimum number of worker threads for " +
124        ImplType.THREAD_POOL.simpleClassName());
125
126    options.addOption("w", MAX_WORKERS_OPTION, true,
127        "The maximum number of worker threads for " +
128        ImplType.THREAD_POOL.simpleClassName());
129
130    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
131        "The maximum number of queued requests in " +
132        ImplType.THREAD_POOL.simpleClassName());
133
134    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
135        "The amount of time in secods to keep a thread alive when idle in " +
136        ImplType.THREAD_POOL.simpleClassName());
137
138    options.addOption("t", READ_TIMEOUT_OPTION, true,
139        "Amount of time in milliseconds before a server thread will timeout " +
140        "waiting for client to send data on a connected socket. Currently, " +
141        "only applies to TBoundedThreadPoolServer");
142
143    options.addOptionGroup(ImplType.createOptionGroup());
144
145    CommandLineParser parser = new DefaultParser();
146    CommandLine cmd = parser.parse(options, args);
147
148    if (cmd.hasOption("help")) {
149      printUsageAndExit(options, 1);
150    }
151
152    // Get port to bind to
153    try {
154      if (cmd.hasOption(PORT_OPTION)) {
155        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
156        conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
157      }
158    } catch (NumberFormatException e) {
159      LOG.error("Could not parse the value provided for the port option", e);
160      printUsageAndExit(options, -1);
161    }
162
163    // check for user-defined info server port setting, if so override the conf
164    try {
165      if (cmd.hasOption(INFOPORT_OPTION)) {
166        String val = cmd.getOptionValue(INFOPORT_OPTION);
167        conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
168        LOG.debug("Web UI port set to " + val);
169      }
170    } catch (NumberFormatException e) {
171      LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
172        " option", e);
173      printUsageAndExit(options, -1);
174    }
175
176    // Make optional changes to the configuration based on command-line options
177    optionToConf(cmd, MIN_WORKERS_OPTION,
178        conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
179    optionToConf(cmd, MAX_WORKERS_OPTION,
180        conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
181    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
182        conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
183    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
184        conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
185    optionToConf(cmd, READ_TIMEOUT_OPTION, conf,
186        ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
187
188    // Set general thrift server options
189    boolean compact = cmd.hasOption(COMPACT_OPTION) ||
190      conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false);
191    conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact);
192    boolean framed = cmd.hasOption(FRAMED_OPTION) ||
193      conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false);
194    conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed);
195    if (cmd.hasOption(BIND_OPTION)) {
196      conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
197    }
198
199    ImplType.setServerImpl(cmd, conf);
200  }
201
202  public void stop() {
203    if (this.infoServer != null) {
204      LOG.info("Stopping infoServer");
205      try {
206        this.infoServer.stop();
207      } catch (Exception ex) {
208        ex.printStackTrace();
209      }
210    }
211    serverRunner.shutdown();
212  }
213
214  private static void optionToConf(CommandLine cmd, String option,
215      Configuration conf, String destConfKey) {
216    if (cmd.hasOption(option)) {
217      String value = cmd.getOptionValue(option);
218      LOG.info("Set configuration key:" + destConfKey + " value:" + value);
219      conf.set(destConfKey, value);
220    }
221  }
222
223  /**
224   * @param args
225   * @throws Exception
226   */
227  public static void main(String [] args) throws Exception {
228    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
229    VersionInfo.logVersion();
230    int exitCode = 0;
231    try {
232      new ThriftServer(HBaseConfiguration.create()).doMain(args);
233    } catch (ExitCodeException ex) {
234      exitCode = ex.getExitCode();
235    }
236    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
237    System.exit(exitCode);
238  }
239}