1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift2;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.net.InetSocketAddress;
24 import java.net.UnknownHostException;
25 import java.security.PrivilegedAction;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.security.auth.callback.Callback;
35 import javax.security.auth.callback.UnsupportedCallbackException;
36 import javax.security.sasl.AuthorizeCallback;
37 import javax.security.sasl.Sasl;
38 import javax.security.sasl.SaslServer;
39
40 import org.apache.commons.cli.CommandLine;
41 import org.apache.commons.cli.CommandLineParser;
42 import org.apache.commons.cli.HelpFormatter;
43 import org.apache.commons.cli.Option;
44 import org.apache.commons.cli.OptionGroup;
45 import org.apache.commons.cli.Options;
46 import org.apache.commons.cli.ParseException;
47 import org.apache.commons.cli.PosixParser;
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54 import org.apache.hadoop.hbase.filter.ParseFilter;
55 import org.apache.hadoop.hbase.http.InfoServer;
56 import org.apache.hadoop.hbase.security.SaslUtil;
57 import org.apache.hadoop.hbase.security.SecurityUtil;
58 import org.apache.hadoop.hbase.security.UserProvider;
59 import org.apache.hadoop.hbase.thrift.CallQueue;
60 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
61 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
62 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
63 import org.apache.hadoop.hbase.util.DNS;
64 import org.apache.hadoop.hbase.util.Strings;
65 import org.apache.hadoop.security.UserGroupInformation;
66 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
67 import org.apache.hadoop.util.GenericOptionsParser;
68 import org.apache.thrift.TException;
69 import org.apache.thrift.TProcessor;
70 import org.apache.thrift.protocol.TBinaryProtocol;
71 import org.apache.thrift.protocol.TCompactProtocol;
72 import org.apache.thrift.protocol.TProtocol;
73 import org.apache.thrift.protocol.TProtocolFactory;
74 import org.apache.thrift.server.THsHaServer;
75 import org.apache.thrift.server.TNonblockingServer;
76 import org.apache.thrift.server.TServer;
77 import org.apache.thrift.server.TThreadPoolServer;
78 import org.apache.thrift.transport.TFramedTransport;
79 import org.apache.thrift.transport.TNonblockingServerSocket;
80 import org.apache.thrift.transport.TNonblockingServerTransport;
81 import org.apache.thrift.transport.TSaslServerTransport;
82 import org.apache.thrift.transport.TServerSocket;
83 import org.apache.thrift.transport.TServerTransport;
84 import org.apache.thrift.transport.TTransportException;
85 import org.apache.thrift.transport.TTransportFactory;
86
87 import com.google.common.util.concurrent.ThreadFactoryBuilder;
88
89
90
91
92
93 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
94 @SuppressWarnings({ "rawtypes", "unchecked" })
95 public class ThriftServer {
96 private static final Log log = LogFactory.getLog(ThriftServer.class);
97
98
99
100
101
102
103
104
105
106
107 static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
108
109 public static final int DEFAULT_LISTEN_PORT = 9090;
110
111
112 public ThriftServer() {
113 }
114
115 private static void printUsage() {
116 HelpFormatter formatter = new HelpFormatter();
117 formatter.printHelp("Thrift", null, getOptions(),
118 "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
119 "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
120 " send a kill signal to the thrift server pid",
121 true);
122 }
123
124 private static Options getOptions() {
125 Options options = new Options();
126 options.addOption("b", "bind", true,
127 "Address to bind the Thrift server to. [default: 0.0.0.0]");
128 options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
129 options.addOption("f", "framed", false, "Use framed transport");
130 options.addOption("c", "compact", false, "Use the compact protocol");
131 options.addOption("h", "help", false, "Print help information");
132 options.addOption(null, "infoport", true, "Port for web UI");
133
134 OptionGroup servers = new OptionGroup();
135 servers.addOption(
136 new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
137 servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
138 servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
139 options.addOptionGroup(servers);
140 return options;
141 }
142
143 private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
144 throws ParseException, IOException {
145 GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
146 String[] remainingArgs = genParser.getRemainingArgs();
147 CommandLineParser parser = new PosixParser();
148 return parser.parse(options, remainingArgs);
149 }
150
151 private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
152 if (isCompact) {
153 log.debug("Using compact protocol");
154 return new TCompactProtocol.Factory();
155 } else {
156 log.debug("Using binary protocol");
157 return new TBinaryProtocol.Factory();
158 }
159 }
160
161 private static TTransportFactory getTTransportFactory(
162 SaslUtil.QualityOfProtection qop, String name, String host,
163 boolean framed, int frameSize) {
164 if (framed) {
165 if (qop != null) {
166 throw new RuntimeException("Thrift server authentication"
167 + " doesn't work with framed transport yet");
168 }
169 log.debug("Using framed transport");
170 return new TFramedTransport.Factory(frameSize);
171 } else if (qop == null) {
172 return new TTransportFactory();
173 } else {
174 Map<String, String> saslProperties = new HashMap<String, String>();
175 saslProperties.put(Sasl.QOP, qop.getSaslQop());
176 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
177 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
178 new SaslGssCallbackHandler() {
179 @Override
180 public void handle(Callback[] callbacks)
181 throws UnsupportedCallbackException {
182 AuthorizeCallback ac = null;
183 for (Callback callback : callbacks) {
184 if (callback instanceof AuthorizeCallback) {
185 ac = (AuthorizeCallback) callback;
186 } else {
187 throw new UnsupportedCallbackException(callback,
188 "Unrecognized SASL GSSAPI Callback");
189 }
190 }
191 if (ac != null) {
192 String authid = ac.getAuthenticationID();
193 String authzid = ac.getAuthorizationID();
194 if (!authid.equals(authzid)) {
195 ac.setAuthorized(false);
196 } else {
197 ac.setAuthorized(true);
198 String userName = SecurityUtil.getUserFromPrincipal(authzid);
199 log.info("Effective user: " + userName);
200 ac.setAuthorizedID(userName);
201 }
202 }
203 }
204 });
205 return saslFactory;
206 }
207 }
208
209
210
211
212 private static InetSocketAddress bindToPort(String bindValue, int listenPort)
213 throws UnknownHostException {
214 try {
215 if (bindValue == null) {
216 return new InetSocketAddress(listenPort);
217 } else {
218 return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
219 }
220 } catch (UnknownHostException e) {
221 throw new RuntimeException("Could not bind to provided ip address", e);
222 }
223 }
224
225 private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
226 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
227 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
228 log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
229 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
230 serverArgs.processor(processor);
231 serverArgs.transportFactory(transportFactory);
232 serverArgs.protocolFactory(protocolFactory);
233 return new TNonblockingServer(serverArgs);
234 }
235
236 private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
237 TProcessor processor, TTransportFactory transportFactory,
238 InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
239 throws TTransportException {
240 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
241 log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
242 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
243 ExecutorService executorService = createExecutor(
244 serverArgs.getWorkerThreads(), metrics);
245 serverArgs.executorService(executorService);
246 serverArgs.processor(processor);
247 serverArgs.transportFactory(transportFactory);
248 serverArgs.protocolFactory(protocolFactory);
249 return new THsHaServer(serverArgs);
250 }
251
252 private static ExecutorService createExecutor(
253 int workerThreads, ThriftMetrics metrics) {
254 CallQueue callQueue = new CallQueue(
255 new LinkedBlockingQueue<Call>(), metrics);
256 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
257 tfb.setDaemon(true);
258 tfb.setNameFormat("thrift2-worker-%d");
259 return new ThreadPoolExecutor(workerThreads, workerThreads,
260 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
261 }
262
263 private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
264 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
265 TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
266 log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
267 TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
268 serverArgs.processor(processor);
269 serverArgs.transportFactory(transportFactory);
270 serverArgs.protocolFactory(protocolFactory);
271 return new TThreadPoolServer(serverArgs);
272 }
273
274
275
276
277
278
279 protected static void registerFilters(Configuration conf) {
280 String[] filters = conf.getStrings("hbase.thrift.filters");
281 if(filters != null) {
282 for(String filterClass: filters) {
283 String[] filterPart = filterClass.split(":");
284 if(filterPart.length != 2) {
285 log.warn("Invalid filter specification " + filterClass + " - skipping");
286 } else {
287 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
288 }
289 }
290 }
291 }
292
293
294
295
296
297
298 public static void main(String[] args) throws Exception {
299 TServer server = null;
300 Options options = getOptions();
301 Configuration conf = HBaseConfiguration.create();
302 CommandLine cmd = parseArguments(conf, options, args);
303
304
305
306
307
308 List<?> argList = cmd.getArgList();
309 if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
310 printUsage();
311 System.exit(1);
312 }
313
314
315 String bindAddress;
316 if (cmd.hasOption("bind")) {
317 bindAddress = cmd.getOptionValue("bind");
318 conf.set("hbase.thrift.info.bindAddress", bindAddress);
319 } else {
320 bindAddress = conf.get("hbase.thrift.info.bindAddress");
321 }
322
323
324 int listenPort = 0;
325 try {
326 if (cmd.hasOption("port")) {
327 listenPort = Integer.parseInt(cmd.getOptionValue("port"));
328 } else {
329 listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
330 }
331 } catch (NumberFormatException e) {
332 throw new RuntimeException("Could not parse the value provided for the port option", e);
333 }
334
335
336
337 String host = null;
338 String name = null;
339
340 UserProvider userProvider = UserProvider.instantiate(conf);
341
342 boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
343 && userProvider.isHBaseSecurityEnabled();
344 if (securityEnabled) {
345 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
346 conf.get("hbase.thrift.dns.interface", "default"),
347 conf.get("hbase.thrift.dns.nameserver", "default")));
348 userProvider.login("hbase.thrift.keytab.file",
349 "hbase.thrift.kerberos.principal", host);
350 }
351
352 UserGroupInformation realUser = userProvider.getCurrent().getUGI();
353 String stringQop = conf.get(THRIFT_QOP_KEY);
354 SaslUtil.QualityOfProtection qop = null;
355 if (stringQop != null) {
356 qop = SaslUtil.getQop(stringQop);
357 if (!securityEnabled) {
358 throw new IOException("Thrift server must"
359 + " run in secure mode to support authentication");
360 }
361
362 name = SecurityUtil.getUserFromPrincipal(
363 conf.get("hbase.thrift.kerberos.principal"));
364 }
365
366 boolean nonblocking = cmd.hasOption("nonblocking");
367 boolean hsha = cmd.hasOption("hsha");
368
369 ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
370
371 String implType = "threadpool";
372 if (nonblocking) {
373 implType = "nonblocking";
374 } else if (hsha) {
375 implType = "hsha";
376 }
377
378 conf.set("hbase.regionserver.thrift.server.type", implType);
379 conf.setInt("hbase.regionserver.thrift.port", listenPort);
380 registerFilters(conf);
381
382
383 boolean compact = cmd.hasOption("compact") ||
384 conf.getBoolean("hbase.regionserver.thrift.compact", false);
385 TProtocolFactory protocolFactory = getTProtocolFactory(compact);
386 final ThriftHBaseServiceHandler hbaseHandler =
387 new ThriftHBaseServiceHandler(conf, userProvider);
388 THBaseService.Iface handler =
389 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
390 final THBaseService.Processor p = new THBaseService.Processor(handler);
391 conf.setBoolean("hbase.regionserver.thrift.compact", compact);
392 TProcessor processor = p;
393
394 boolean framed = cmd.hasOption("framed") ||
395 conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
396 TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
397 conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
398 InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
399 conf.setBoolean("hbase.regionserver.thrift.framed", framed);
400 if (qop != null) {
401
402 processor = new TProcessor() {
403 @Override
404 public boolean process(TProtocol inProt,
405 TProtocol outProt) throws TException {
406 TSaslServerTransport saslServerTransport =
407 (TSaslServerTransport)inProt.getTransport();
408 SaslServer saslServer = saslServerTransport.getSaslServer();
409 String principal = saslServer.getAuthorizationID();
410 hbaseHandler.setEffectiveUser(principal);
411 return p.process(inProt, outProt);
412 }
413 };
414 }
415
416
417 try {
418 if (cmd.hasOption("infoport")) {
419 String val = cmd.getOptionValue("infoport");
420 conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
421 log.debug("Web UI port set to " + val);
422 }
423 } catch (NumberFormatException e) {
424 log.error("Could not parse the value provided for the infoport option", e);
425 printUsage();
426 System.exit(1);
427 }
428
429
430 int port = conf.getInt("hbase.thrift.info.port", 9095);
431 if (port >= 0) {
432 conf.setLong("startcode", System.currentTimeMillis());
433 String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
434 InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
435 infoServer.setAttribute("hbase.conf", conf);
436 infoServer.start();
437 }
438
439 if (nonblocking) {
440 server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
441 } else if (hsha) {
442 server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
443 } else {
444 server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
445 }
446
447 final TServer tserver = server;
448 realUser.doAs(
449 new PrivilegedAction<Object>() {
450 @Override
451 public Object run() {
452 tserver.serve();
453 return null;
454 }
455 });
456 }
457 }