1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift2;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.net.InetSocketAddress;
24 import java.net.UnknownHostException;
25 import java.security.PrivilegedAction;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.security.auth.callback.Callback;
35 import javax.security.auth.callback.UnsupportedCallbackException;
36 import javax.security.sasl.AuthorizeCallback;
37 import javax.security.sasl.Sasl;
38 import javax.security.sasl.SaslServer;
39
40 import org.apache.commons.cli.CommandLine;
41 import org.apache.commons.cli.CommandLineParser;
42 import org.apache.commons.cli.HelpFormatter;
43 import org.apache.commons.cli.Option;
44 import org.apache.commons.cli.OptionGroup;
45 import org.apache.commons.cli.Options;
46 import org.apache.commons.cli.ParseException;
47 import org.apache.commons.cli.PosixParser;
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54 import org.apache.hadoop.hbase.filter.ParseFilter;
55 import org.apache.hadoop.hbase.http.InfoServer;
56 import org.apache.hadoop.hbase.security.SaslUtil;
57 import org.apache.hadoop.hbase.security.SecurityUtil;
58 import org.apache.hadoop.hbase.security.UserProvider;
59 import org.apache.hadoop.hbase.thrift.CallQueue;
60 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
61 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
62 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
63 import org.apache.hadoop.hbase.util.DNS;
64 import org.apache.hadoop.hbase.util.Strings;
65 import org.apache.hadoop.security.UserGroupInformation;
66 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
67 import org.apache.hadoop.util.GenericOptionsParser;
68 import org.apache.thrift.TException;
69 import org.apache.thrift.TProcessor;
70 import org.apache.thrift.protocol.TBinaryProtocol;
71 import org.apache.thrift.protocol.TCompactProtocol;
72 import org.apache.thrift.protocol.TProtocol;
73 import org.apache.thrift.protocol.TProtocolFactory;
74 import org.apache.thrift.server.THsHaServer;
75 import org.apache.thrift.server.TNonblockingServer;
76 import org.apache.thrift.server.TServer;
77 import org.apache.thrift.server.TThreadPoolServer;
78 import org.apache.thrift.server.TThreadedSelectorServer;
79 import org.apache.thrift.transport.TFramedTransport;
80 import org.apache.thrift.transport.TNonblockingServerSocket;
81 import org.apache.thrift.transport.TNonblockingServerTransport;
82 import org.apache.thrift.transport.TSaslServerTransport;
83 import org.apache.thrift.transport.TServerSocket;
84 import org.apache.thrift.transport.TServerTransport;
85 import org.apache.thrift.transport.TTransportException;
86 import org.apache.thrift.transport.TTransportFactory;
87
88 import com.google.common.util.concurrent.ThreadFactoryBuilder;
89
90
91
92
93
94 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
95 @SuppressWarnings({ "rawtypes", "unchecked" })
96 public class ThriftServer {
97 private static final Log log = LogFactory.getLog(ThriftServer.class);
98
99
100
101
102
103
104
105
106
107
108 static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
109
110 public static final int DEFAULT_LISTEN_PORT = 9090;
111
112 private static final String READ_TIMEOUT_OPTION = "readTimeout";
113
114 static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
115
116
117
118
119
120
121 public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
122 "hbase.thrift.server.socket.read.timeout";
123 public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
124
125 public ThriftServer() {
126 }
127
128 private static void printUsage() {
129 HelpFormatter formatter = new HelpFormatter();
130 formatter.printHelp("Thrift", null, getOptions(),
131 "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
132 "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
133 " send a kill signal to the thrift server pid",
134 true);
135 }
136
137 private static Options getOptions() {
138 Options options = new Options();
139 options.addOption("b", "bind", true,
140 "Address to bind the Thrift server to. [default: 0.0.0.0]");
141 options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
142 options.addOption("f", "framed", false, "Use framed transport");
143 options.addOption("c", "compact", false, "Use the compact protocol");
144 options.addOption("w", "workers", true, "How many worker threads to use.");
145 options.addOption("s", "selectors", true, "How many selector threads to use.");
146 options.addOption("h", "help", false, "Print help information");
147 options.addOption(null, "infoport", true, "Port for web UI");
148 options.addOption("t", READ_TIMEOUT_OPTION, true,
149 "Amount of time in milliseconds before a server thread will timeout " +
150 "waiting for client to send data on a connected socket. Currently, " +
151 "only applies to TBoundedThreadPoolServer");
152 OptionGroup servers = new OptionGroup();
153 servers.addOption(
154 new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
155 servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
156 servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
157 servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
158 options.addOptionGroup(servers);
159 return options;
160 }
161
162 private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
163 throws ParseException, IOException {
164 GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
165 String[] remainingArgs = genParser.getRemainingArgs();
166 CommandLineParser parser = new PosixParser();
167 return parser.parse(options, remainingArgs);
168 }
169
170 private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
171 if (isCompact) {
172 log.debug("Using compact protocol");
173 return new TCompactProtocol.Factory();
174 } else {
175 log.debug("Using binary protocol");
176 return new TBinaryProtocol.Factory();
177 }
178 }
179
180 private static TTransportFactory getTTransportFactory(
181 SaslUtil.QualityOfProtection qop, String name, String host,
182 boolean framed, int frameSize) {
183 if (framed) {
184 if (qop != null) {
185 throw new RuntimeException("Thrift server authentication"
186 + " doesn't work with framed transport yet");
187 }
188 log.debug("Using framed transport");
189 return new TFramedTransport.Factory(frameSize);
190 } else if (qop == null) {
191 return new TTransportFactory();
192 } else {
193 Map<String, String> saslProperties = new HashMap<String, String>();
194 saslProperties.put(Sasl.QOP, qop.getSaslQop());
195 saslProperties.put(Sasl.SERVER_AUTH, "true");
196 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
197 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
198 new SaslGssCallbackHandler() {
199 @Override
200 public void handle(Callback[] callbacks)
201 throws UnsupportedCallbackException {
202 AuthorizeCallback ac = null;
203 for (Callback callback : callbacks) {
204 if (callback instanceof AuthorizeCallback) {
205 ac = (AuthorizeCallback) callback;
206 } else {
207 throw new UnsupportedCallbackException(callback,
208 "Unrecognized SASL GSSAPI Callback");
209 }
210 }
211 if (ac != null) {
212 String authid = ac.getAuthenticationID();
213 String authzid = ac.getAuthorizationID();
214 if (!authid.equals(authzid)) {
215 ac.setAuthorized(false);
216 } else {
217 ac.setAuthorized(true);
218 String userName = SecurityUtil.getUserFromPrincipal(authzid);
219 log.info("Effective user: " + userName);
220 ac.setAuthorizedID(userName);
221 }
222 }
223 }
224 });
225 return saslFactory;
226 }
227 }
228
229
230
231
232 private static InetSocketAddress bindToPort(String bindValue, int listenPort)
233 throws UnknownHostException {
234 try {
235 if (bindValue == null) {
236 return new InetSocketAddress(listenPort);
237 } else {
238 return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
239 }
240 } catch (UnknownHostException e) {
241 throw new RuntimeException("Could not bind to provided ip address", e);
242 }
243 }
244
245 private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
246 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
247 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
248 log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
249 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
250 serverArgs.processor(processor);
251 serverArgs.transportFactory(transportFactory);
252 serverArgs.protocolFactory(protocolFactory);
253 return new TNonblockingServer(serverArgs);
254 }
255
256 private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
257 TProcessor processor, TTransportFactory transportFactory,
258 int workerThreads,
259 InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
260 throws TTransportException {
261 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
262 log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
263 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
264 if (workerThreads > 0) {
265
266 serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
267 }
268 ExecutorService executorService = createExecutor(
269 workerThreads, metrics);
270 serverArgs.executorService(executorService);
271 serverArgs.processor(processor);
272 serverArgs.transportFactory(transportFactory);
273 serverArgs.protocolFactory(protocolFactory);
274 return new THsHaServer(serverArgs);
275 }
276
277 private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
278 TProcessor processor, TTransportFactory transportFactory,
279 int workerThreads, int selectorThreads,
280 InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
281 throws TTransportException {
282 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
283 log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
284 TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
285 if (workerThreads > 0) {
286 serverArgs.workerThreads(workerThreads);
287 }
288 if (selectorThreads > 0) {
289 serverArgs.selectorThreads(selectorThreads);
290 }
291
292 ExecutorService executorService = createExecutor(
293 workerThreads, metrics);
294 serverArgs.executorService(executorService);
295 serverArgs.processor(processor);
296 serverArgs.transportFactory(transportFactory);
297 serverArgs.protocolFactory(protocolFactory);
298 return new TThreadedSelectorServer(serverArgs);
299 }
300
301 private static ExecutorService createExecutor(
302 int workerThreads, ThriftMetrics metrics) {
303 CallQueue callQueue = new CallQueue(
304 new LinkedBlockingQueue<Call>(), metrics);
305 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
306 tfb.setDaemon(true);
307 tfb.setNameFormat("thrift2-worker-%d");
308 ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
309 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
310 pool.prestartAllCoreThreads();
311 return pool;
312 }
313
314 private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
315 TProcessor processor,
316 TTransportFactory transportFactory,
317 int workerThreads,
318 InetSocketAddress inetSocketAddress,
319 int backlog,
320 int clientTimeout)
321 throws TTransportException {
322 TServerTransport serverTransport = new TServerSocket(
323 new TServerSocket.ServerSocketTransportArgs().
324 bindAddr(inetSocketAddress).backlog(backlog).
325 clientTimeout(clientTimeout));
326 log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
327 TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
328 serverArgs.processor(processor);
329 serverArgs.transportFactory(transportFactory);
330 serverArgs.protocolFactory(protocolFactory);
331 if (workerThreads > 0) {
332 serverArgs.maxWorkerThreads(workerThreads);
333 }
334 return new TThreadPoolServer(serverArgs);
335 }
336
337
338
339
340
341
342 protected static void registerFilters(Configuration conf) {
343 String[] filters = conf.getStrings("hbase.thrift.filters");
344 if(filters != null) {
345 for(String filterClass: filters) {
346 String[] filterPart = filterClass.split(":");
347 if(filterPart.length != 2) {
348 log.warn("Invalid filter specification " + filterClass + " - skipping");
349 } else {
350 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
351 }
352 }
353 }
354 }
355
356
357
358
359
360
361 public static void main(String[] args) throws Exception {
362 TServer server = null;
363 Options options = getOptions();
364 Configuration conf = HBaseConfiguration.create();
365 CommandLine cmd = parseArguments(conf, options, args);
366 int workerThreads = 0;
367 int selectorThreads = 0;
368
369
370
371
372
373 List<?> argList = cmd.getArgList();
374 if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
375 printUsage();
376 System.exit(1);
377 }
378
379
380 String bindAddress;
381 if (cmd.hasOption("bind")) {
382 bindAddress = cmd.getOptionValue("bind");
383 conf.set("hbase.thrift.info.bindAddress", bindAddress);
384 } else {
385 bindAddress = conf.get("hbase.thrift.info.bindAddress");
386 }
387
388
389 int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
390 if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
391 try {
392 readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
393 } catch (NumberFormatException e) {
394 throw new RuntimeException("Could not parse the value provided for the timeout option", e);
395 }
396 } else {
397 readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
398 THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
399 }
400
401
402 int listenPort = 0;
403 try {
404 if (cmd.hasOption("port")) {
405 listenPort = Integer.parseInt(cmd.getOptionValue("port"));
406 } else {
407 listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
408 }
409 } catch (NumberFormatException e) {
410 throw new RuntimeException("Could not parse the value provided for the port option", e);
411 }
412
413
414 int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
415
416
417
418 String host = null;
419 String name = null;
420
421 UserProvider userProvider = UserProvider.instantiate(conf);
422
423 boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
424 && userProvider.isHBaseSecurityEnabled();
425 if (securityEnabled) {
426 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
427 conf.get("hbase.thrift.dns.interface", "default"),
428 conf.get("hbase.thrift.dns.nameserver", "default")));
429 userProvider.login("hbase.thrift.keytab.file",
430 "hbase.thrift.kerberos.principal", host);
431 }
432
433 UserGroupInformation realUser = userProvider.getCurrent().getUGI();
434 String stringQop = conf.get(THRIFT_QOP_KEY);
435 SaslUtil.QualityOfProtection qop = null;
436 if (stringQop != null) {
437 qop = SaslUtil.getQop(stringQop);
438 if (!securityEnabled) {
439 throw new IOException("Thrift server must"
440 + " run in secure mode to support authentication");
441 }
442
443 name = SecurityUtil.getUserFromPrincipal(
444 conf.get("hbase.thrift.kerberos.principal"));
445 }
446
447 boolean nonblocking = cmd.hasOption("nonblocking");
448 boolean hsha = cmd.hasOption("hsha");
449 boolean selector = cmd.hasOption("selector");
450
451 ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
452
453 String implType = "threadpool";
454 if (nonblocking) {
455 implType = "nonblocking";
456 } else if (hsha) {
457 implType = "hsha";
458 } else if (selector) {
459 implType = "selector";
460 }
461
462 conf.set("hbase.regionserver.thrift.server.type", implType);
463 conf.setInt("hbase.regionserver.thrift.port", listenPort);
464 registerFilters(conf);
465
466
467 boolean compact = cmd.hasOption("compact") ||
468 conf.getBoolean("hbase.regionserver.thrift.compact", false);
469 TProtocolFactory protocolFactory = getTProtocolFactory(compact);
470 final ThriftHBaseServiceHandler hbaseHandler =
471 new ThriftHBaseServiceHandler(conf, userProvider);
472 THBaseService.Iface handler =
473 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
474 final THBaseService.Processor p = new THBaseService.Processor(handler);
475 conf.setBoolean("hbase.regionserver.thrift.compact", compact);
476 TProcessor processor = p;
477
478 boolean framed = cmd.hasOption("framed") ||
479 conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
480 TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
481 conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
482 InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
483 conf.setBoolean("hbase.regionserver.thrift.framed", framed);
484 if (qop != null) {
485
486 processor = new TProcessor() {
487 @Override
488 public boolean process(TProtocol inProt,
489 TProtocol outProt) throws TException {
490 TSaslServerTransport saslServerTransport =
491 (TSaslServerTransport)inProt.getTransport();
492 SaslServer saslServer = saslServerTransport.getSaslServer();
493 String principal = saslServer.getAuthorizationID();
494 hbaseHandler.setEffectiveUser(principal);
495 return p.process(inProt, outProt);
496 }
497 };
498 }
499
500 if (cmd.hasOption("w")) {
501 workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
502 }
503 if (cmd.hasOption("s")) {
504 selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
505 }
506
507
508 try {
509 if (cmd.hasOption("infoport")) {
510 String val = cmd.getOptionValue("infoport");
511 conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
512 log.debug("Web UI port set to " + val);
513 }
514 } catch (NumberFormatException e) {
515 log.error("Could not parse the value provided for the infoport option", e);
516 printUsage();
517 System.exit(1);
518 }
519
520
521 int port = conf.getInt("hbase.thrift.info.port", 9095);
522 if (port >= 0) {
523 conf.setLong("startcode", System.currentTimeMillis());
524 String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
525 InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
526 infoServer.setAttribute("hbase.conf", conf);
527 infoServer.start();
528 }
529
530 if (nonblocking) {
531 server = getTNonBlockingServer(protocolFactory,
532 processor,
533 transportFactory,
534 inetSocketAddress);
535 } else if (hsha) {
536 server = getTHsHaServer(protocolFactory,
537 processor,
538 transportFactory,
539 workerThreads,
540 inetSocketAddress,
541 metrics);
542 } else if (selector) {
543 server = getTThreadedSelectorServer(protocolFactory,
544 processor,
545 transportFactory,
546 workerThreads, selectorThreads,
547 inetSocketAddress,
548 metrics);
549 } else {
550 server = getTThreadPoolServer(protocolFactory,
551 processor,
552 transportFactory,
553 workerThreads,
554 inetSocketAddress,
555 backlog,
556 readTimeout);
557 }
558
559 final TServer tserver = server;
560 realUser.doAs(
561 new PrivilegedAction<Object>() {
562 @Override
563 public Object run() {
564 tserver.serve();
565 return null;
566 }
567 });
568 }
569 }