1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.nio.ByteBuffer;
28 import java.security.PrivilegedAction;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.TreeMap;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41
42 import javax.security.auth.callback.Callback;
43 import javax.security.auth.callback.UnsupportedCallbackException;
44 import javax.security.sasl.AuthorizeCallback;
45 import javax.security.sasl.Sasl;
46 import javax.security.sasl.SaslServer;
47
48 import org.apache.commons.cli.CommandLine;
49 import org.apache.commons.cli.Option;
50 import org.apache.commons.cli.OptionGroup;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.hbase.HBaseConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.HRegionLocation;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.KeyValue;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.TableName;
63 import org.apache.hadoop.hbase.TableNotFoundException;
64 import org.apache.hadoop.hbase.classification.InterfaceAudience;
65 import org.apache.hadoop.hbase.client.Admin;
66 import org.apache.hadoop.hbase.client.Append;
67 import org.apache.hadoop.hbase.client.Delete;
68 import org.apache.hadoop.hbase.client.Durability;
69 import org.apache.hadoop.hbase.client.Get;
70 import org.apache.hadoop.hbase.client.HBaseAdmin;
71 import org.apache.hadoop.hbase.client.Increment;
72 import org.apache.hadoop.hbase.client.OperationWithAttributes;
73 import org.apache.hadoop.hbase.client.Put;
74 import org.apache.hadoop.hbase.client.RegionLocator;
75 import org.apache.hadoop.hbase.client.Result;
76 import org.apache.hadoop.hbase.client.ResultScanner;
77 import org.apache.hadoop.hbase.client.Scan;
78 import org.apache.hadoop.hbase.client.Table;
79 import org.apache.hadoop.hbase.filter.Filter;
80 import org.apache.hadoop.hbase.filter.ParseFilter;
81 import org.apache.hadoop.hbase.filter.PrefixFilter;
82 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83 import org.apache.hadoop.hbase.jetty.SslSelectChannelConnectorSecure;
84 import org.apache.hadoop.hbase.security.SaslUtil;
85 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
86 import org.apache.hadoop.hbase.security.SecurityUtil;
87 import org.apache.hadoop.hbase.security.UserProvider;
88 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
89 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
90 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
91 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
92 import org.apache.hadoop.hbase.thrift.generated.Hbase;
93 import org.apache.hadoop.hbase.thrift.generated.IOError;
94 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
95 import org.apache.hadoop.hbase.thrift.generated.Mutation;
96 import org.apache.hadoop.hbase.thrift.generated.TAppend;
97 import org.apache.hadoop.hbase.thrift.generated.TCell;
98 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
99 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
100 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
101 import org.apache.hadoop.hbase.thrift.generated.TScan;
102 import org.apache.hadoop.hbase.util.Bytes;
103 import org.apache.hadoop.hbase.util.ConnectionCache;
104 import org.apache.hadoop.hbase.util.DNS;
105 import org.apache.hadoop.hbase.util.Strings;
106 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
107 import org.apache.hadoop.security.UserGroupInformation;
108 import org.apache.hadoop.security.authorize.ProxyUsers;
109 import org.apache.thrift.TException;
110 import org.apache.thrift.TProcessor;
111 import org.apache.thrift.protocol.TBinaryProtocol;
112 import org.apache.thrift.protocol.TCompactProtocol;
113 import org.apache.thrift.protocol.TProtocol;
114 import org.apache.thrift.protocol.TProtocolFactory;
115 import org.apache.thrift.server.THsHaServer;
116 import org.apache.thrift.server.TNonblockingServer;
117 import org.apache.thrift.server.TServer;
118 import org.apache.thrift.server.TServlet;
119 import org.apache.thrift.server.TThreadedSelectorServer;
120 import org.apache.thrift.transport.TFramedTransport;
121 import org.apache.thrift.transport.TNonblockingServerSocket;
122 import org.apache.thrift.transport.TNonblockingServerTransport;
123 import org.apache.thrift.transport.TSaslServerTransport;
124 import org.apache.thrift.transport.TServerSocket;
125 import org.apache.thrift.transport.TServerTransport;
126 import org.apache.thrift.transport.TTransportFactory;
127 import org.mortbay.jetty.Connector;
128 import org.mortbay.jetty.Server;
129 import org.mortbay.jetty.nio.SelectChannelConnector;
130 import org.mortbay.jetty.servlet.Context;
131 import org.mortbay.jetty.servlet.ServletHolder;
132 import org.mortbay.thread.QueuedThreadPool;
133
134 import com.google.common.base.Joiner;
135 import com.google.common.base.Throwables;
136 import com.google.common.util.concurrent.ThreadFactoryBuilder;
137
138
139
140
141
142 @InterfaceAudience.Private
143 public class ThriftServerRunner implements Runnable {
144
145 private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
146
147 static final String SERVER_TYPE_CONF_KEY =
148 "hbase.regionserver.thrift.server.type";
149
150 static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
151 static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
152 static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
153 static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
154 static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
155 static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
156 static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
157 static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
158 static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
159
160 static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
161 static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
162 static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
163 static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
164
165
166
167
168
169
170 public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
171 "hbase.thrift.server.socket.read.timeout";
172 public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
173
174
175
176
177
178
179
180
181
182
183
184 static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
185 static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
186
187 private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
188 public static final int DEFAULT_LISTEN_PORT = 9090;
189 public static final int HREGION_VERSION = 1;
190 static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
191 private final int listenPort;
192
193 private Configuration conf;
194 volatile TServer tserver;
195 volatile Server httpServer;
196 private final Hbase.Iface handler;
197 private final ThriftMetrics metrics;
198 private final HBaseHandler hbaseHandler;
199 private final UserGroupInformation realUser;
200
201 private SaslUtil.QualityOfProtection qop;
202 private String host;
203
204 private final boolean securityEnabled;
205 private final boolean doAsEnabled;
206
207
208 enum ImplType {
209 HS_HA("hsha", true, THsHaServer.class, true),
210 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
211 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
212 THREADED_SELECTOR(
213 "threadedselector", true, TThreadedSelectorServer.class, true);
214
215 public static final ImplType DEFAULT = THREAD_POOL;
216
217 final String option;
218 final boolean isAlwaysFramed;
219 final Class<? extends TServer> serverClass;
220 final boolean canSpecifyBindIP;
221
222 ImplType(String option, boolean isAlwaysFramed,
223 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
224 this.option = option;
225 this.isAlwaysFramed = isAlwaysFramed;
226 this.serverClass = serverClass;
227 this.canSpecifyBindIP = canSpecifyBindIP;
228 }
229
230
231
232
233
234 @Override
235 public String toString() {
236 return "-" + option;
237 }
238
239 String getDescription() {
240 StringBuilder sb = new StringBuilder("Use the " +
241 serverClass.getSimpleName());
242 if (isAlwaysFramed) {
243 sb.append(" This implies the framed transport.");
244 }
245 if (this == DEFAULT) {
246 sb.append("This is the default.");
247 }
248 return sb.toString();
249 }
250
251 static OptionGroup createOptionGroup() {
252 OptionGroup group = new OptionGroup();
253 for (ImplType t : values()) {
254 group.addOption(new Option(t.option, t.getDescription()));
255 }
256 return group;
257 }
258
259 static ImplType getServerImpl(Configuration conf) {
260 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
261 for (ImplType t : values()) {
262 if (confType.equals(t.option)) {
263 return t;
264 }
265 }
266 throw new AssertionError("Unknown server ImplType.option:" + confType);
267 }
268
269 static void setServerImpl(CommandLine cmd, Configuration conf) {
270 ImplType chosenType = null;
271 int numChosen = 0;
272 for (ImplType t : values()) {
273 if (cmd.hasOption(t.option)) {
274 chosenType = t;
275 ++numChosen;
276 }
277 }
278 if (numChosen < 1) {
279 LOG.info("Using default thrift server type");
280 chosenType = DEFAULT;
281 } else if (numChosen > 1) {
282 throw new AssertionError("Exactly one option out of " +
283 Arrays.toString(values()) + " has to be specified");
284 }
285 LOG.info("Using thrift server type " + chosenType.option);
286 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
287 }
288
289 public String simpleClassName() {
290 return serverClass.getSimpleName();
291 }
292
293 public static List<String> serversThatCannotSpecifyBindIP() {
294 List<String> l = new ArrayList<String>();
295 for (ImplType t : values()) {
296 if (!t.canSpecifyBindIP) {
297 l.add(t.simpleClassName());
298 }
299 }
300 return l;
301 }
302
303 }
304
305 public ThriftServerRunner(Configuration conf) throws IOException {
306 UserProvider userProvider = UserProvider.instantiate(conf);
307
308 securityEnabled = userProvider.isHadoopSecurityEnabled()
309 && userProvider.isHBaseSecurityEnabled();
310 if (securityEnabled) {
311 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
312 conf.get("hbase.thrift.dns.interface", "default"),
313 conf.get("hbase.thrift.dns.nameserver", "default")));
314 userProvider.login("hbase.thrift.keytab.file",
315 "hbase.thrift.kerberos.principal", host);
316 }
317 this.conf = HBaseConfiguration.create(conf);
318 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
319 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
320 this.hbaseHandler = new HBaseHandler(conf, userProvider);
321 this.hbaseHandler.initMetrics(metrics);
322 this.handler = HbaseHandlerMetricsProxy.newInstance(
323 hbaseHandler, metrics, conf);
324 this.realUser = userProvider.getCurrent().getUGI();
325 String strQop = conf.get(THRIFT_QOP_KEY);
326 if (strQop != null) {
327 this.qop = SaslUtil.getQop(strQop);
328 }
329 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
330 if (doAsEnabled) {
331 if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
332 LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured ");
333 }
334 }
335 if (qop != null) {
336 if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
337 qop != SaslUtil.QualityOfProtection.INTEGRITY &&
338 qop != SaslUtil.QualityOfProtection.PRIVACY) {
339 throw new IOException(String.format("Invalide %s: It must be one of %s, %s, or %s.",
340 THRIFT_QOP_KEY,
341 QualityOfProtection.AUTHENTICATION.name(),
342 QualityOfProtection.INTEGRITY.name(),
343 QualityOfProtection.PRIVACY.name()));
344 }
345 checkHttpSecurity(qop, conf);
346 if (!securityEnabled) {
347 throw new IOException("Thrift server must"
348 + " run in secure mode to support authentication");
349 }
350 }
351 }
352
353 private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
354 if (qop == QualityOfProtection.PRIVACY &&
355 conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
356 !conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
357 throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
358 THRIFT_SSL_ENABLED + " is false");
359 }
360 }
361
362
363
364
365 @Override
366 public void run() {
367 realUser.doAs(new PrivilegedAction<Object>() {
368 @Override
369 public Object run() {
370 try {
371 if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
372 setupHTTPServer();
373 httpServer.start();
374 httpServer.join();
375 } else {
376 setupServer();
377 tserver.serve();
378 }
379 } catch (Exception e) {
380 LOG.fatal("Cannot run ThriftServer", e);
381
382 System.exit(-1);
383 }
384 return null;
385 }
386 });
387
388 }
389
390 public void shutdown() {
391 if (tserver != null) {
392 tserver.stop();
393 tserver = null;
394 }
395 if (httpServer != null) {
396 try {
397 httpServer.stop();
398 httpServer = null;
399 } catch (Exception e) {
400 LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
401 }
402 httpServer = null;
403 }
404 }
405
406 private void setupHTTPServer() throws IOException {
407 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
408 TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
409 TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
410 conf, hbaseHandler, securityEnabled, doAsEnabled);
411
412 httpServer = new Server();
413
414 Context context = new Context(httpServer, "/", Context.SESSIONS);
415 context.setContextPath("/");
416 String httpPath = "/*";
417 httpServer.setHandler(context);
418 context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
419
420
421 Connector connector = new SelectChannelConnector();
422 if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
423 SslSelectChannelConnectorSecure sslConnector = new SslSelectChannelConnectorSecure();
424 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
425 String password = HBaseConfiguration.getPassword(conf,
426 THRIFT_SSL_KEYSTORE_PASSWORD, null);
427 String keyPassword = HBaseConfiguration.getPassword(conf,
428 THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
429 sslConnector.setKeystore(keystore);
430 sslConnector.setPassword(password);
431 sslConnector.setKeyPassword(keyPassword);
432 connector = sslConnector;
433 }
434 String host = getBindAddress(conf).getHostAddress();
435 connector.setPort(listenPort);
436 connector.setHost(host);
437 connector.setHeaderBufferSize(1024 * 64);
438 httpServer.addConnector(connector);
439
440 if (doAsEnabled) {
441 ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
442 }
443
444
445
446
447
448
449 int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
450 int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
451 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
452 threadPool.setMinThreads(minThreads);
453 httpServer.setThreadPool(threadPool);
454
455 httpServer.setSendServerVersion(false);
456 httpServer.setSendDateHeader(false);
457 httpServer.setStopAtShutdown(true);
458
459 LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
460 }
461
462
463
464
465 private void setupServer() throws Exception {
466
467 TProtocolFactory protocolFactory;
468 if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
469 LOG.debug("Using compact protocol");
470 protocolFactory = new TCompactProtocol.Factory();
471 } else {
472 LOG.debug("Using binary protocol");
473 protocolFactory = new TBinaryProtocol.Factory();
474 }
475
476 final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
477 ImplType implType = ImplType.getServerImpl(conf);
478 TProcessor processor = p;
479
480
481 TTransportFactory transportFactory;
482 if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
483 if (qop != null) {
484 throw new RuntimeException("Thrift server authentication"
485 + " doesn't work with framed transport yet");
486 }
487 transportFactory = new TFramedTransport.Factory(
488 conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024);
489 LOG.debug("Using framed transport");
490 } else if (qop == null) {
491 transportFactory = new TTransportFactory();
492 } else {
493
494 String name = SecurityUtil.getUserFromPrincipal(
495 conf.get("hbase.thrift.kerberos.principal"));
496 Map<String, String> saslProperties = new HashMap<String, String>();
497 saslProperties.put(Sasl.QOP, qop.getSaslQop());
498 saslProperties.put(Sasl.SERVER_AUTH, "true");
499 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
500 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
501 new SaslGssCallbackHandler() {
502 @Override
503 public void handle(Callback[] callbacks)
504 throws UnsupportedCallbackException {
505 AuthorizeCallback ac = null;
506 for (Callback callback : callbacks) {
507 if (callback instanceof AuthorizeCallback) {
508 ac = (AuthorizeCallback) callback;
509 } else {
510 throw new UnsupportedCallbackException(callback,
511 "Unrecognized SASL GSSAPI Callback");
512 }
513 }
514 if (ac != null) {
515 String authid = ac.getAuthenticationID();
516 String authzid = ac.getAuthorizationID();
517 if (!authid.equals(authzid)) {
518 ac.setAuthorized(false);
519 } else {
520 ac.setAuthorized(true);
521 String userName = SecurityUtil.getUserFromPrincipal(authzid);
522 LOG.info("Effective user: " + userName);
523 ac.setAuthorizedID(userName);
524 }
525 }
526 }
527 });
528 transportFactory = saslFactory;
529
530
531 processor = new TProcessor() {
532 @Override
533 public boolean process(TProtocol inProt,
534 TProtocol outProt) throws TException {
535 TSaslServerTransport saslServerTransport =
536 (TSaslServerTransport)inProt.getTransport();
537 SaslServer saslServer = saslServerTransport.getSaslServer();
538 String principal = saslServer.getAuthorizationID();
539 hbaseHandler.setEffectiveUser(principal);
540 return p.process(inProt, outProt);
541 }
542 };
543 }
544
545 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
546 LOG.error("Server types " + Joiner.on(", ").join(
547 ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
548 "address binding at the moment. See " +
549 "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
550 throw new RuntimeException(
551 "-" + BIND_CONF_KEY + " not supported with " + implType);
552 }
553
554
555 int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
556
557 if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
558 implType == ImplType.THREADED_SELECTOR) {
559 InetAddress listenAddress = getBindAddress(conf);
560 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
561 new InetSocketAddress(listenAddress, listenPort));
562
563 if (implType == ImplType.NONBLOCKING) {
564 TNonblockingServer.Args serverArgs =
565 new TNonblockingServer.Args(serverTransport);
566 serverArgs.processor(processor)
567 .transportFactory(transportFactory)
568 .protocolFactory(protocolFactory);
569 tserver = new TNonblockingServer(serverArgs);
570 } else if (implType == ImplType.HS_HA) {
571 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
572 CallQueue callQueue =
573 new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
574 ExecutorService executorService = createExecutor(
575 callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads());
576 serverArgs.executorService(executorService)
577 .processor(processor)
578 .transportFactory(transportFactory)
579 .protocolFactory(protocolFactory);
580 tserver = new THsHaServer(serverArgs);
581 } else {
582 TThreadedSelectorServer.Args serverArgs =
583 new HThreadedSelectorServerArgs(serverTransport, conf);
584 CallQueue callQueue =
585 new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
586 ExecutorService executorService = createExecutor(
587 callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
588 serverArgs.executorService(executorService)
589 .processor(processor)
590 .transportFactory(transportFactory)
591 .protocolFactory(protocolFactory);
592 tserver = new TThreadedSelectorServer(serverArgs);
593 }
594 LOG.info("starting HBase " + implType.simpleClassName() +
595 " server on " + Integer.toString(listenPort));
596 } else if (implType == ImplType.THREAD_POOL) {
597
598 InetAddress listenAddress = getBindAddress(conf);
599 int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
600 THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
601 TServerTransport serverTransport = new TServerSocket(
602 new TServerSocket.ServerSocketTransportArgs().
603 bindAddr(new InetSocketAddress(listenAddress, listenPort)).
604 backlog(backlog).
605 clientTimeout(readTimeout));
606
607 TBoundedThreadPoolServer.Args serverArgs =
608 new TBoundedThreadPoolServer.Args(serverTransport, conf);
609 serverArgs.processor(processor)
610 .transportFactory(transportFactory)
611 .protocolFactory(protocolFactory);
612 LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
613 + listenAddress + ":" + Integer.toString(listenPort)
614 + " with readTimeout " + readTimeout + "ms; " + serverArgs);
615 TBoundedThreadPoolServer tserver =
616 new TBoundedThreadPoolServer(serverArgs, metrics);
617 this.tserver = tserver;
618 } else {
619 throw new AssertionError("Unsupported Thrift server implementation: " +
620 implType.simpleClassName());
621 }
622
623
624 if (tserver.getClass() != implType.serverClass) {
625 throw new AssertionError("Expected to create Thrift server class " +
626 implType.serverClass.getName() + " but got " +
627 tserver.getClass().getName());
628 }
629
630
631
632 registerFilters(conf);
633 }
634
635 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
636 int minWorkers, int maxWorkers) {
637 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
638 tfb.setDaemon(true);
639 tfb.setNameFormat("thrift-worker-%d");
640 return new ThreadPoolExecutor(minWorkers, maxWorkers,
641 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
642 }
643
644 private InetAddress getBindAddress(Configuration conf)
645 throws UnknownHostException {
646 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
647 return InetAddress.getByName(bindAddressStr);
648 }
649
650 protected static class ResultScannerWrapper {
651
652 private final ResultScanner scanner;
653 private final boolean sortColumns;
654 public ResultScannerWrapper(ResultScanner resultScanner,
655 boolean sortResultColumns) {
656 scanner = resultScanner;
657 sortColumns = sortResultColumns;
658 }
659
660 public ResultScanner getScanner() {
661 return scanner;
662 }
663
664 public boolean isColumnSorted() {
665 return sortColumns;
666 }
667 }
668
669
670
671
672
673 public static class HBaseHandler implements Hbase.Iface {
674 protected Configuration conf;
675 protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);
676
677
678 protected int nextScannerId = 0;
679 protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
680 private ThriftMetrics metrics = null;
681
682 private final ConnectionCache connectionCache;
683 IncrementCoalescer coalescer = null;
684
685 static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
686 static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
687
688
689
690
691
692
693
694 byte[][] getAllColumns(Table table) throws IOException {
695 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
696 byte[][] columns = new byte[cds.length][];
697 for (int i = 0; i < cds.length; i++) {
698 columns[i] = Bytes.add(cds[i].getName(),
699 KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
700 }
701 return columns;
702 }
703
704
705
706
707
708
709
710
711
712 public Table getTable(final byte[] tableName) throws
713 IOException {
714 String table = Bytes.toString(tableName);
715 return connectionCache.getTable(table);
716 }
717
718 public Table getTable(final ByteBuffer tableName) throws IOException {
719 return getTable(getBytes(tableName));
720 }
721
722
723
724
725
726
727
728
729 protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
730 int id = nextScannerId++;
731 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
732 scannerMap.put(id, resultScannerWrapper);
733 return id;
734 }
735
736
737
738
739
740
741
742 protected synchronized ResultScannerWrapper getScanner(int id) {
743 return scannerMap.get(id);
744 }
745
746
747
748
749
750
751
752
753 protected synchronized ResultScannerWrapper removeScanner(int id) {
754 return scannerMap.remove(id);
755 }
756
757 protected HBaseHandler(final Configuration c,
758 final UserProvider userProvider) throws IOException {
759 this.conf = c;
760 scannerMap = new HashMap<Integer, ResultScannerWrapper>();
761 this.coalescer = new IncrementCoalescer(this);
762
763 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
764 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
765 connectionCache = new ConnectionCache(
766 conf, userProvider, cleanInterval, maxIdleTime);
767 }
768
769
770
771
772 private Admin getAdmin() throws IOException {
773 return connectionCache.getAdmin();
774 }
775
776 void setEffectiveUser(String effectiveUser) {
777 connectionCache.setEffectiveUser(effectiveUser);
778 }
779
780 @Override
781 public void enableTable(ByteBuffer tableName) throws IOError {
782 try{
783 getAdmin().enableTable(getTableName(tableName));
784 } catch (IOException e) {
785 LOG.warn(e.getMessage(), e);
786 throw new IOError(Throwables.getStackTraceAsString(e));
787 }
788 }
789
790 @Override
791 public void disableTable(ByteBuffer tableName) throws IOError{
792 try{
793 getAdmin().disableTable(getTableName(tableName));
794 } catch (IOException e) {
795 LOG.warn(e.getMessage(), e);
796 throw new IOError(Throwables.getStackTraceAsString(e));
797 }
798 }
799
800 @Override
801 public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
802 try {
803 return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
804 } catch (IOException e) {
805 LOG.warn(e.getMessage(), e);
806 throw new IOError(Throwables.getStackTraceAsString(e));
807 }
808 }
809
810 @Override
811 public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
812 try {
813
814
815
816 ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
817 } catch (IOException e) {
818 LOG.warn(e.getMessage(), e);
819 throw new IOError(Throwables.getStackTraceAsString(e));
820 }
821 }
822
823 @Override
824 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
825 try {
826
827
828
829 ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
830 } catch (IOException e) {
831 LOG.warn(e.getMessage(), e);
832 throw new IOError(Throwables.getStackTraceAsString(e));
833 }
834 }
835
836 @Override
837 public List<ByteBuffer> getTableNames() throws IOError {
838 try {
839 TableName[] tableNames = this.getAdmin().listTableNames();
840 ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
841 for (int i = 0; i < tableNames.length; i++) {
842 list.add(ByteBuffer.wrap(tableNames[i].getName()));
843 }
844 return list;
845 } catch (IOException e) {
846 LOG.warn(e.getMessage(), e);
847 throw new IOError(Throwables.getStackTraceAsString(e));
848 }
849 }
850
851
852
853
854 @Override
855 public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
856 throws IOError {
857 try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
858 List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
859 List<TRegionInfo> results = new ArrayList<TRegionInfo>();
860 for (HRegionLocation regionLocation : regionLocations) {
861 HRegionInfo info = regionLocation.getRegionInfo();
862 ServerName serverName = regionLocation.getServerName();
863 TRegionInfo region = new TRegionInfo();
864 region.serverName = ByteBuffer.wrap(
865 Bytes.toBytes(serverName.getHostname()));
866 region.port = serverName.getPort();
867 region.startKey = ByteBuffer.wrap(info.getStartKey());
868 region.endKey = ByteBuffer.wrap(info.getEndKey());
869 region.id = info.getRegionId();
870 region.name = ByteBuffer.wrap(info.getRegionName());
871 region.version = info.getVersion();
872 results.add(region);
873 }
874 return results;
875 } catch (TableNotFoundException e) {
876
877 return Collections.emptyList();
878 } catch (IOException e){
879 LOG.warn(e.getMessage(), e);
880 throw new IOError(Throwables.getStackTraceAsString(e));
881 }
882 }
883
884 @Override
885 public List<TCell> get(
886 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
887 Map<ByteBuffer, ByteBuffer> attributes)
888 throws IOError {
889 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
890 if (famAndQf.length == 1) {
891 return get(tableName, row, famAndQf[0], null, attributes);
892 }
893 if (famAndQf.length == 2) {
894 return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
895 }
896 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
897 }
898
899
900
901
902
903
904
905
906 protected List<TCell> get(ByteBuffer tableName,
907 ByteBuffer row,
908 byte[] family,
909 byte[] qualifier,
910 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
911 Table table = null;
912 try {
913 table = getTable(tableName);
914 Get get = new Get(getBytes(row));
915 addAttributes(get, attributes);
916 if (qualifier == null) {
917 get.addFamily(family);
918 } else {
919 get.addColumn(family, qualifier);
920 }
921 Result result = table.get(get);
922 return ThriftUtilities.cellFromHBase(result.rawCells());
923 } catch (IOException e) {
924 LOG.warn(e.getMessage(), e);
925 throw new IOError(Throwables.getStackTraceAsString(e));
926 } finally {
927 closeTable(table);
928 }
929 }
930
931 @Override
932 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
933 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
934 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
935 if(famAndQf.length == 1) {
936 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
937 }
938 if (famAndQf.length == 2) {
939 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
940 }
941 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
942
943 }
944
945
946
947
948
949
950
951
952
953 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
954 byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
955
956 Table table = null;
957 try {
958 table = getTable(tableName);
959 Get get = new Get(getBytes(row));
960 addAttributes(get, attributes);
961 if (null == qualifier) {
962 get.addFamily(family);
963 } else {
964 get.addColumn(family, qualifier);
965 }
966 get.setMaxVersions(numVersions);
967 Result result = table.get(get);
968 return ThriftUtilities.cellFromHBase(result.rawCells());
969 } catch (IOException e) {
970 LOG.warn(e.getMessage(), e);
971 throw new IOError(Throwables.getStackTraceAsString(e));
972 } finally{
973 closeTable(table);
974 }
975 }
976
977 @Override
978 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
979 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
980 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
981 if (famAndQf.length == 1) {
982 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
983 }
984 if (famAndQf.length == 2) {
985 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
986 attributes);
987 }
988 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
989 }
990
991
992
993
994
995
996
997
998 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
999 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
1000 throws IOError {
1001
1002 Table table = null;
1003 try {
1004 table = getTable(tableName);
1005 Get get = new Get(getBytes(row));
1006 addAttributes(get, attributes);
1007 if (null == qualifier) {
1008 get.addFamily(family);
1009 } else {
1010 get.addColumn(family, qualifier);
1011 }
1012 get.setTimeRange(0, timestamp);
1013 get.setMaxVersions(numVersions);
1014 Result result = table.get(get);
1015 return ThriftUtilities.cellFromHBase(result.rawCells());
1016 } catch (IOException e) {
1017 LOG.warn(e.getMessage(), e);
1018 throw new IOError(Throwables.getStackTraceAsString(e));
1019 } finally{
1020 closeTable(table);
1021 }
1022 }
1023
1024 @Override
1025 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1026 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1027 return getRowWithColumnsTs(tableName, row, null,
1028 HConstants.LATEST_TIMESTAMP,
1029 attributes);
1030 }
1031
1032 @Override
1033 public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1034 ByteBuffer row,
1035 List<ByteBuffer> columns,
1036 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1037 return getRowWithColumnsTs(tableName, row, columns,
1038 HConstants.LATEST_TIMESTAMP,
1039 attributes);
1040 }
1041
1042 @Override
1043 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1044 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1045 return getRowWithColumnsTs(tableName, row, null,
1046 timestamp, attributes);
1047 }
1048
1049 @Override
1050 public List<TRowResult> getRowWithColumnsTs(
1051 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1052 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1053
1054 Table table = null;
1055 try {
1056 table = getTable(tableName);
1057 if (columns == null) {
1058 Get get = new Get(getBytes(row));
1059 addAttributes(get, attributes);
1060 get.setTimeRange(0, timestamp);
1061 Result result = table.get(get);
1062 return ThriftUtilities.rowResultFromHBase(result);
1063 }
1064 Get get = new Get(getBytes(row));
1065 addAttributes(get, attributes);
1066 for(ByteBuffer column : columns) {
1067 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1068 if (famAndQf.length == 1) {
1069 get.addFamily(famAndQf[0]);
1070 } else {
1071 get.addColumn(famAndQf[0], famAndQf[1]);
1072 }
1073 }
1074 get.setTimeRange(0, timestamp);
1075 Result result = table.get(get);
1076 return ThriftUtilities.rowResultFromHBase(result);
1077 } catch (IOException e) {
1078 LOG.warn(e.getMessage(), e);
1079 throw new IOError(Throwables.getStackTraceAsString(e));
1080 } finally{
1081 closeTable(table);
1082 }
1083 }
1084
1085 @Override
1086 public List<TRowResult> getRows(ByteBuffer tableName,
1087 List<ByteBuffer> rows,
1088 Map<ByteBuffer, ByteBuffer> attributes)
1089 throws IOError {
1090 return getRowsWithColumnsTs(tableName, rows, null,
1091 HConstants.LATEST_TIMESTAMP,
1092 attributes);
1093 }
1094
1095 @Override
1096 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1097 List<ByteBuffer> rows,
1098 List<ByteBuffer> columns,
1099 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1100 return getRowsWithColumnsTs(tableName, rows, columns,
1101 HConstants.LATEST_TIMESTAMP,
1102 attributes);
1103 }
1104
1105 @Override
1106 public List<TRowResult> getRowsTs(ByteBuffer tableName,
1107 List<ByteBuffer> rows,
1108 long timestamp,
1109 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1110 return getRowsWithColumnsTs(tableName, rows, null,
1111 timestamp, attributes);
1112 }
1113
1114 @Override
1115 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1116 List<ByteBuffer> rows,
1117 List<ByteBuffer> columns, long timestamp,
1118 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1119
1120 Table table= null;
1121 try {
1122 List<Get> gets = new ArrayList<Get>(rows.size());
1123 table = getTable(tableName);
1124 if (metrics != null) {
1125 metrics.incNumRowKeysInBatchGet(rows.size());
1126 }
1127 for (ByteBuffer row : rows) {
1128 Get get = new Get(getBytes(row));
1129 addAttributes(get, attributes);
1130 if (columns != null) {
1131
1132 for(ByteBuffer column : columns) {
1133 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1134 if (famAndQf.length == 1) {
1135 get.addFamily(famAndQf[0]);
1136 } else {
1137 get.addColumn(famAndQf[0], famAndQf[1]);
1138 }
1139 }
1140 }
1141 get.setTimeRange(0, timestamp);
1142 gets.add(get);
1143 }
1144 Result[] result = table.get(gets);
1145 return ThriftUtilities.rowResultFromHBase(result);
1146 } catch (IOException e) {
1147 LOG.warn(e.getMessage(), e);
1148 throw new IOError(Throwables.getStackTraceAsString(e));
1149 } finally{
1150 closeTable(table);
1151 }
1152 }
1153
1154 @Override
1155 public void deleteAll(
1156 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1157 Map<ByteBuffer, ByteBuffer> attributes)
1158 throws IOError {
1159 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1160 attributes);
1161 }
1162
1163 @Override
1164 public void deleteAllTs(ByteBuffer tableName,
1165 ByteBuffer row,
1166 ByteBuffer column,
1167 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1168 Table table = null;
1169 try {
1170 table = getTable(tableName);
1171 Delete delete = new Delete(getBytes(row));
1172 addAttributes(delete, attributes);
1173 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1174 if (famAndQf.length == 1) {
1175 delete.deleteFamily(famAndQf[0], timestamp);
1176 } else {
1177 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1178 }
1179 table.delete(delete);
1180
1181 } catch (IOException e) {
1182 LOG.warn(e.getMessage(), e);
1183 throw new IOError(Throwables.getStackTraceAsString(e));
1184 } finally {
1185 closeTable(table);
1186 }
1187 }
1188
1189 @Override
1190 public void deleteAllRow(
1191 ByteBuffer tableName, ByteBuffer row,
1192 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1193 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1194 }
1195
1196 @Override
1197 public void deleteAllRowTs(
1198 ByteBuffer tableName, ByteBuffer row, long timestamp,
1199 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1200 Table table = null;
1201 try {
1202 table = getTable(tableName);
1203 Delete delete = new Delete(getBytes(row), timestamp);
1204 addAttributes(delete, attributes);
1205 table.delete(delete);
1206 } catch (IOException e) {
1207 LOG.warn(e.getMessage(), e);
1208 throw new IOError(Throwables.getStackTraceAsString(e));
1209 } finally {
1210 closeTable(table);
1211 }
1212 }
1213
1214 @Override
1215 public void createTable(ByteBuffer in_tableName,
1216 List<ColumnDescriptor> columnFamilies) throws IOError,
1217 IllegalArgument, AlreadyExists {
1218 TableName tableName = getTableName(in_tableName);
1219 try {
1220 if (getAdmin().tableExists(tableName)) {
1221 throw new AlreadyExists("table name already in use");
1222 }
1223 HTableDescriptor desc = new HTableDescriptor(tableName);
1224 for (ColumnDescriptor col : columnFamilies) {
1225 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1226 desc.addFamily(colDesc);
1227 }
1228 getAdmin().createTable(desc);
1229 } catch (IOException e) {
1230 LOG.warn(e.getMessage(), e);
1231 throw new IOError(Throwables.getStackTraceAsString(e));
1232 } catch (IllegalArgumentException e) {
1233 LOG.warn(e.getMessage(), e);
1234 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1235 }
1236 }
1237
1238 private static TableName getTableName(ByteBuffer buffer) {
1239 return TableName.valueOf(getBytes(buffer));
1240 }
1241
1242 @Override
1243 public void deleteTable(ByteBuffer in_tableName) throws IOError {
1244 TableName tableName = getTableName(in_tableName);
1245 if (LOG.isDebugEnabled()) {
1246 LOG.debug("deleteTable: table=" + tableName);
1247 }
1248 try {
1249 if (!getAdmin().tableExists(tableName)) {
1250 throw new IOException("table does not exist");
1251 }
1252 getAdmin().deleteTable(tableName);
1253 } catch (IOException e) {
1254 LOG.warn(e.getMessage(), e);
1255 throw new IOError(Throwables.getStackTraceAsString(e));
1256 }
1257 }
1258
1259 @Override
1260 public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1261 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1262 throws IOError, IllegalArgument {
1263 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1264 attributes);
1265 }
1266
1267 @Override
1268 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1269 List<Mutation> mutations, long timestamp,
1270 Map<ByteBuffer, ByteBuffer> attributes)
1271 throws IOError, IllegalArgument {
1272 Table table = null;
1273 try {
1274 table = getTable(tableName);
1275 Put put = new Put(getBytes(row), timestamp);
1276 addAttributes(put, attributes);
1277
1278 Delete delete = new Delete(getBytes(row));
1279 addAttributes(delete, attributes);
1280 if (metrics != null) {
1281 metrics.incNumRowKeysInBatchMutate(mutations.size());
1282 }
1283
1284
1285 for (Mutation m : mutations) {
1286 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1287 if (m.isDelete) {
1288 if (famAndQf.length == 1) {
1289 delete.deleteFamily(famAndQf[0], timestamp);
1290 } else {
1291 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1292 }
1293 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1294 : Durability.SKIP_WAL);
1295 } else {
1296 if(famAndQf.length == 1) {
1297 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1298 + "over the whole column family.");
1299 } else {
1300 put.addImmutable(famAndQf[0], famAndQf[1],
1301 m.value != null ? getBytes(m.value)
1302 : HConstants.EMPTY_BYTE_ARRAY);
1303 }
1304 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1305 }
1306 }
1307 if (!delete.isEmpty())
1308 table.delete(delete);
1309 if (!put.isEmpty())
1310 table.put(put);
1311 } catch (IOException e) {
1312 LOG.warn(e.getMessage(), e);
1313 throw new IOError(Throwables.getStackTraceAsString(e));
1314 } catch (IllegalArgumentException e) {
1315 LOG.warn(e.getMessage(), e);
1316 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1317 } finally{
1318 closeTable(table);
1319 }
1320 }
1321
1322 @Override
1323 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1324 Map<ByteBuffer, ByteBuffer> attributes)
1325 throws IOError, IllegalArgument, TException {
1326 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1327 }
1328
1329 @Override
1330 public void mutateRowsTs(
1331 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1332 Map<ByteBuffer, ByteBuffer> attributes)
1333 throws IOError, IllegalArgument, TException {
1334 List<Put> puts = new ArrayList<Put>();
1335 List<Delete> deletes = new ArrayList<Delete>();
1336
1337 for (BatchMutation batch : rowBatches) {
1338 byte[] row = getBytes(batch.row);
1339 List<Mutation> mutations = batch.mutations;
1340 Delete delete = new Delete(row);
1341 addAttributes(delete, attributes);
1342 Put put = new Put(row, timestamp);
1343 addAttributes(put, attributes);
1344 for (Mutation m : mutations) {
1345 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1346 if (m.isDelete) {
1347
1348 if (famAndQf.length == 1) {
1349 delete.deleteFamily(famAndQf[0], timestamp);
1350 } else {
1351 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1352 }
1353 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1354 : Durability.SKIP_WAL);
1355 } else {
1356 if (famAndQf.length == 1) {
1357 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1358 + "over the whole column family.");
1359 }
1360 if (famAndQf.length == 2) {
1361 put.addImmutable(famAndQf[0], famAndQf[1],
1362 m.value != null ? getBytes(m.value)
1363 : HConstants.EMPTY_BYTE_ARRAY);
1364 } else {
1365 throw new IllegalArgumentException("Invalid famAndQf provided.");
1366 }
1367 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1368 }
1369 }
1370 if (!delete.isEmpty())
1371 deletes.add(delete);
1372 if (!put.isEmpty())
1373 puts.add(put);
1374 }
1375
1376 Table table = null;
1377 try {
1378 table = getTable(tableName);
1379 if (!puts.isEmpty())
1380 table.put(puts);
1381 if (!deletes.isEmpty())
1382 table.delete(deletes);
1383
1384 } catch (IOException e) {
1385 LOG.warn(e.getMessage(), e);
1386 throw new IOError(Throwables.getStackTraceAsString(e));
1387 } catch (IllegalArgumentException e) {
1388 LOG.warn(e.getMessage(), e);
1389 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1390 } finally{
1391 closeTable(table);
1392 }
1393 }
1394
1395 @Override
1396 public long atomicIncrement(
1397 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1398 throws IOError, IllegalArgument, TException {
1399 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1400 if(famAndQf.length == 1) {
1401 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1402 }
1403 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1404 }
1405
1406 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1407 byte [] family, byte [] qualifier, long amount)
1408 throws IOError, IllegalArgument, TException {
1409 Table table = null;
1410 try {
1411 table = getTable(tableName);
1412 return table.incrementColumnValue(
1413 getBytes(row), family, qualifier, amount);
1414 } catch (IOException e) {
1415 LOG.warn(e.getMessage(), e);
1416 throw new IOError(Throwables.getStackTraceAsString(e));
1417 } finally {
1418 closeTable(table);
1419 }
1420 }
1421
1422 @Override
1423 public void scannerClose(int id) throws IOError, IllegalArgument {
1424 LOG.debug("scannerClose: id=" + id);
1425 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1426 if (resultScannerWrapper == null) {
1427 String message = "scanner ID is invalid";
1428 LOG.warn(message);
1429 throw new IllegalArgument("scanner ID is invalid");
1430 }
1431 resultScannerWrapper.getScanner().close();
1432 removeScanner(id);
1433 }
1434
1435 @Override
1436 public List<TRowResult> scannerGetList(int id,int nbRows)
1437 throws IllegalArgument, IOError {
1438 LOG.debug("scannerGetList: id=" + id);
1439 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1440 if (null == resultScannerWrapper) {
1441 String message = "scanner ID is invalid";
1442 LOG.warn(message);
1443 throw new IllegalArgument("scanner ID is invalid");
1444 }
1445
1446 Result [] results = null;
1447 try {
1448 results = resultScannerWrapper.getScanner().next(nbRows);
1449 if (null == results) {
1450 return new ArrayList<TRowResult>();
1451 }
1452 } catch (IOException e) {
1453 LOG.warn(e.getMessage(), e);
1454 throw new IOError(Throwables.getStackTraceAsString(e));
1455 }
1456 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1457 }
1458
1459 @Override
1460 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1461 return scannerGetList(id,1);
1462 }
1463
1464 @Override
1465 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1466 Map<ByteBuffer, ByteBuffer> attributes)
1467 throws IOError {
1468
1469 Table table = null;
1470 try {
1471 table = getTable(tableName);
1472 Scan scan = new Scan();
1473 addAttributes(scan, attributes);
1474 if (tScan.isSetStartRow()) {
1475 scan.setStartRow(tScan.getStartRow());
1476 }
1477 if (tScan.isSetStopRow()) {
1478 scan.setStopRow(tScan.getStopRow());
1479 }
1480 if (tScan.isSetTimestamp()) {
1481 scan.setTimeRange(0, tScan.getTimestamp());
1482 }
1483 if (tScan.isSetCaching()) {
1484 scan.setCaching(tScan.getCaching());
1485 }
1486 if (tScan.isSetBatchSize()) {
1487 scan.setBatch(tScan.getBatchSize());
1488 }
1489 if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1490 for(ByteBuffer column : tScan.getColumns()) {
1491 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1492 if(famQf.length == 1) {
1493 scan.addFamily(famQf[0]);
1494 } else {
1495 scan.addColumn(famQf[0], famQf[1]);
1496 }
1497 }
1498 }
1499 if (tScan.isSetFilterString()) {
1500 ParseFilter parseFilter = new ParseFilter();
1501 scan.setFilter(
1502 parseFilter.parseFilterString(tScan.getFilterString()));
1503 }
1504 if (tScan.isSetReversed()) {
1505 scan.setReversed(tScan.isReversed());
1506 }
1507 return addScanner(table.getScanner(scan), tScan.sortColumns);
1508 } catch (IOException e) {
1509 LOG.warn(e.getMessage(), e);
1510 throw new IOError(Throwables.getStackTraceAsString(e));
1511 } finally{
1512 closeTable(table);
1513 }
1514 }
1515
1516 @Override
1517 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1518 List<ByteBuffer> columns,
1519 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1520
1521 Table table = null;
1522 try {
1523 table = getTable(tableName);
1524 Scan scan = new Scan(getBytes(startRow));
1525 addAttributes(scan, attributes);
1526 if(columns != null && columns.size() != 0) {
1527 for(ByteBuffer column : columns) {
1528 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1529 if(famQf.length == 1) {
1530 scan.addFamily(famQf[0]);
1531 } else {
1532 scan.addColumn(famQf[0], famQf[1]);
1533 }
1534 }
1535 }
1536 return addScanner(table.getScanner(scan), false);
1537 } catch (IOException e) {
1538 LOG.warn(e.getMessage(), e);
1539 throw new IOError(Throwables.getStackTraceAsString(e));
1540 } finally{
1541 closeTable(table);
1542 }
1543 }
1544
1545 @Override
1546 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1547 ByteBuffer stopRow, List<ByteBuffer> columns,
1548 Map<ByteBuffer, ByteBuffer> attributes)
1549 throws IOError, TException {
1550
1551 Table table = null;
1552 try {
1553 table = getTable(tableName);
1554 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1555 addAttributes(scan, attributes);
1556 if(columns != null && columns.size() != 0) {
1557 for(ByteBuffer column : columns) {
1558 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1559 if(famQf.length == 1) {
1560 scan.addFamily(famQf[0]);
1561 } else {
1562 scan.addColumn(famQf[0], famQf[1]);
1563 }
1564 }
1565 }
1566 return addScanner(table.getScanner(scan), false);
1567 } catch (IOException e) {
1568 LOG.warn(e.getMessage(), e);
1569 throw new IOError(Throwables.getStackTraceAsString(e));
1570 } finally{
1571 closeTable(table);
1572 }
1573 }
1574
1575 @Override
1576 public int scannerOpenWithPrefix(ByteBuffer tableName,
1577 ByteBuffer startAndPrefix,
1578 List<ByteBuffer> columns,
1579 Map<ByteBuffer, ByteBuffer> attributes)
1580 throws IOError, TException {
1581
1582 Table table = null;
1583 try {
1584 table = getTable(tableName);
1585 Scan scan = new Scan(getBytes(startAndPrefix));
1586 addAttributes(scan, attributes);
1587 Filter f = new WhileMatchFilter(
1588 new PrefixFilter(getBytes(startAndPrefix)));
1589 scan.setFilter(f);
1590 if (columns != null && columns.size() != 0) {
1591 for(ByteBuffer column : columns) {
1592 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1593 if(famQf.length == 1) {
1594 scan.addFamily(famQf[0]);
1595 } else {
1596 scan.addColumn(famQf[0], famQf[1]);
1597 }
1598 }
1599 }
1600 return addScanner(table.getScanner(scan), false);
1601 } catch (IOException e) {
1602 LOG.warn(e.getMessage(), e);
1603 throw new IOError(Throwables.getStackTraceAsString(e));
1604 } finally{
1605 closeTable(table);
1606 }
1607 }
1608
1609 @Override
1610 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1611 List<ByteBuffer> columns, long timestamp,
1612 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1613
1614 Table table = null;
1615 try {
1616 table = getTable(tableName);
1617 Scan scan = new Scan(getBytes(startRow));
1618 addAttributes(scan, attributes);
1619 scan.setTimeRange(0, timestamp);
1620 if (columns != null && columns.size() != 0) {
1621 for (ByteBuffer column : columns) {
1622 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1623 if(famQf.length == 1) {
1624 scan.addFamily(famQf[0]);
1625 } else {
1626 scan.addColumn(famQf[0], famQf[1]);
1627 }
1628 }
1629 }
1630 return addScanner(table.getScanner(scan), false);
1631 } catch (IOException e) {
1632 LOG.warn(e.getMessage(), e);
1633 throw new IOError(Throwables.getStackTraceAsString(e));
1634 } finally{
1635 closeTable(table);
1636 }
1637 }
1638
1639 @Override
1640 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1641 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1642 Map<ByteBuffer, ByteBuffer> attributes)
1643 throws IOError, TException {
1644
1645 Table table = null;
1646 try {
1647 table = getTable(tableName);
1648 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1649 addAttributes(scan, attributes);
1650 scan.setTimeRange(0, timestamp);
1651 if (columns != null && columns.size() != 0) {
1652 for (ByteBuffer column : columns) {
1653 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1654 if(famQf.length == 1) {
1655 scan.addFamily(famQf[0]);
1656 } else {
1657 scan.addColumn(famQf[0], famQf[1]);
1658 }
1659 }
1660 }
1661 scan.setTimeRange(0, timestamp);
1662 return addScanner(table.getScanner(scan), false);
1663 } catch (IOException e) {
1664 LOG.warn(e.getMessage(), e);
1665 throw new IOError(Throwables.getStackTraceAsString(e));
1666 } finally{
1667 closeTable(table);
1668 }
1669 }
1670
1671 @Override
1672 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1673 ByteBuffer tableName) throws IOError, TException {
1674
1675 Table table = null;
1676 try {
1677 TreeMap<ByteBuffer, ColumnDescriptor> columns =
1678 new TreeMap<ByteBuffer, ColumnDescriptor>();
1679
1680 table = getTable(tableName);
1681 HTableDescriptor desc = table.getTableDescriptor();
1682
1683 for (HColumnDescriptor e : desc.getFamilies()) {
1684 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1685 columns.put(col.name, col);
1686 }
1687 return columns;
1688 } catch (IOException e) {
1689 LOG.warn(e.getMessage(), e);
1690 throw new IOError(Throwables.getStackTraceAsString(e));
1691 } finally {
1692 closeTable(table);
1693 }
1694 }
1695
1696 @Deprecated
1697 @Override
1698 public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1699 ByteBuffer family) throws IOError {
1700 try {
1701 Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1702 return ThriftUtilities.cellFromHBase(result.rawCells());
1703 } catch (IOException e) {
1704 LOG.warn(e.getMessage(), e);
1705 throw new IOError(Throwables.getStackTraceAsString(e));
1706 }
1707 }
1708
1709 @Override
1710 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1711 try {
1712 byte[] row = getBytes(searchRow);
1713 Result startRowResult =
1714 getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1715
1716 if (startRowResult == null) {
1717 throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1718 + Bytes.toStringBinary(row));
1719 }
1720
1721
1722 HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1723 if (regionInfo == null) {
1724 throw new IOException("HRegionInfo REGIONINFO was null or " +
1725 " empty in Meta for row="
1726 + Bytes.toStringBinary(row));
1727 }
1728 TRegionInfo region = new TRegionInfo();
1729 region.setStartKey(regionInfo.getStartKey());
1730 region.setEndKey(regionInfo.getEndKey());
1731 region.id = regionInfo.getRegionId();
1732 region.setName(regionInfo.getRegionName());
1733 region.version = regionInfo.getVersion();
1734
1735
1736 ServerName serverName = HRegionInfo.getServerName(startRowResult);
1737 if (serverName != null) {
1738 region.setServerName(Bytes.toBytes(serverName.getHostname()));
1739 region.port = serverName.getPort();
1740 }
1741 return region;
1742 } catch (IOException e) {
1743 LOG.warn(e.getMessage(), e);
1744 throw new IOError(Throwables.getStackTraceAsString(e));
1745 }
1746 }
1747
1748 private void closeTable(Table table) throws IOError
1749 {
1750 try{
1751 if(table != null){
1752 table.close();
1753 }
1754 } catch (IOException e){
1755 LOG.error(e.getMessage(), e);
1756 throw new IOError(Throwables.getStackTraceAsString(e));
1757 }
1758 }
1759
1760 private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1761 Scan scan = new Scan(row);
1762 scan.setReversed(true);
1763 scan.addFamily(family);
1764 scan.setStartRow(row);
1765 Table table = getTable(tableName);
1766 try (ResultScanner scanner = table.getScanner(scan)) {
1767 return scanner.next();
1768 } finally{
1769 if(table != null){
1770 table.close();
1771 }
1772 }
1773 }
1774
1775 private void initMetrics(ThriftMetrics metrics) {
1776 this.metrics = metrics;
1777 }
1778
1779 @Override
1780 public void increment(TIncrement tincrement) throws IOError, TException {
1781
1782 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1783 throw new TException("Must supply a table and a row key; can't increment");
1784 }
1785
1786 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1787 this.coalescer.queueIncrement(tincrement);
1788 return;
1789 }
1790
1791 Table table = null;
1792 try {
1793 table = getTable(tincrement.getTable());
1794 Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1795 table.increment(inc);
1796 } catch (IOException e) {
1797 LOG.warn(e.getMessage(), e);
1798 throw new IOError(Throwables.getStackTraceAsString(e));
1799 } finally{
1800 closeTable(table);
1801 }
1802 }
1803
1804 @Override
1805 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1806 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1807 this.coalescer.queueIncrements(tincrements);
1808 return;
1809 }
1810 for (TIncrement tinc : tincrements) {
1811 increment(tinc);
1812 }
1813 }
1814
1815 @Override
1816 public List<TCell> append(TAppend tappend) throws IOError, TException {
1817 if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1818 throw new TException("Must supply a table and a row key; can't append");
1819 }
1820
1821 Table table = null;
1822 try {
1823 table = getTable(tappend.getTable());
1824 Append append = ThriftUtilities.appendFromThrift(tappend);
1825 Result result = table.append(append);
1826 return ThriftUtilities.cellFromHBase(result.rawCells());
1827 } catch (IOException e) {
1828 LOG.warn(e.getMessage(), e);
1829 throw new IOError(Throwables.getStackTraceAsString(e));
1830 } finally{
1831 closeTable(table);
1832 }
1833 }
1834
1835 @Override
1836 public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1837 ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1838 IllegalArgument, TException {
1839 Put put;
1840 try {
1841 put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1842 addAttributes(put, attributes);
1843
1844 byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1845
1846 put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1847 : HConstants.EMPTY_BYTE_ARRAY);
1848
1849 put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1850 } catch (IllegalArgumentException e) {
1851 LOG.warn(e.getMessage(), e);
1852 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1853 }
1854
1855 Table table = null;
1856 try {
1857 table = getTable(tableName);
1858 byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1859 return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1860 value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1861 } catch (IOException e) {
1862 LOG.warn(e.getMessage(), e);
1863 throw new IOError(Throwables.getStackTraceAsString(e));
1864 } catch (IllegalArgumentException e) {
1865 LOG.warn(e.getMessage(), e);
1866 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1867 } finally {
1868 closeTable(table);
1869 }
1870 }
1871 }
1872
1873
1874
1875
1876
1877
1878 private static void addAttributes(OperationWithAttributes op,
1879 Map<ByteBuffer, ByteBuffer> attributes) {
1880 if (attributes == null || attributes.size() == 0) {
1881 return;
1882 }
1883 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1884 String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1885 byte[] value = getBytes(entry.getValue());
1886 op.setAttribute(name, value);
1887 }
1888 }
1889
1890 public static void registerFilters(Configuration conf) {
1891 String[] filters = conf.getStrings("hbase.thrift.filters");
1892 if(filters != null) {
1893 for(String filterClass: filters) {
1894 String[] filterPart = filterClass.split(":");
1895 if(filterPart.length != 2) {
1896 LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1897 } else {
1898 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1899 }
1900 }
1901 }
1902 }
1903 }