001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.HBaseConfiguration; 023import org.apache.hadoop.hbase.HBaseInterfaceAudience; 024import org.apache.hadoop.hbase.http.InfoServer; 025import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType; 026import org.apache.hadoop.hbase.util.VersionInfo; 027import org.apache.hadoop.util.Shell.ExitCodeException; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 032import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 033import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 034import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 035import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 036 037/** 038 * ThriftServer- this class starts up a Thrift server which implements the 039 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an 040 * independent process. 041 */ 042@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 043public class ThriftServer { 044 045 private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class); 046 047 private static final String MIN_WORKERS_OPTION = "minWorkers"; 048 private static final String MAX_WORKERS_OPTION = "workers"; 049 private static final String MAX_QUEUE_SIZE_OPTION = "queue"; 050 private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec"; 051 static final String BIND_OPTION = "bind"; 052 static final String COMPACT_OPTION = "compact"; 053 static final String FRAMED_OPTION = "framed"; 054 static final String PORT_OPTION = "port"; 055 056 private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; 057 private static final int DEFAULT_LISTEN_PORT = 9090; 058 059 private Configuration conf; 060 ThriftServerRunner serverRunner; 061 062 private InfoServer infoServer; 063 064 private static final String READ_TIMEOUT_OPTION = "readTimeout"; 065 066 // 067 // Main program and support routines 068 // 069 070 public ThriftServer(Configuration conf) { 071 this.conf = HBaseConfiguration.create(conf); 072 } 073 074 private static void printUsageAndExit(Options options, int exitCode) 075 throws ExitCodeException { 076 HelpFormatter formatter = new HelpFormatter(); 077 formatter.printHelp("Thrift", null, options, 078 "To start the Thrift server run 'hbase-daemon.sh start thrift' or " + 079 "'hbase thrift'\n" + 080 "To shutdown the thrift server run 'hbase-daemon.sh stop " + 081 "thrift' or send a kill signal to the thrift server pid", 082 true); 083 throw new ExitCodeException(exitCode, ""); 084 } 085 086 /** 087 * Start up or shuts down the Thrift server, depending on the arguments. 088 * @param args 089 */ 090 void doMain(final String[] args) throws Exception { 091 processOptions(args); 092 093 serverRunner = new ThriftServerRunner(conf); 094 095 // Put up info server. 096 int port = conf.getInt("hbase.thrift.info.port", 9095); 097 if (port >= 0) { 098 conf.setLong("startcode", System.currentTimeMillis()); 099 String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0"); 100 infoServer = new InfoServer("thrift", a, port, false, conf); 101 infoServer.setAttribute("hbase.conf", conf); 102 infoServer.start(); 103 } 104 serverRunner.run(); 105 } 106 107 /** 108 * Parse the command line options to set parameters the conf. 109 */ 110 private void processOptions(final String[] args) throws Exception { 111 Options options = new Options(); 112 options.addOption("b", BIND_OPTION, true, "Address to bind " + 113 "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]"); 114 options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " + 115 DEFAULT_LISTEN_PORT + "]"); 116 options.addOption("f", FRAMED_OPTION, false, "Use framed transport"); 117 options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol"); 118 options.addOption("h", "help", false, "Print help information"); 119 options.addOption(null, "infoport", true, "Port for web UI"); 120 121 options.addOption("m", MIN_WORKERS_OPTION, true, 122 "The minimum number of worker threads for " + 123 ImplType.THREAD_POOL.simpleClassName()); 124 125 options.addOption("w", MAX_WORKERS_OPTION, true, 126 "The maximum number of worker threads for " + 127 ImplType.THREAD_POOL.simpleClassName()); 128 129 options.addOption("q", MAX_QUEUE_SIZE_OPTION, true, 130 "The maximum number of queued requests in " + 131 ImplType.THREAD_POOL.simpleClassName()); 132 133 options.addOption("k", KEEP_ALIVE_SEC_OPTION, true, 134 "The amount of time in secods to keep a thread alive when idle in " + 135 ImplType.THREAD_POOL.simpleClassName()); 136 137 options.addOption("t", READ_TIMEOUT_OPTION, true, 138 "Amount of time in milliseconds before a server thread will timeout " + 139 "waiting for client to send data on a connected socket. Currently, " + 140 "only applies to TBoundedThreadPoolServer"); 141 142 options.addOptionGroup(ImplType.createOptionGroup()); 143 144 CommandLineParser parser = new DefaultParser(); 145 CommandLine cmd = parser.parse(options, args); 146 147 if (cmd.hasOption("help")) { 148 printUsageAndExit(options, 1); 149 } 150 151 // Get port to bind to 152 try { 153 if (cmd.hasOption(PORT_OPTION)) { 154 int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION)); 155 conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort); 156 } 157 } catch (NumberFormatException e) { 158 LOG.error("Could not parse the value provided for the port option", e); 159 printUsageAndExit(options, -1); 160 } 161 162 // check for user-defined info server port setting, if so override the conf 163 try { 164 if (cmd.hasOption("infoport")) { 165 String val = cmd.getOptionValue("infoport"); 166 conf.setInt("hbase.thrift.info.port", Integer.parseInt(val)); 167 LOG.debug("Web UI port set to " + val); 168 } 169 } catch (NumberFormatException e) { 170 LOG.error("Could not parse the value provided for the infoport option", e); 171 printUsageAndExit(options, -1); 172 } 173 174 // Make optional changes to the configuration based on command-line options 175 optionToConf(cmd, MIN_WORKERS_OPTION, 176 conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY); 177 optionToConf(cmd, MAX_WORKERS_OPTION, 178 conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY); 179 optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, 180 conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY); 181 optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, 182 conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); 183 optionToConf(cmd, READ_TIMEOUT_OPTION, conf, 184 ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY); 185 186 // Set general thrift server options 187 boolean compact = cmd.hasOption(COMPACT_OPTION) || 188 conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false); 189 conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact); 190 boolean framed = cmd.hasOption(FRAMED_OPTION) || 191 conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false); 192 conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed); 193 if (cmd.hasOption(BIND_OPTION)) { 194 conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION)); 195 } 196 197 ImplType.setServerImpl(cmd, conf); 198 } 199 200 public void stop() { 201 if (this.infoServer != null) { 202 LOG.info("Stopping infoServer"); 203 try { 204 this.infoServer.stop(); 205 } catch (Exception ex) { 206 ex.printStackTrace(); 207 } 208 } 209 serverRunner.shutdown(); 210 } 211 212 private static void optionToConf(CommandLine cmd, String option, 213 Configuration conf, String destConfKey) { 214 if (cmd.hasOption(option)) { 215 String value = cmd.getOptionValue(option); 216 LOG.info("Set configuration key:" + destConfKey + " value:" + value); 217 conf.set(destConfKey, value); 218 } 219 } 220 221 /** 222 * @param args 223 * @throws Exception 224 */ 225 public static void main(String [] args) throws Exception { 226 LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****"); 227 VersionInfo.logVersion(); 228 int exitCode = 0; 229 try { 230 new ThriftServer(HBaseConfiguration.create()).doMain(args); 231 } catch (ExitCodeException ex) { 232 exitCode = ex.getExitCode(); 233 } 234 LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****"); 235 System.exit(exitCode); 236 } 237}