001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.HBaseConfiguration; 023import org.apache.hadoop.hbase.HBaseInterfaceAudience; 024import org.apache.hadoop.hbase.http.InfoServer; 025import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType; 026import org.apache.hadoop.hbase.util.VersionInfo; 027import org.apache.hadoop.util.Shell.ExitCodeException; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 032import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 033import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 034import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 035import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 036 037/** 038 * ThriftServer- this class starts up a Thrift server which implements the 039 * Hbase API specified in the Hbase.thrift IDL file. The server runs in an 040 * independent process. 041 */ 042@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 043public class ThriftServer { 044 045 private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class); 046 047 private static final String MIN_WORKERS_OPTION = "minWorkers"; 048 private static final String MAX_WORKERS_OPTION = "workers"; 049 private static final String MAX_QUEUE_SIZE_OPTION = "queue"; 050 private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec"; 051 static final String BIND_OPTION = "bind"; 052 static final String COMPACT_OPTION = "compact"; 053 static final String FRAMED_OPTION = "framed"; 054 static final String PORT_OPTION = "port"; 055 static final String INFOPORT_OPTION = "infoport"; 056 057 private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; 058 private static final int DEFAULT_LISTEN_PORT = 9090; 059 060 private Configuration conf; 061 ThriftServerRunner serverRunner; 062 063 private InfoServer infoServer; 064 065 private static final String READ_TIMEOUT_OPTION = "readTimeout"; 066 067 // 068 // Main program and support routines 069 // 070 071 public ThriftServer(Configuration conf) { 072 this.conf = HBaseConfiguration.create(conf); 073 } 074 075 private static void printUsageAndExit(Options options, int exitCode) 076 throws ExitCodeException { 077 HelpFormatter formatter = new HelpFormatter(); 078 formatter.printHelp("Thrift", null, options, 079 "To start the Thrift server run 'hbase-daemon.sh start thrift' or " + 080 "'hbase thrift'\n" + 081 "To shutdown the thrift server run 'hbase-daemon.sh stop " + 082 "thrift' or send a kill signal to the thrift server pid", 083 true); 084 throw new ExitCodeException(exitCode, ""); 085 } 086 087 /** 088 * Start up or shuts down the Thrift server, depending on the arguments. 089 * @param args 090 */ 091 void doMain(final String[] args) throws Exception { 092 processOptions(args); 093 094 serverRunner = new ThriftServerRunner(conf); 095 096 // Put up info server. 097 int port = conf.getInt("hbase.thrift.info.port", 9095); 098 if (port >= 0) { 099 conf.setLong("startcode", System.currentTimeMillis()); 100 String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0"); 101 infoServer = new InfoServer("thrift", a, port, false, conf); 102 infoServer.setAttribute("hbase.conf", conf); 103 infoServer.start(); 104 } 105 serverRunner.run(); 106 } 107 108 /** 109 * Parse the command line options to set parameters the conf. 110 */ 111 private void processOptions(final String[] args) throws Exception { 112 Options options = new Options(); 113 options.addOption("b", BIND_OPTION, true, "Address to bind " + 114 "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]"); 115 options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " + 116 DEFAULT_LISTEN_PORT + "]"); 117 options.addOption("f", FRAMED_OPTION, false, "Use framed transport"); 118 options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol"); 119 options.addOption("h", "help", false, "Print help information"); 120 options.addOption(null, INFOPORT_OPTION, true, "Port for web UI"); 121 122 options.addOption("m", MIN_WORKERS_OPTION, true, 123 "The minimum number of worker threads for " + 124 ImplType.THREAD_POOL.simpleClassName()); 125 126 options.addOption("w", MAX_WORKERS_OPTION, true, 127 "The maximum number of worker threads for " + 128 ImplType.THREAD_POOL.simpleClassName()); 129 130 options.addOption("q", MAX_QUEUE_SIZE_OPTION, true, 131 "The maximum number of queued requests in " + 132 ImplType.THREAD_POOL.simpleClassName()); 133 134 options.addOption("k", KEEP_ALIVE_SEC_OPTION, true, 135 "The amount of time in secods to keep a thread alive when idle in " + 136 ImplType.THREAD_POOL.simpleClassName()); 137 138 options.addOption("t", READ_TIMEOUT_OPTION, true, 139 "Amount of time in milliseconds before a server thread will timeout " + 140 "waiting for client to send data on a connected socket. Currently, " + 141 "only applies to TBoundedThreadPoolServer"); 142 143 options.addOptionGroup(ImplType.createOptionGroup()); 144 145 CommandLineParser parser = new DefaultParser(); 146 CommandLine cmd = parser.parse(options, args); 147 148 if (cmd.hasOption("help")) { 149 printUsageAndExit(options, 1); 150 } 151 152 // Get port to bind to 153 try { 154 if (cmd.hasOption(PORT_OPTION)) { 155 int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION)); 156 conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort); 157 } 158 } catch (NumberFormatException e) { 159 LOG.error("Could not parse the value provided for the port option", e); 160 printUsageAndExit(options, -1); 161 } 162 163 // check for user-defined info server port setting, if so override the conf 164 try { 165 if (cmd.hasOption(INFOPORT_OPTION)) { 166 String val = cmd.getOptionValue(INFOPORT_OPTION); 167 conf.setInt("hbase.thrift.info.port", Integer.parseInt(val)); 168 LOG.debug("Web UI port set to " + val); 169 } 170 } catch (NumberFormatException e) { 171 LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION + 172 " option", e); 173 printUsageAndExit(options, -1); 174 } 175 176 // Make optional changes to the configuration based on command-line options 177 optionToConf(cmd, MIN_WORKERS_OPTION, 178 conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY); 179 optionToConf(cmd, MAX_WORKERS_OPTION, 180 conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY); 181 optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, 182 conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY); 183 optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, 184 conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); 185 optionToConf(cmd, READ_TIMEOUT_OPTION, conf, 186 ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY); 187 188 // Set general thrift server options 189 boolean compact = cmd.hasOption(COMPACT_OPTION) || 190 conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false); 191 conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact); 192 boolean framed = cmd.hasOption(FRAMED_OPTION) || 193 conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false); 194 conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed); 195 if (cmd.hasOption(BIND_OPTION)) { 196 conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION)); 197 } 198 199 ImplType.setServerImpl(cmd, conf); 200 } 201 202 public void stop() { 203 if (this.infoServer != null) { 204 LOG.info("Stopping infoServer"); 205 try { 206 this.infoServer.stop(); 207 } catch (Exception ex) { 208 ex.printStackTrace(); 209 } 210 } 211 serverRunner.shutdown(); 212 } 213 214 private static void optionToConf(CommandLine cmd, String option, 215 Configuration conf, String destConfKey) { 216 if (cmd.hasOption(option)) { 217 String value = cmd.getOptionValue(option); 218 LOG.info("Set configuration key:" + destConfKey + " value:" + value); 219 conf.set(destConfKey, value); 220 } 221 } 222 223 /** 224 * @param args 225 * @throws Exception 226 */ 227 public static void main(String [] args) throws Exception { 228 LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****"); 229 VersionInfo.logVersion(); 230 int exitCode = 0; 231 try { 232 new ThriftServer(HBaseConfiguration.create()).doMain(args); 233 } catch (ExitCodeException ex) { 234 exitCode = ex.getExitCode(); 235 } 236 LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****"); 237 System.exit(exitCode); 238 } 239}