1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.nio.ByteBuffer;
28 import java.security.PrivilegedAction;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.TreeMap;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41
42 import javax.security.auth.callback.Callback;
43 import javax.security.auth.callback.UnsupportedCallbackException;
44 import javax.security.sasl.AuthorizeCallback;
45 import javax.security.sasl.Sasl;
46 import javax.security.sasl.SaslServer;
47
48 import org.apache.commons.cli.CommandLine;
49 import org.apache.commons.cli.Option;
50 import org.apache.commons.cli.OptionGroup;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.hbase.HBaseConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.HRegionLocation;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.KeyValue;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.TableName;
63 import org.apache.hadoop.hbase.TableNotFoundException;
64 import org.apache.hadoop.hbase.classification.InterfaceAudience;
65 import org.apache.hadoop.hbase.client.Admin;
66 import org.apache.hadoop.hbase.client.Append;
67 import org.apache.hadoop.hbase.client.Delete;
68 import org.apache.hadoop.hbase.client.Durability;
69 import org.apache.hadoop.hbase.client.Get;
70 import org.apache.hadoop.hbase.client.HBaseAdmin;
71 import org.apache.hadoop.hbase.client.Increment;
72 import org.apache.hadoop.hbase.client.OperationWithAttributes;
73 import org.apache.hadoop.hbase.client.Put;
74 import org.apache.hadoop.hbase.client.RegionLocator;
75 import org.apache.hadoop.hbase.client.Result;
76 import org.apache.hadoop.hbase.client.ResultScanner;
77 import org.apache.hadoop.hbase.client.Scan;
78 import org.apache.hadoop.hbase.client.Table;
79 import org.apache.hadoop.hbase.filter.Filter;
80 import org.apache.hadoop.hbase.filter.ParseFilter;
81 import org.apache.hadoop.hbase.filter.PrefixFilter;
82 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83 import org.apache.hadoop.hbase.jetty.SslSelectChannelConnectorSecure;
84 import org.apache.hadoop.hbase.security.SecurityUtil;
85 import org.apache.hadoop.hbase.security.UserProvider;
86 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
87 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
88 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
89 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
90 import org.apache.hadoop.hbase.thrift.generated.Hbase;
91 import org.apache.hadoop.hbase.thrift.generated.IOError;
92 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
93 import org.apache.hadoop.hbase.thrift.generated.Mutation;
94 import org.apache.hadoop.hbase.thrift.generated.TAppend;
95 import org.apache.hadoop.hbase.thrift.generated.TCell;
96 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
97 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
98 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
99 import org.apache.hadoop.hbase.thrift.generated.TScan;
100 import org.apache.hadoop.hbase.util.Bytes;
101 import org.apache.hadoop.hbase.util.ConnectionCache;
102 import org.apache.hadoop.hbase.util.DNS;
103 import org.apache.hadoop.hbase.util.Strings;
104 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.security.UserGroupInformation;
106 import org.apache.hadoop.security.authorize.ProxyUsers;
107 import org.apache.thrift.TException;
108 import org.apache.thrift.TProcessor;
109 import org.apache.thrift.protocol.TBinaryProtocol;
110 import org.apache.thrift.protocol.TCompactProtocol;
111 import org.apache.thrift.protocol.TProtocol;
112 import org.apache.thrift.protocol.TProtocolFactory;
113 import org.apache.thrift.server.THsHaServer;
114 import org.apache.thrift.server.TNonblockingServer;
115 import org.apache.thrift.server.TServer;
116 import org.apache.thrift.server.TServlet;
117 import org.apache.thrift.server.TThreadedSelectorServer;
118 import org.apache.thrift.transport.TFramedTransport;
119 import org.apache.thrift.transport.TNonblockingServerSocket;
120 import org.apache.thrift.transport.TNonblockingServerTransport;
121 import org.apache.thrift.transport.TSaslServerTransport;
122 import org.apache.thrift.transport.TServerSocket;
123 import org.apache.thrift.transport.TServerTransport;
124 import org.apache.thrift.transport.TTransportFactory;
125 import org.mortbay.jetty.Connector;
126 import org.mortbay.jetty.Server;
127 import org.mortbay.jetty.nio.SelectChannelConnector;
128 import org.mortbay.jetty.servlet.Context;
129 import org.mortbay.jetty.servlet.ServletHolder;
130 import org.mortbay.thread.QueuedThreadPool;
131
132 import com.google.common.base.Joiner;
133 import com.google.common.base.Throwables;
134 import com.google.common.util.concurrent.ThreadFactoryBuilder;
135
136
137
138
139
140 @InterfaceAudience.Private
141 @SuppressWarnings("deprecation")
142 public class ThriftServerRunner implements Runnable {
143
/** Logger shared by the runner and its nested {@code ImplType} enum. */
private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

/** Config key holding the server implementation name; compared against {@code ImplType.option}. */
static final String SERVER_TYPE_CONF_KEY =
"hbase.regionserver.thrift.server.type";

/** Config key for the address to bind to; {@code DEFAULT_BIND_ADDR} is used when unset. */
static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
/** Boolean config key: when true the compact protocol is used instead of the binary one. */
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
/** Boolean config key: when true the framed transport is used. */
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
/** Integer config key: maximum frame size in megabytes for the framed transport (read with default 2). */
static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
/** Integer config key for the listen port; {@code DEFAULT_LISTEN_PORT} is used when unset. */
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
// NOTE(review): not read in the visible part of this file; presumably consumed by the
// increment coalescing code -- confirm.
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
/** Boolean config key: when true the server is exposed over HTTP instead of a raw Thrift socket. */
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
/** Integer config key: minimum HTTP worker threads (read with default 2). */
static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
/** Integer config key: maximum HTTP worker threads (read with default 100). */
static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";

/** Boolean config key: when true the HTTP connector is replaced by an SSL connector. */
static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
/** Config key for the keystore handed to the SSL connector. */
static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
/** Config key for the keystore password. */
static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
/** Config key for the key password; the keystore password is used when this is unset. */
static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";











/**
 * Config key for the SASL quality of protection. When set, the value must be
 * {@code auth}, {@code auth-int} or {@code auth-conf}, and security must be enabled.
 */
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

/** Bind address used when {@code BIND_CONF_KEY} is not configured. */
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
/** Listen port used when {@code PORT_CONF_KEY} is not configured. */
public static final int DEFAULT_LISTEN_PORT = 9090;
// NOTE(review): not referenced in the visible part of this file -- confirm its users.
public static final int HREGION_VERSION = 1;
/** Boolean config key (default false) enabling proxy-user ("doAs") support. */
static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
/** Port the server listens on, resolved once in the constructor. */
private final int listenPort;

/** Private copy of the configuration passed to the constructor. */
private Configuration conf;
/** Raw-socket Thrift server; assigned by {@code setupServer()} and cleared by {@code shutdown()}. */
volatile TServer tserver;
/** Jetty server used in HTTP mode; assigned by {@code setupHTTPServer()} and cleared by {@code shutdown()}. */
volatile Server httpServer;
/** Metrics-wrapping proxy around {@code hbaseHandler}; this is what the Thrift processor invokes. */
private final Hbase.Iface handler;
private final ThriftMetrics metrics;
/** The concrete handler; kept so the effective user can be set per request. */
private final HBaseHandler hbaseHandler;
/** The user the server runs as; {@code run()} executes inside this user's {@code doAs}. */
private final UserGroupInformation realUser;

/** Configured SASL quality of protection, or null when SASL is not used. */
private final String qop;
/** Host name resolved for the Kerberos login; only assigned when security is enabled. */
private String host;

/** True only when both Hadoop security and HBase security are enabled. */
private final boolean securityEnabled;
/** Value of {@code THRIFT_SUPPORT_PROXYUSER}. */
private final boolean doAsEnabled;
195
196
197 enum ImplType {
198 HS_HA("hsha", true, THsHaServer.class, true),
199 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
200 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
201 THREADED_SELECTOR(
202 "threadedselector", true, TThreadedSelectorServer.class, true);
203
204 public static final ImplType DEFAULT = THREAD_POOL;
205
206 final String option;
207 final boolean isAlwaysFramed;
208 final Class<? extends TServer> serverClass;
209 final boolean canSpecifyBindIP;
210
211 ImplType(String option, boolean isAlwaysFramed,
212 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
213 this.option = option;
214 this.isAlwaysFramed = isAlwaysFramed;
215 this.serverClass = serverClass;
216 this.canSpecifyBindIP = canSpecifyBindIP;
217 }
218
219
220
221
222
223 @Override
224 public String toString() {
225 return "-" + option;
226 }
227
228 String getDescription() {
229 StringBuilder sb = new StringBuilder("Use the " +
230 serverClass.getSimpleName());
231 if (isAlwaysFramed) {
232 sb.append(" This implies the framed transport.");
233 }
234 if (this == DEFAULT) {
235 sb.append("This is the default.");
236 }
237 return sb.toString();
238 }
239
240 static OptionGroup createOptionGroup() {
241 OptionGroup group = new OptionGroup();
242 for (ImplType t : values()) {
243 group.addOption(new Option(t.option, t.getDescription()));
244 }
245 return group;
246 }
247
248 static ImplType getServerImpl(Configuration conf) {
249 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
250 for (ImplType t : values()) {
251 if (confType.equals(t.option)) {
252 return t;
253 }
254 }
255 throw new AssertionError("Unknown server ImplType.option:" + confType);
256 }
257
258 static void setServerImpl(CommandLine cmd, Configuration conf) {
259 ImplType chosenType = null;
260 int numChosen = 0;
261 for (ImplType t : values()) {
262 if (cmd.hasOption(t.option)) {
263 chosenType = t;
264 ++numChosen;
265 }
266 }
267 if (numChosen < 1) {
268 LOG.info("Using default thrift server type");
269 chosenType = DEFAULT;
270 } else if (numChosen > 1) {
271 throw new AssertionError("Exactly one option out of " +
272 Arrays.toString(values()) + " has to be specified");
273 }
274 LOG.info("Using thrift server type " + chosenType.option);
275 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
276 }
277
278 public String simpleClassName() {
279 return serverClass.getSimpleName();
280 }
281
282 public static List<String> serversThatCannotSpecifyBindIP() {
283 List<String> l = new ArrayList<String>();
284 for (ImplType t : values()) {
285 if (!t.canSpecifyBindIP) {
286 l.add(t.simpleClassName());
287 }
288 }
289 return l;
290 }
291
292 }
293
294 public ThriftServerRunner(Configuration conf) throws IOException {
295 UserProvider userProvider = UserProvider.instantiate(conf);
296
297 securityEnabled = userProvider.isHadoopSecurityEnabled()
298 && userProvider.isHBaseSecurityEnabled();
299 if (securityEnabled) {
300 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
301 conf.get("hbase.thrift.dns.interface", "default"),
302 conf.get("hbase.thrift.dns.nameserver", "default")));
303 userProvider.login("hbase.thrift.keytab.file",
304 "hbase.thrift.kerberos.principal", host);
305 }
306 this.conf = HBaseConfiguration.create(conf);
307 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
308 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
309 this.hbaseHandler = new HBaseHandler(conf, userProvider);
310 this.hbaseHandler.initMetrics(metrics);
311 this.handler = HbaseHandlerMetricsProxy.newInstance(
312 hbaseHandler, metrics, conf);
313 this.realUser = userProvider.getCurrent().getUGI();
314 qop = conf.get(THRIFT_QOP_KEY);
315 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
316 if (qop != null) {
317 if (!qop.equals("auth") && !qop.equals("auth-int")
318 && !qop.equals("auth-conf")) {
319 throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
320 + ", it must be 'auth', 'auth-int', or 'auth-conf'");
321 }
322 if (!securityEnabled) {
323 throw new IOException("Thrift server must"
324 + " run in secure mode to support authentication");
325 }
326 }
327 }
328
329
330
331
332 @Override
333 public void run() {
334 realUser.doAs(new PrivilegedAction<Object>() {
335 @Override
336 public Object run() {
337 try {
338 if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
339 setupHTTPServer();
340 httpServer.start();
341 httpServer.join();
342 } else {
343 setupServer();
344 tserver.serve();
345 }
346 } catch (Exception e) {
347 LOG.fatal("Cannot run ThriftServer", e);
348
349 System.exit(-1);
350 }
351 return null;
352 }
353 });
354
355 }
356
357 public void shutdown() {
358 if (tserver != null) {
359 tserver.stop();
360 tserver = null;
361 }
362 if (httpServer != null) {
363 try {
364 httpServer.stop();
365 httpServer = null;
366 } catch (Exception e) {
367 LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
368 }
369 httpServer = null;
370 }
371 }
372
373 private void setupHTTPServer() throws IOException {
374 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
375 TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
376 TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
377 conf, hbaseHandler, securityEnabled, doAsEnabled);
378
379 httpServer = new Server();
380
381 Context context = new Context(httpServer, "/", Context.SESSIONS);
382 context.setContextPath("/");
383 String httpPath = "/*";
384 httpServer.setHandler(context);
385 context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
386
387
388 Connector connector = new SelectChannelConnector();
389 if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
390 SslSelectChannelConnectorSecure sslConnector = new SslSelectChannelConnectorSecure();
391 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
392 String password = HBaseConfiguration.getPassword(conf,
393 THRIFT_SSL_KEYSTORE_PASSWORD, null);
394 String keyPassword = HBaseConfiguration.getPassword(conf,
395 THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
396 sslConnector.setKeystore(keystore);
397 sslConnector.setPassword(password);
398 sslConnector.setKeyPassword(keyPassword);
399 connector = sslConnector;
400 }
401 String host = getBindAddress(conf).getHostAddress();
402 connector.setPort(listenPort);
403 connector.setHost(host);
404 connector.setHeaderBufferSize(1024 * 64);
405 httpServer.addConnector(connector);
406
407 if (doAsEnabled) {
408 ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
409 }
410
411
412
413
414
415
416 int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
417 int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
418 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
419 threadPool.setMinThreads(minThreads);
420 httpServer.setThreadPool(threadPool);
421
422 httpServer.setSendServerVersion(false);
423 httpServer.setSendDateHeader(false);
424 httpServer.setStopAtShutdown(true);
425
426 LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
427 }
428
429
430
431
432 private void setupServer() throws Exception {
433
434 TProtocolFactory protocolFactory;
435 if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
436 LOG.debug("Using compact protocol");
437 protocolFactory = new TCompactProtocol.Factory();
438 } else {
439 LOG.debug("Using binary protocol");
440 protocolFactory = new TBinaryProtocol.Factory();
441 }
442
443 final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
444 ImplType implType = ImplType.getServerImpl(conf);
445 TProcessor processor = p;
446
447
448 TTransportFactory transportFactory;
449 if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
450 if (qop != null) {
451 throw new RuntimeException("Thrift server authentication"
452 + " doesn't work with framed transport yet");
453 }
454 transportFactory = new TFramedTransport.Factory(
455 conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024);
456 LOG.debug("Using framed transport");
457 } else if (qop == null) {
458 transportFactory = new TTransportFactory();
459 } else {
460
461 String name = SecurityUtil.getUserFromPrincipal(
462 conf.get("hbase.thrift.kerberos.principal"));
463 Map<String, String> saslProperties = new HashMap<String, String>();
464 saslProperties.put(Sasl.QOP, qop);
465 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
466 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
467 new SaslGssCallbackHandler() {
468 @Override
469 public void handle(Callback[] callbacks)
470 throws UnsupportedCallbackException {
471 AuthorizeCallback ac = null;
472 for (Callback callback : callbacks) {
473 if (callback instanceof AuthorizeCallback) {
474 ac = (AuthorizeCallback) callback;
475 } else {
476 throw new UnsupportedCallbackException(callback,
477 "Unrecognized SASL GSSAPI Callback");
478 }
479 }
480 if (ac != null) {
481 String authid = ac.getAuthenticationID();
482 String authzid = ac.getAuthorizationID();
483 if (!authid.equals(authzid)) {
484 ac.setAuthorized(false);
485 } else {
486 ac.setAuthorized(true);
487 String userName = SecurityUtil.getUserFromPrincipal(authzid);
488 LOG.info("Effective user: " + userName);
489 ac.setAuthorizedID(userName);
490 }
491 }
492 }
493 });
494 transportFactory = saslFactory;
495
496
497 processor = new TProcessor() {
498 @Override
499 public boolean process(TProtocol inProt,
500 TProtocol outProt) throws TException {
501 TSaslServerTransport saslServerTransport =
502 (TSaslServerTransport)inProt.getTransport();
503 SaslServer saslServer = saslServerTransport.getSaslServer();
504 String principal = saslServer.getAuthorizationID();
505 hbaseHandler.setEffectiveUser(principal);
506 return p.process(inProt, outProt);
507 }
508 };
509 }
510
511 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
512 LOG.error("Server types " + Joiner.on(", ").join(
513 ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
514 "address binding at the moment. See " +
515 "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
516 throw new RuntimeException(
517 "-" + BIND_CONF_KEY + " not supported with " + implType);
518 }
519
520 if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
521 implType == ImplType.THREADED_SELECTOR) {
522
523 InetAddress listenAddress = getBindAddress(conf);
524 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
525 new InetSocketAddress(listenAddress, listenPort));
526
527 if (implType == ImplType.NONBLOCKING) {
528 TNonblockingServer.Args serverArgs =
529 new TNonblockingServer.Args(serverTransport);
530 serverArgs.processor(processor)
531 .transportFactory(transportFactory)
532 .protocolFactory(protocolFactory);
533 tserver = new TNonblockingServer(serverArgs);
534 } else if (implType == ImplType.HS_HA) {
535 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
536 CallQueue callQueue =
537 new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
538 ExecutorService executorService = createExecutor(
539 callQueue, serverArgs.getWorkerThreads());
540 serverArgs.executorService(executorService)
541 .processor(processor)
542 .transportFactory(transportFactory)
543 .protocolFactory(protocolFactory);
544 tserver = new THsHaServer(serverArgs);
545 } else {
546 TThreadedSelectorServer.Args serverArgs =
547 new HThreadedSelectorServerArgs(serverTransport, conf);
548 CallQueue callQueue =
549 new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
550 ExecutorService executorService = createExecutor(
551 callQueue, serverArgs.getWorkerThreads());
552 serverArgs.executorService(executorService)
553 .processor(processor)
554 .transportFactory(transportFactory)
555 .protocolFactory(protocolFactory);
556 tserver = new TThreadedSelectorServer(serverArgs);
557 }
558 LOG.info("starting HBase " + implType.simpleClassName() +
559 " server on " + Integer.toString(listenPort));
560 } else if (implType == ImplType.THREAD_POOL) {
561
562 InetAddress listenAddress = getBindAddress(conf);
563
564 TServerTransport serverTransport = new TServerSocket(
565 new InetSocketAddress(listenAddress, listenPort));
566
567 TBoundedThreadPoolServer.Args serverArgs =
568 new TBoundedThreadPoolServer.Args(serverTransport, conf);
569 serverArgs.processor(processor)
570 .transportFactory(transportFactory)
571 .protocolFactory(protocolFactory);
572 LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
573 + listenAddress + ":" + Integer.toString(listenPort)
574 + "; " + serverArgs);
575 TBoundedThreadPoolServer tserver =
576 new TBoundedThreadPoolServer(serverArgs, metrics);
577 this.tserver = tserver;
578 } else {
579 throw new AssertionError("Unsupported Thrift server implementation: " +
580 implType.simpleClassName());
581 }
582
583
584 if (tserver.getClass() != implType.serverClass) {
585 throw new AssertionError("Expected to create Thrift server class " +
586 implType.serverClass.getName() + " but got " +
587 tserver.getClass().getName());
588 }
589
590
591
592 registerFilters(conf);
593 }
594
595 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
596 int workerThreads) {
597 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
598 tfb.setDaemon(true);
599 tfb.setNameFormat("thrift-worker-%d");
600 return new ThreadPoolExecutor(workerThreads, workerThreads,
601 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
602 }
603
604 private InetAddress getBindAddress(Configuration conf)
605 throws UnknownHostException {
606 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
607 return InetAddress.getByName(bindAddressStr);
608 }
609
610 protected static class ResultScannerWrapper {
611
612 private final ResultScanner scanner;
613 private final boolean sortColumns;
614 public ResultScannerWrapper(ResultScanner resultScanner,
615 boolean sortResultColumns) {
616 scanner = resultScanner;
617 sortColumns = sortResultColumns;
618 }
619
620 public ResultScanner getScanner() {
621 return scanner;
622 }
623
624 public boolean isColumnSorted() {
625 return sortColumns;
626 }
627 }
628
629
630
631
632
633 public static class HBaseHandler implements Hbase.Iface {
/** Configuration handed to the constructor. */
protected Configuration conf;
/** Per-instance logger named after the runtime class, so subclasses log under their own name. */
protected final Log LOG = LogFactory.getLog(this.getClass().getName());


/** Id handed to the next registered scanner; only touched inside synchronized methods. */
protected int nextScannerId = 0;
/** Open scanners keyed by the id returned from {@code addScanner}. */
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
// NOTE(review): presumably set through initMetrics(), which is outside this view -- confirm.
private ThriftMetrics metrics = null;

/** Source of tables, admins and region locators for the current effective user. */
private final ConnectionCache connectionCache;
IncrementCoalescer coalescer = null;

/** Integer config key for the connection cache cleanup interval (read with default 10 * 1000). */
// presumably milliseconds -- TODO confirm against ConnectionCache
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
/** Integer config key for the connection cache max idle time (read with default 10 * 60 * 1000). */
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
647
648
649
650
651
652
653
654 byte[][] getAllColumns(Table table) throws IOException {
655 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
656 byte[][] columns = new byte[cds.length][];
657 for (int i = 0; i < cds.length; i++) {
658 columns[i] = Bytes.add(cds[i].getName(),
659 KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
660 }
661 return columns;
662 }
663
664
665
666
667
668
669
670
671
672
673 public Table getTable(final byte[] tableName) throws
674 IOException {
675 String table = Bytes.toString(tableName);
676 return connectionCache.getTable(table);
677 }
678
679 public Table getTable(final ByteBuffer tableName) throws IOException {
680 return getTable(getBytes(tableName));
681 }
682
683
684
685
686
687
688
689
690 protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
691 int id = nextScannerId++;
692 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
693 scannerMap.put(id, resultScannerWrapper);
694 return id;
695 }
696
697
698
699
700
701
702
703 protected synchronized ResultScannerWrapper getScanner(int id) {
704 return scannerMap.get(id);
705 }
706
707
708
709
710
711
712
713
714 protected synchronized ResultScannerWrapper removeScanner(int id) {
715 return scannerMap.remove(id);
716 }
717
718 protected HBaseHandler(final Configuration c,
719 final UserProvider userProvider) throws IOException {
720 this.conf = c;
721 scannerMap = new HashMap<Integer, ResultScannerWrapper>();
722 this.coalescer = new IncrementCoalescer(this);
723
724 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
725 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
726 connectionCache = new ConnectionCache(
727 conf, userProvider, cleanInterval, maxIdleTime);
728 }
729
730
731
732
733 private Admin getAdmin() throws IOException {
734 return connectionCache.getAdmin();
735 }
736
737 void setEffectiveUser(String effectiveUser) {
738 connectionCache.setEffectiveUser(effectiveUser);
739 }
740
741 @Override
742 public void enableTable(ByteBuffer tableName) throws IOError {
743 try{
744 getAdmin().enableTable(getTableName(tableName));
745 } catch (IOException e) {
746 LOG.warn(e.getMessage(), e);
747 throw new IOError(Throwables.getStackTraceAsString(e));
748 }
749 }
750
751 @Override
752 public void disableTable(ByteBuffer tableName) throws IOError{
753 try{
754 getAdmin().disableTable(getTableName(tableName));
755 } catch (IOException e) {
756 LOG.warn(e.getMessage(), e);
757 throw new IOError(Throwables.getStackTraceAsString(e));
758 }
759 }
760
761 @Override
762 public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
763 try {
764 return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
765 } catch (IOException e) {
766 LOG.warn(e.getMessage(), e);
767 throw new IOError(Throwables.getStackTraceAsString(e));
768 }
769 }
770
771 @Override
772 public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
773 try {
774
775
776
777 ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
778 } catch (IOException e) {
779 LOG.warn(e.getMessage(), e);
780 throw new IOError(Throwables.getStackTraceAsString(e));
781 }
782 }
783
784 @Override
785 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
786 try {
787
788
789
790 ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
791 } catch (IOException e) {
792 LOG.warn(e.getMessage(), e);
793 throw new IOError(Throwables.getStackTraceAsString(e));
794 }
795 }
796
797 @Override
798 public List<ByteBuffer> getTableNames() throws IOError {
799 try {
800 TableName[] tableNames = this.getAdmin().listTableNames();
801 ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
802 for (int i = 0; i < tableNames.length; i++) {
803 list.add(ByteBuffer.wrap(tableNames[i].getName()));
804 }
805 return list;
806 } catch (IOException e) {
807 LOG.warn(e.getMessage(), e);
808 throw new IOError(Throwables.getStackTraceAsString(e));
809 }
810 }
811
812
813
814
815 @Override
816 public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
817 throws IOError {
818 try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
819 List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
820 List<TRegionInfo> results = new ArrayList<TRegionInfo>();
821 for (HRegionLocation regionLocation : regionLocations) {
822 HRegionInfo info = regionLocation.getRegionInfo();
823 ServerName serverName = regionLocation.getServerName();
824 TRegionInfo region = new TRegionInfo();
825 region.serverName = ByteBuffer.wrap(
826 Bytes.toBytes(serverName.getHostname()));
827 region.port = serverName.getPort();
828 region.startKey = ByteBuffer.wrap(info.getStartKey());
829 region.endKey = ByteBuffer.wrap(info.getEndKey());
830 region.id = info.getRegionId();
831 region.name = ByteBuffer.wrap(info.getRegionName());
832 region.version = info.getVersion();
833 results.add(region);
834 }
835 return results;
836 } catch (TableNotFoundException e) {
837
838 return Collections.emptyList();
839 } catch (IOException e){
840 LOG.warn(e.getMessage(), e);
841 throw new IOError(Throwables.getStackTraceAsString(e));
842 }
843 }
844
845 @Deprecated
846 @Override
847 public List<TCell> get(
848 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
849 Map<ByteBuffer, ByteBuffer> attributes)
850 throws IOError {
851 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
852 if (famAndQf.length == 1) {
853 return get(tableName, row, famAndQf[0], null, attributes);
854 }
855 if (famAndQf.length == 2) {
856 return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
857 }
858 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
859 }
860
861
862
863
864
865
866
867
868 protected List<TCell> get(ByteBuffer tableName,
869 ByteBuffer row,
870 byte[] family,
871 byte[] qualifier,
872 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
873 Table table = null;
874 try {
875 table = getTable(tableName);
876 Get get = new Get(getBytes(row));
877 addAttributes(get, attributes);
878 if (qualifier == null) {
879 get.addFamily(family);
880 } else {
881 get.addColumn(family, qualifier);
882 }
883 Result result = table.get(get);
884 return ThriftUtilities.cellFromHBase(result.rawCells());
885 } catch (IOException e) {
886 LOG.warn(e.getMessage(), e);
887 throw new IOError(Throwables.getStackTraceAsString(e));
888 } finally {
889 closeTable(table);
890 }
891 }
892
893 @Deprecated
894 @Override
895 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
896 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
897 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
898 if(famAndQf.length == 1) {
899 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
900 }
901 if (famAndQf.length == 2) {
902 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
903 }
904 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
905
906 }
907
908
909
910
911
912
913
914
915
916 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
917 byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
918
919 Table table = null;
920 try {
921 table = getTable(tableName);
922 Get get = new Get(getBytes(row));
923 addAttributes(get, attributes);
924 if (null == qualifier) {
925 get.addFamily(family);
926 } else {
927 get.addColumn(family, qualifier);
928 }
929 get.setMaxVersions(numVersions);
930 Result result = table.get(get);
931 return ThriftUtilities.cellFromHBase(result.rawCells());
932 } catch (IOException e) {
933 LOG.warn(e.getMessage(), e);
934 throw new IOError(Throwables.getStackTraceAsString(e));
935 } finally{
936 closeTable(table);
937 }
938 }
939
940 @Deprecated
941 @Override
942 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
943 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
944 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
945 if (famAndQf.length == 1) {
946 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
947 }
948 if (famAndQf.length == 2) {
949 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
950 attributes);
951 }
952 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
953 }
954
955
956
957
958
959
960
961
962 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
963 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
964 throws IOError {
965
966 Table table = null;
967 try {
968 table = getTable(tableName);
969 Get get = new Get(getBytes(row));
970 addAttributes(get, attributes);
971 if (null == qualifier) {
972 get.addFamily(family);
973 } else {
974 get.addColumn(family, qualifier);
975 }
976 get.setTimeRange(0, timestamp);
977 get.setMaxVersions(numVersions);
978 Result result = table.get(get);
979 return ThriftUtilities.cellFromHBase(result.rawCells());
980 } catch (IOException e) {
981 LOG.warn(e.getMessage(), e);
982 throw new IOError(Throwables.getStackTraceAsString(e));
983 } finally{
984 closeTable(table);
985 }
986 }
987
988 @Override
989 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
990 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
991 return getRowWithColumnsTs(tableName, row, null,
992 HConstants.LATEST_TIMESTAMP,
993 attributes);
994 }
995
996 @Override
997 public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
998 ByteBuffer row,
999 List<ByteBuffer> columns,
1000 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1001 return getRowWithColumnsTs(tableName, row, columns,
1002 HConstants.LATEST_TIMESTAMP,
1003 attributes);
1004 }
1005
1006 @Override
1007 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1008 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1009 return getRowWithColumnsTs(tableName, row, null,
1010 timestamp, attributes);
1011 }
1012
1013 @Override
1014 public List<TRowResult> getRowWithColumnsTs(
1015 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1016 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1017
1018 Table table = null;
1019 try {
1020 table = getTable(tableName);
1021 if (columns == null) {
1022 Get get = new Get(getBytes(row));
1023 addAttributes(get, attributes);
1024 get.setTimeRange(0, timestamp);
1025 Result result = table.get(get);
1026 return ThriftUtilities.rowResultFromHBase(result);
1027 }
1028 Get get = new Get(getBytes(row));
1029 addAttributes(get, attributes);
1030 for(ByteBuffer column : columns) {
1031 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1032 if (famAndQf.length == 1) {
1033 get.addFamily(famAndQf[0]);
1034 } else {
1035 get.addColumn(famAndQf[0], famAndQf[1]);
1036 }
1037 }
1038 get.setTimeRange(0, timestamp);
1039 Result result = table.get(get);
1040 return ThriftUtilities.rowResultFromHBase(result);
1041 } catch (IOException e) {
1042 LOG.warn(e.getMessage(), e);
1043 throw new IOError(Throwables.getStackTraceAsString(e));
1044 } finally{
1045 closeTable(table);
1046 }
1047 }
1048
1049 @Override
1050 public List<TRowResult> getRows(ByteBuffer tableName,
1051 List<ByteBuffer> rows,
1052 Map<ByteBuffer, ByteBuffer> attributes)
1053 throws IOError {
1054 return getRowsWithColumnsTs(tableName, rows, null,
1055 HConstants.LATEST_TIMESTAMP,
1056 attributes);
1057 }
1058
1059 @Override
1060 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1061 List<ByteBuffer> rows,
1062 List<ByteBuffer> columns,
1063 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1064 return getRowsWithColumnsTs(tableName, rows, columns,
1065 HConstants.LATEST_TIMESTAMP,
1066 attributes);
1067 }
1068
1069 @Override
1070 public List<TRowResult> getRowsTs(ByteBuffer tableName,
1071 List<ByteBuffer> rows,
1072 long timestamp,
1073 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1074 return getRowsWithColumnsTs(tableName, rows, null,
1075 timestamp, attributes);
1076 }
1077
1078 @Override
1079 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1080 List<ByteBuffer> rows,
1081 List<ByteBuffer> columns, long timestamp,
1082 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1083
1084 Table table= null;
1085 try {
1086 List<Get> gets = new ArrayList<Get>(rows.size());
1087 table = getTable(tableName);
1088 if (metrics != null) {
1089 metrics.incNumRowKeysInBatchGet(rows.size());
1090 }
1091 for (ByteBuffer row : rows) {
1092 Get get = new Get(getBytes(row));
1093 addAttributes(get, attributes);
1094 if (columns != null) {
1095
1096 for(ByteBuffer column : columns) {
1097 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1098 if (famAndQf.length == 1) {
1099 get.addFamily(famAndQf[0]);
1100 } else {
1101 get.addColumn(famAndQf[0], famAndQf[1]);
1102 }
1103 }
1104 }
1105 get.setTimeRange(0, timestamp);
1106 gets.add(get);
1107 }
1108 Result[] result = table.get(gets);
1109 return ThriftUtilities.rowResultFromHBase(result);
1110 } catch (IOException e) {
1111 LOG.warn(e.getMessage(), e);
1112 throw new IOError(Throwables.getStackTraceAsString(e));
1113 } finally{
1114 closeTable(table);
1115 }
1116 }
1117
1118 @Override
1119 public void deleteAll(
1120 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1121 Map<ByteBuffer, ByteBuffer> attributes)
1122 throws IOError {
1123 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1124 attributes);
1125 }
1126
1127 @Override
1128 public void deleteAllTs(ByteBuffer tableName,
1129 ByteBuffer row,
1130 ByteBuffer column,
1131 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1132 Table table = null;
1133 try {
1134 table = getTable(tableName);
1135 Delete delete = new Delete(getBytes(row));
1136 addAttributes(delete, attributes);
1137 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1138 if (famAndQf.length == 1) {
1139 delete.deleteFamily(famAndQf[0], timestamp);
1140 } else {
1141 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1142 }
1143 table.delete(delete);
1144
1145 } catch (IOException e) {
1146 LOG.warn(e.getMessage(), e);
1147 throw new IOError(Throwables.getStackTraceAsString(e));
1148 } finally {
1149 closeTable(table);
1150 }
1151 }
1152
1153 @Override
1154 public void deleteAllRow(
1155 ByteBuffer tableName, ByteBuffer row,
1156 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1157 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1158 }
1159
1160 @Override
1161 public void deleteAllRowTs(
1162 ByteBuffer tableName, ByteBuffer row, long timestamp,
1163 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1164 Table table = null;
1165 try {
1166 table = getTable(tableName);
1167 Delete delete = new Delete(getBytes(row), timestamp);
1168 addAttributes(delete, attributes);
1169 table.delete(delete);
1170 } catch (IOException e) {
1171 LOG.warn(e.getMessage(), e);
1172 throw new IOError(Throwables.getStackTraceAsString(e));
1173 } finally {
1174 closeTable(table);
1175 }
1176 }
1177
1178 @Override
1179 public void createTable(ByteBuffer in_tableName,
1180 List<ColumnDescriptor> columnFamilies) throws IOError,
1181 IllegalArgument, AlreadyExists {
1182 TableName tableName = getTableName(in_tableName);
1183 try {
1184 if (getAdmin().tableExists(tableName)) {
1185 throw new AlreadyExists("table name already in use");
1186 }
1187 HTableDescriptor desc = new HTableDescriptor(tableName);
1188 for (ColumnDescriptor col : columnFamilies) {
1189 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1190 desc.addFamily(colDesc);
1191 }
1192 getAdmin().createTable(desc);
1193 } catch (IOException e) {
1194 LOG.warn(e.getMessage(), e);
1195 throw new IOError(Throwables.getStackTraceAsString(e));
1196 } catch (IllegalArgumentException e) {
1197 LOG.warn(e.getMessage(), e);
1198 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1199 }
1200 }
1201
1202 private static TableName getTableName(ByteBuffer buffer) {
1203 return TableName.valueOf(getBytes(buffer));
1204 }
1205
1206 @Override
1207 public void deleteTable(ByteBuffer in_tableName) throws IOError {
1208 TableName tableName = getTableName(in_tableName);
1209 if (LOG.isDebugEnabled()) {
1210 LOG.debug("deleteTable: table=" + tableName);
1211 }
1212 try {
1213 if (!getAdmin().tableExists(tableName)) {
1214 throw new IOException("table does not exist");
1215 }
1216 getAdmin().deleteTable(tableName);
1217 } catch (IOException e) {
1218 LOG.warn(e.getMessage(), e);
1219 throw new IOError(Throwables.getStackTraceAsString(e));
1220 }
1221 }
1222
1223 @Override
1224 public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1225 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1226 throws IOError, IllegalArgument {
1227 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1228 attributes);
1229 }
1230
1231 @Override
1232 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1233 List<Mutation> mutations, long timestamp,
1234 Map<ByteBuffer, ByteBuffer> attributes)
1235 throws IOError, IllegalArgument {
1236 Table table = null;
1237 try {
1238 table = getTable(tableName);
1239 Put put = new Put(getBytes(row), timestamp);
1240 addAttributes(put, attributes);
1241
1242 Delete delete = new Delete(getBytes(row));
1243 addAttributes(delete, attributes);
1244 if (metrics != null) {
1245 metrics.incNumRowKeysInBatchMutate(mutations.size());
1246 }
1247
1248
1249 for (Mutation m : mutations) {
1250 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1251 if (m.isDelete) {
1252 if (famAndQf.length == 1) {
1253 delete.deleteFamily(famAndQf[0], timestamp);
1254 } else {
1255 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1256 }
1257 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1258 : Durability.SKIP_WAL);
1259 } else {
1260 if(famAndQf.length == 1) {
1261 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1262 + "over the whole column family.");
1263 } else {
1264 put.addImmutable(famAndQf[0], famAndQf[1],
1265 m.value != null ? getBytes(m.value)
1266 : HConstants.EMPTY_BYTE_ARRAY);
1267 }
1268 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1269 }
1270 }
1271 if (!delete.isEmpty())
1272 table.delete(delete);
1273 if (!put.isEmpty())
1274 table.put(put);
1275 } catch (IOException e) {
1276 LOG.warn(e.getMessage(), e);
1277 throw new IOError(Throwables.getStackTraceAsString(e));
1278 } catch (IllegalArgumentException e) {
1279 LOG.warn(e.getMessage(), e);
1280 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1281 } finally{
1282 closeTable(table);
1283 }
1284 }
1285
1286 @Override
1287 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1288 Map<ByteBuffer, ByteBuffer> attributes)
1289 throws IOError, IllegalArgument, TException {
1290 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1291 }
1292
1293 @Override
1294 public void mutateRowsTs(
1295 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1296 Map<ByteBuffer, ByteBuffer> attributes)
1297 throws IOError, IllegalArgument, TException {
1298 List<Put> puts = new ArrayList<Put>();
1299 List<Delete> deletes = new ArrayList<Delete>();
1300
1301 for (BatchMutation batch : rowBatches) {
1302 byte[] row = getBytes(batch.row);
1303 List<Mutation> mutations = batch.mutations;
1304 Delete delete = new Delete(row);
1305 addAttributes(delete, attributes);
1306 Put put = new Put(row, timestamp);
1307 addAttributes(put, attributes);
1308 for (Mutation m : mutations) {
1309 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1310 if (m.isDelete) {
1311
1312 if (famAndQf.length == 1) {
1313 delete.deleteFamily(famAndQf[0], timestamp);
1314 } else {
1315 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1316 }
1317 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1318 : Durability.SKIP_WAL);
1319 } else {
1320 if (famAndQf.length == 1) {
1321 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1322 + "over the whole column family.");
1323 }
1324 if (famAndQf.length == 2) {
1325 put.addImmutable(famAndQf[0], famAndQf[1],
1326 m.value != null ? getBytes(m.value)
1327 : HConstants.EMPTY_BYTE_ARRAY);
1328 } else {
1329 throw new IllegalArgumentException("Invalid famAndQf provided.");
1330 }
1331 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1332 }
1333 }
1334 if (!delete.isEmpty())
1335 deletes.add(delete);
1336 if (!put.isEmpty())
1337 puts.add(put);
1338 }
1339
1340 Table table = null;
1341 try {
1342 table = getTable(tableName);
1343 if (!puts.isEmpty())
1344 table.put(puts);
1345 if (!deletes.isEmpty())
1346 table.delete(deletes);
1347
1348 } catch (IOException e) {
1349 LOG.warn(e.getMessage(), e);
1350 throw new IOError(Throwables.getStackTraceAsString(e));
1351 } catch (IllegalArgumentException e) {
1352 LOG.warn(e.getMessage(), e);
1353 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1354 } finally{
1355 closeTable(table);
1356 }
1357 }
1358
1359 @Deprecated
1360 @Override
1361 public long atomicIncrement(
1362 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1363 throws IOError, IllegalArgument, TException {
1364 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1365 if(famAndQf.length == 1) {
1366 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1367 }
1368 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1369 }
1370
1371 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1372 byte [] family, byte [] qualifier, long amount)
1373 throws IOError, IllegalArgument, TException {
1374 Table table = null;
1375 try {
1376 table = getTable(tableName);
1377 return table.incrementColumnValue(
1378 getBytes(row), family, qualifier, amount);
1379 } catch (IOException e) {
1380 LOG.warn(e.getMessage(), e);
1381 throw new IOError(Throwables.getStackTraceAsString(e));
1382 } finally {
1383 closeTable(table);
1384 }
1385 }
1386
1387 @Override
1388 public void scannerClose(int id) throws IOError, IllegalArgument {
1389 LOG.debug("scannerClose: id=" + id);
1390 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1391 if (resultScannerWrapper == null) {
1392 String message = "scanner ID is invalid";
1393 LOG.warn(message);
1394 throw new IllegalArgument("scanner ID is invalid");
1395 }
1396 resultScannerWrapper.getScanner().close();
1397 removeScanner(id);
1398 }
1399
1400 @Override
1401 public List<TRowResult> scannerGetList(int id,int nbRows)
1402 throws IllegalArgument, IOError {
1403 LOG.debug("scannerGetList: id=" + id);
1404 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1405 if (null == resultScannerWrapper) {
1406 String message = "scanner ID is invalid";
1407 LOG.warn(message);
1408 throw new IllegalArgument("scanner ID is invalid");
1409 }
1410
1411 Result [] results = null;
1412 try {
1413 results = resultScannerWrapper.getScanner().next(nbRows);
1414 if (null == results) {
1415 return new ArrayList<TRowResult>();
1416 }
1417 } catch (IOException e) {
1418 LOG.warn(e.getMessage(), e);
1419 throw new IOError(Throwables.getStackTraceAsString(e));
1420 }
1421 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1422 }
1423
1424 @Override
1425 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1426 return scannerGetList(id,1);
1427 }
1428
1429 @Override
1430 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1431 Map<ByteBuffer, ByteBuffer> attributes)
1432 throws IOError {
1433
1434 Table table = null;
1435 try {
1436 table = getTable(tableName);
1437 Scan scan = new Scan();
1438 addAttributes(scan, attributes);
1439 if (tScan.isSetStartRow()) {
1440 scan.setStartRow(tScan.getStartRow());
1441 }
1442 if (tScan.isSetStopRow()) {
1443 scan.setStopRow(tScan.getStopRow());
1444 }
1445 if (tScan.isSetTimestamp()) {
1446 scan.setTimeRange(0, tScan.getTimestamp());
1447 }
1448 if (tScan.isSetCaching()) {
1449 scan.setCaching(tScan.getCaching());
1450 }
1451 if (tScan.isSetBatchSize()) {
1452 scan.setBatch(tScan.getBatchSize());
1453 }
1454 if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1455 for(ByteBuffer column : tScan.getColumns()) {
1456 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1457 if(famQf.length == 1) {
1458 scan.addFamily(famQf[0]);
1459 } else {
1460 scan.addColumn(famQf[0], famQf[1]);
1461 }
1462 }
1463 }
1464 if (tScan.isSetFilterString()) {
1465 ParseFilter parseFilter = new ParseFilter();
1466 scan.setFilter(
1467 parseFilter.parseFilterString(tScan.getFilterString()));
1468 }
1469 if (tScan.isSetReversed()) {
1470 scan.setReversed(tScan.isReversed());
1471 }
1472 return addScanner(table.getScanner(scan), tScan.sortColumns);
1473 } catch (IOException e) {
1474 LOG.warn(e.getMessage(), e);
1475 throw new IOError(Throwables.getStackTraceAsString(e));
1476 } finally{
1477 closeTable(table);
1478 }
1479 }
1480
1481 @Override
1482 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1483 List<ByteBuffer> columns,
1484 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1485
1486 Table table = null;
1487 try {
1488 table = getTable(tableName);
1489 Scan scan = new Scan(getBytes(startRow));
1490 addAttributes(scan, attributes);
1491 if(columns != null && columns.size() != 0) {
1492 for(ByteBuffer column : columns) {
1493 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1494 if(famQf.length == 1) {
1495 scan.addFamily(famQf[0]);
1496 } else {
1497 scan.addColumn(famQf[0], famQf[1]);
1498 }
1499 }
1500 }
1501 return addScanner(table.getScanner(scan), false);
1502 } catch (IOException e) {
1503 LOG.warn(e.getMessage(), e);
1504 throw new IOError(Throwables.getStackTraceAsString(e));
1505 } finally{
1506 closeTable(table);
1507 }
1508 }
1509
1510 @Override
1511 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1512 ByteBuffer stopRow, List<ByteBuffer> columns,
1513 Map<ByteBuffer, ByteBuffer> attributes)
1514 throws IOError, TException {
1515
1516 Table table = null;
1517 try {
1518 table = getTable(tableName);
1519 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1520 addAttributes(scan, attributes);
1521 if(columns != null && columns.size() != 0) {
1522 for(ByteBuffer column : columns) {
1523 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1524 if(famQf.length == 1) {
1525 scan.addFamily(famQf[0]);
1526 } else {
1527 scan.addColumn(famQf[0], famQf[1]);
1528 }
1529 }
1530 }
1531 return addScanner(table.getScanner(scan), false);
1532 } catch (IOException e) {
1533 LOG.warn(e.getMessage(), e);
1534 throw new IOError(Throwables.getStackTraceAsString(e));
1535 } finally{
1536 closeTable(table);
1537 }
1538 }
1539
1540 @Override
1541 public int scannerOpenWithPrefix(ByteBuffer tableName,
1542 ByteBuffer startAndPrefix,
1543 List<ByteBuffer> columns,
1544 Map<ByteBuffer, ByteBuffer> attributes)
1545 throws IOError, TException {
1546
1547 Table table = null;
1548 try {
1549 table = getTable(tableName);
1550 Scan scan = new Scan(getBytes(startAndPrefix));
1551 addAttributes(scan, attributes);
1552 Filter f = new WhileMatchFilter(
1553 new PrefixFilter(getBytes(startAndPrefix)));
1554 scan.setFilter(f);
1555 if (columns != null && columns.size() != 0) {
1556 for(ByteBuffer column : columns) {
1557 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1558 if(famQf.length == 1) {
1559 scan.addFamily(famQf[0]);
1560 } else {
1561 scan.addColumn(famQf[0], famQf[1]);
1562 }
1563 }
1564 }
1565 return addScanner(table.getScanner(scan), false);
1566 } catch (IOException e) {
1567 LOG.warn(e.getMessage(), e);
1568 throw new IOError(Throwables.getStackTraceAsString(e));
1569 } finally{
1570 closeTable(table);
1571 }
1572 }
1573
1574 @Override
1575 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1576 List<ByteBuffer> columns, long timestamp,
1577 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1578
1579 Table table = null;
1580 try {
1581 table = getTable(tableName);
1582 Scan scan = new Scan(getBytes(startRow));
1583 addAttributes(scan, attributes);
1584 scan.setTimeRange(0, timestamp);
1585 if (columns != null && columns.size() != 0) {
1586 for (ByteBuffer column : columns) {
1587 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1588 if(famQf.length == 1) {
1589 scan.addFamily(famQf[0]);
1590 } else {
1591 scan.addColumn(famQf[0], famQf[1]);
1592 }
1593 }
1594 }
1595 return addScanner(table.getScanner(scan), false);
1596 } catch (IOException e) {
1597 LOG.warn(e.getMessage(), e);
1598 throw new IOError(Throwables.getStackTraceAsString(e));
1599 } finally{
1600 closeTable(table);
1601 }
1602 }
1603
1604 @Override
1605 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1606 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1607 Map<ByteBuffer, ByteBuffer> attributes)
1608 throws IOError, TException {
1609
1610 Table table = null;
1611 try {
1612 table = getTable(tableName);
1613 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1614 addAttributes(scan, attributes);
1615 scan.setTimeRange(0, timestamp);
1616 if (columns != null && columns.size() != 0) {
1617 for (ByteBuffer column : columns) {
1618 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1619 if(famQf.length == 1) {
1620 scan.addFamily(famQf[0]);
1621 } else {
1622 scan.addColumn(famQf[0], famQf[1]);
1623 }
1624 }
1625 }
1626 scan.setTimeRange(0, timestamp);
1627 return addScanner(table.getScanner(scan), false);
1628 } catch (IOException e) {
1629 LOG.warn(e.getMessage(), e);
1630 throw new IOError(Throwables.getStackTraceAsString(e));
1631 } finally{
1632 closeTable(table);
1633 }
1634 }
1635
1636 @Override
1637 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1638 ByteBuffer tableName) throws IOError, TException {
1639
1640 Table table = null;
1641 try {
1642 TreeMap<ByteBuffer, ColumnDescriptor> columns =
1643 new TreeMap<ByteBuffer, ColumnDescriptor>();
1644
1645 table = getTable(tableName);
1646 HTableDescriptor desc = table.getTableDescriptor();
1647
1648 for (HColumnDescriptor e : desc.getFamilies()) {
1649 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1650 columns.put(col.name, col);
1651 }
1652 return columns;
1653 } catch (IOException e) {
1654 LOG.warn(e.getMessage(), e);
1655 throw new IOError(Throwables.getStackTraceAsString(e));
1656 } finally {
1657 closeTable(table);
1658 }
1659 }
1660
1661 @Deprecated
1662 @Override
1663 public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1664 ByteBuffer family) throws IOError {
1665 try {
1666 Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1667 return ThriftUtilities.cellFromHBase(result.rawCells());
1668 } catch (IOException e) {
1669 LOG.warn(e.getMessage(), e);
1670 throw new IOError(Throwables.getStackTraceAsString(e));
1671 }
1672 }
1673
1674 @Override
1675 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1676 try {
1677 byte[] row = getBytes(searchRow);
1678 Result startRowResult =
1679 getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1680
1681 if (startRowResult == null) {
1682 throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1683 + Bytes.toStringBinary(row));
1684 }
1685
1686
1687 HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1688 if (regionInfo == null) {
1689 throw new IOException("HRegionInfo REGIONINFO was null or " +
1690 " empty in Meta for row="
1691 + Bytes.toStringBinary(row));
1692 }
1693 TRegionInfo region = new TRegionInfo();
1694 region.setStartKey(regionInfo.getStartKey());
1695 region.setEndKey(regionInfo.getEndKey());
1696 region.id = regionInfo.getRegionId();
1697 region.setName(regionInfo.getRegionName());
1698 region.version = regionInfo.getVersion();
1699
1700
1701 ServerName serverName = HRegionInfo.getServerName(startRowResult);
1702 if (serverName != null) {
1703 region.setServerName(Bytes.toBytes(serverName.getHostname()));
1704 region.port = serverName.getPort();
1705 }
1706 return region;
1707 } catch (IOException e) {
1708 LOG.warn(e.getMessage(), e);
1709 throw new IOError(Throwables.getStackTraceAsString(e));
1710 }
1711 }
1712
1713 private void closeTable(Table table) throws IOError
1714 {
1715 try{
1716 if(table != null){
1717 table.close();
1718 }
1719 } catch (IOException e){
1720 LOG.error(e.getMessage(), e);
1721 throw new IOError(Throwables.getStackTraceAsString(e));
1722 }
1723 }
1724
1725 private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1726 Scan scan = new Scan(row);
1727 scan.setReversed(true);
1728 scan.addFamily(family);
1729 scan.setStartRow(row);
1730 Table table = getTable(tableName);
1731 try (ResultScanner scanner = table.getScanner(scan)) {
1732 return scanner.next();
1733 } finally{
1734 if(table != null){
1735 table.close();
1736 }
1737 }
1738 }
1739
1740 private void initMetrics(ThriftMetrics metrics) {
1741 this.metrics = metrics;
1742 }
1743
1744 @Override
1745 public void increment(TIncrement tincrement) throws IOError, TException {
1746
1747 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1748 throw new TException("Must supply a table and a row key; can't increment");
1749 }
1750
1751 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1752 this.coalescer.queueIncrement(tincrement);
1753 return;
1754 }
1755
1756 Table table = null;
1757 try {
1758 table = getTable(tincrement.getTable());
1759 Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1760 table.increment(inc);
1761 } catch (IOException e) {
1762 LOG.warn(e.getMessage(), e);
1763 throw new IOError(Throwables.getStackTraceAsString(e));
1764 } finally{
1765 closeTable(table);
1766 }
1767 }
1768
1769 @Override
1770 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1771 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1772 this.coalescer.queueIncrements(tincrements);
1773 return;
1774 }
1775 for (TIncrement tinc : tincrements) {
1776 increment(tinc);
1777 }
1778 }
1779
1780 @Override
1781 public List<TCell> append(TAppend tappend) throws IOError, TException {
1782 if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1783 throw new TException("Must supply a table and a row key; can't append");
1784 }
1785
1786 Table table = null;
1787 try {
1788 table = getTable(tappend.getTable());
1789 Append append = ThriftUtilities.appendFromThrift(tappend);
1790 Result result = table.append(append);
1791 return ThriftUtilities.cellFromHBase(result.rawCells());
1792 } catch (IOException e) {
1793 LOG.warn(e.getMessage(), e);
1794 throw new IOError(Throwables.getStackTraceAsString(e));
1795 } finally{
1796 closeTable(table);
1797 }
1798 }
1799
1800 @Override
1801 public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1802 ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1803 IllegalArgument, TException {
1804 Put put;
1805 try {
1806 put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1807 addAttributes(put, attributes);
1808
1809 byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1810
1811 put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1812 : HConstants.EMPTY_BYTE_ARRAY);
1813
1814 put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1815 } catch (IllegalArgumentException e) {
1816 LOG.warn(e.getMessage(), e);
1817 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1818 }
1819
1820 Table table = null;
1821 try {
1822 table = getTable(tableName);
1823 byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1824 return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1825 value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1826 } catch (IOException e) {
1827 LOG.warn(e.getMessage(), e);
1828 throw new IOError(Throwables.getStackTraceAsString(e));
1829 } catch (IllegalArgumentException e) {
1830 LOG.warn(e.getMessage(), e);
1831 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1832 } finally {
1833 closeTable(table);
1834 }
1835 }
1836 }
1837
1838
1839
1840
1841
1842
1843 private static void addAttributes(OperationWithAttributes op,
1844 Map<ByteBuffer, ByteBuffer> attributes) {
1845 if (attributes == null || attributes.size() == 0) {
1846 return;
1847 }
1848 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1849 String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1850 byte[] value = getBytes(entry.getValue());
1851 op.setAttribute(name, value);
1852 }
1853 }
1854
1855 public static void registerFilters(Configuration conf) {
1856 String[] filters = conf.getStrings("hbase.thrift.filters");
1857 if(filters != null) {
1858 for(String filterClass: filters) {
1859 String[] filterPart = filterClass.split(":");
1860 if(filterPart.length != 2) {
1861 LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1862 } else {
1863 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1864 }
1865 }
1866 }
1867 }
1868 }