1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.nio.ByteBuffer;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.TreeMap;
35 import java.util.concurrent.BlockingQueue;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.LinkedBlockingQueue;
38 import java.util.concurrent.ThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40
41 import org.apache.commons.cli.CommandLine;
42 import org.apache.commons.cli.Option;
43 import org.apache.commons.cli.OptionGroup;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.classification.InterfaceAudience;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.hbase.HBaseConfiguration;
49 import org.apache.hadoop.hbase.HColumnDescriptor;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.KeyValue;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
56 import org.apache.hadoop.hbase.client.Delete;
57 import org.apache.hadoop.hbase.client.Get;
58 import org.apache.hadoop.hbase.client.HBaseAdmin;
59 import org.apache.hadoop.hbase.client.HTable;
60 import org.apache.hadoop.hbase.client.Increment;
61 import org.apache.hadoop.hbase.client.OperationWithAttributes;
62 import org.apache.hadoop.hbase.client.Put;
63 import org.apache.hadoop.hbase.client.Result;
64 import org.apache.hadoop.hbase.client.ResultScanner;
65 import org.apache.hadoop.hbase.client.Scan;
66 import org.apache.hadoop.hbase.client.Durability;
67 import org.apache.hadoop.hbase.filter.Filter;
68 import org.apache.hadoop.hbase.filter.ParseFilter;
69 import org.apache.hadoop.hbase.filter.PrefixFilter;
70 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
71 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
72 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
73 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
74 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
75 import org.apache.hadoop.hbase.thrift.generated.Hbase;
76 import org.apache.hadoop.hbase.thrift.generated.IOError;
77 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
78 import org.apache.hadoop.hbase.thrift.generated.Mutation;
79 import org.apache.hadoop.hbase.thrift.generated.TCell;
80 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
81 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
82 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
83 import org.apache.hadoop.hbase.thrift.generated.TScan;
84 import org.apache.hadoop.hbase.util.Bytes;
85 import org.apache.thrift.TException;
86 import org.apache.thrift.protocol.TBinaryProtocol;
87 import org.apache.thrift.protocol.TCompactProtocol;
88 import org.apache.thrift.protocol.TProtocolFactory;
89 import org.apache.thrift.server.THsHaServer;
90 import org.apache.thrift.server.TNonblockingServer;
91 import org.apache.thrift.server.TServer;
92 import org.apache.thrift.server.TThreadedSelectorServer;
93 import org.apache.thrift.transport.TFramedTransport;
94 import org.apache.thrift.transport.TNonblockingServerSocket;
95 import org.apache.thrift.transport.TNonblockingServerTransport;
96 import org.apache.thrift.transport.TServerSocket;
97 import org.apache.thrift.transport.TServerTransport;
98 import org.apache.thrift.transport.TTransportFactory;
99
100 import com.google.common.base.Joiner;
101 import com.google.common.util.concurrent.ThreadFactoryBuilder;
102
103
104
105
106
107 @InterfaceAudience.Private
108 public class ThriftServerRunner implements Runnable {
109
110 private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
111
112 static final String SERVER_TYPE_CONF_KEY =
113 "hbase.regionserver.thrift.server.type";
114
115 static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
116 static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
117 static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
118 static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
119 static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
120
121 private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
122 public static final int DEFAULT_LISTEN_PORT = 9090;
123 private final int listenPort;
124
125 private Configuration conf;
126 volatile TServer tserver;
127 private final Hbase.Iface handler;
128 private final ThriftMetrics metrics;
129
130
131 enum ImplType {
132 HS_HA("hsha", true, THsHaServer.class, true),
133 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
134 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
135 THREADED_SELECTOR(
136 "threadedselector", true, TThreadedSelectorServer.class, true);
137
138 public static final ImplType DEFAULT = THREAD_POOL;
139
140 final String option;
141 final boolean isAlwaysFramed;
142 final Class<? extends TServer> serverClass;
143 final boolean canSpecifyBindIP;
144
145 ImplType(String option, boolean isAlwaysFramed,
146 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
147 this.option = option;
148 this.isAlwaysFramed = isAlwaysFramed;
149 this.serverClass = serverClass;
150 this.canSpecifyBindIP = canSpecifyBindIP;
151 }
152
153
154
155
156
157 @Override
158 public String toString() {
159 return "-" + option;
160 }
161
162 String getDescription() {
163 StringBuilder sb = new StringBuilder("Use the " +
164 serverClass.getSimpleName());
165 if (isAlwaysFramed) {
166 sb.append(" This implies the framed transport.");
167 }
168 if (this == DEFAULT) {
169 sb.append("This is the default.");
170 }
171 return sb.toString();
172 }
173
174 static OptionGroup createOptionGroup() {
175 OptionGroup group = new OptionGroup();
176 for (ImplType t : values()) {
177 group.addOption(new Option(t.option, t.getDescription()));
178 }
179 return group;
180 }
181
182 static ImplType getServerImpl(Configuration conf) {
183 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
184 for (ImplType t : values()) {
185 if (confType.equals(t.option)) {
186 return t;
187 }
188 }
189 throw new AssertionError("Unknown server ImplType.option:" + confType);
190 }
191
192 static void setServerImpl(CommandLine cmd, Configuration conf) {
193 ImplType chosenType = null;
194 int numChosen = 0;
195 for (ImplType t : values()) {
196 if (cmd.hasOption(t.option)) {
197 chosenType = t;
198 ++numChosen;
199 }
200 }
201 if (numChosen < 1) {
202 LOG.info("Using default thrift server type");
203 chosenType = DEFAULT;
204 } else if (numChosen > 1) {
205 throw new AssertionError("Exactly one option out of " +
206 Arrays.toString(values()) + " has to be specified");
207 }
208 LOG.info("Using thrift server type " + chosenType.option);
209 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
210 }
211
212 public String simpleClassName() {
213 return serverClass.getSimpleName();
214 }
215
216 public static List<String> serversThatCannotSpecifyBindIP() {
217 List<String> l = new ArrayList<String>();
218 for (ImplType t : values()) {
219 if (!t.canSpecifyBindIP) {
220 l.add(t.simpleClassName());
221 }
222 }
223 return l;
224 }
225
226 }
227
228 public ThriftServerRunner(Configuration conf) throws IOException {
229 this(conf, new ThriftServerRunner.HBaseHandler(conf));
230 }
231
232 public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
233 this.conf = HBaseConfiguration.create(conf);
234 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
235 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
236 handler.initMetrics(metrics);
237 this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
238 }
239
240
241
242
243 @Override
244 public void run() {
245 try {
246 setupServer();
247 tserver.serve();
248 } catch (Exception e) {
249 LOG.fatal("Cannot run ThriftServer", e);
250
251 System.exit(-1);
252 }
253 }
254
255 public void shutdown() {
256 if (tserver != null) {
257 tserver.stop();
258 tserver = null;
259 }
260 }
261
262
263
264
265 private void setupServer() throws Exception {
266
267 TProtocolFactory protocolFactory;
268 if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
269 LOG.debug("Using compact protocol");
270 protocolFactory = new TCompactProtocol.Factory();
271 } else {
272 LOG.debug("Using binary protocol");
273 protocolFactory = new TBinaryProtocol.Factory();
274 }
275
276 Hbase.Processor<Hbase.Iface> processor =
277 new Hbase.Processor<Hbase.Iface>(handler);
278 ImplType implType = ImplType.getServerImpl(conf);
279
280
281 TTransportFactory transportFactory;
282 if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
283 transportFactory = new TFramedTransport.Factory();
284 LOG.debug("Using framed transport");
285 } else {
286 transportFactory = new TTransportFactory();
287 }
288
289 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
290 LOG.error("Server types " + Joiner.on(", ").join(
291 ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
292 "address binding at the moment. See " +
293 "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
294 throw new RuntimeException(
295 "-" + BIND_CONF_KEY + " not supported with " + implType);
296 }
297
298 if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
299 implType == ImplType.THREADED_SELECTOR) {
300
301 InetAddress listenAddress = getBindAddress(conf);
302 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
303 new InetSocketAddress(listenAddress, listenPort));
304
305 if (implType == ImplType.NONBLOCKING) {
306 TNonblockingServer.Args serverArgs =
307 new TNonblockingServer.Args(serverTransport);
308 serverArgs.processor(processor)
309 .transportFactory(transportFactory)
310 .protocolFactory(protocolFactory);
311 tserver = new TNonblockingServer(serverArgs);
312 } else if (implType == ImplType.HS_HA) {
313 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
314 CallQueue callQueue =
315 new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
316 ExecutorService executorService = createExecutor(
317 callQueue, serverArgs.getWorkerThreads());
318 serverArgs.executorService(executorService)
319 .processor(processor)
320 .transportFactory(transportFactory)
321 .protocolFactory(protocolFactory);
322 tserver = new THsHaServer(serverArgs);
323 } else {
324 TThreadedSelectorServer.Args serverArgs =
325 new HThreadedSelectorServerArgs(serverTransport, conf);
326 CallQueue callQueue =
327 new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
328 ExecutorService executorService = createExecutor(
329 callQueue, serverArgs.getWorkerThreads());
330 serverArgs.executorService(executorService)
331 .processor(processor)
332 .transportFactory(transportFactory)
333 .protocolFactory(protocolFactory);
334 tserver = new TThreadedSelectorServer(serverArgs);
335 }
336 LOG.info("starting HBase " + implType.simpleClassName() +
337 " server on " + Integer.toString(listenPort));
338 } else if (implType == ImplType.THREAD_POOL) {
339
340 InetAddress listenAddress = getBindAddress(conf);
341
342 TServerTransport serverTransport = new TServerSocket(
343 new InetSocketAddress(listenAddress, listenPort));
344
345 TBoundedThreadPoolServer.Args serverArgs =
346 new TBoundedThreadPoolServer.Args(serverTransport, conf);
347 serverArgs.processor(processor)
348 .transportFactory(transportFactory)
349 .protocolFactory(protocolFactory);
350 LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
351 + listenAddress + ":" + Integer.toString(listenPort)
352 + "; " + serverArgs);
353 TBoundedThreadPoolServer tserver =
354 new TBoundedThreadPoolServer(serverArgs, metrics);
355 this.tserver = tserver;
356 } else {
357 throw new AssertionError("Unsupported Thrift server implementation: " +
358 implType.simpleClassName());
359 }
360
361
362 if (tserver.getClass() != implType.serverClass) {
363 throw new AssertionError("Expected to create Thrift server class " +
364 implType.serverClass.getName() + " but got " +
365 tserver.getClass().getName());
366 }
367
368
369
370 registerFilters(conf);
371 }
372
373 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
374 int workerThreads) {
375 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
376 tfb.setDaemon(true);
377 tfb.setNameFormat("thrift-worker-%d");
378 return new ThreadPoolExecutor(workerThreads, workerThreads,
379 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
380 }
381
382 private InetAddress getBindAddress(Configuration conf)
383 throws UnknownHostException {
384 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
385 return InetAddress.getByName(bindAddressStr);
386 }
387
388
389
390
391
392 public static class HBaseHandler implements Hbase.Iface {
393 protected Configuration conf;
394 protected volatile HBaseAdmin admin = null;
395 protected final Log LOG = LogFactory.getLog(this.getClass().getName());
396
397
398 protected int nextScannerId = 0;
399 protected HashMap<Integer, ResultScanner> scannerMap = null;
400 private ThriftMetrics metrics = null;
401
402 private static ThreadLocal<Map<String, HTable>> threadLocalTables =
403 new ThreadLocal<Map<String, HTable>>() {
404 @Override
405 protected Map<String, HTable> initialValue() {
406 return new TreeMap<String, HTable>();
407 }
408 };
409
410 IncrementCoalescer coalescer = null;
411
412
413
414
415
416
417
418
419 byte[][] getAllColumns(HTable table) throws IOException {
420 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
421 byte[][] columns = new byte[cds.length][];
422 for (int i = 0; i < cds.length; i++) {
423 columns[i] = Bytes.add(cds[i].getName(),
424 KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
425 }
426 return columns;
427 }
428
429
430
431
432
433
434
435
436
437
438 public HTable getTable(final byte[] tableName) throws
439 IOException {
440 String table = new String(tableName);
441 Map<String, HTable> tables = threadLocalTables.get();
442 if (!tables.containsKey(table)) {
443 tables.put(table, new HTable(conf, tableName));
444 }
445 return tables.get(table);
446 }
447
448 public HTable getTable(final ByteBuffer tableName) throws IOException {
449 return getTable(getBytes(tableName));
450 }
451
452
453
454
455
456
457
458
459 protected synchronized int addScanner(ResultScanner scanner) {
460 int id = nextScannerId++;
461 scannerMap.put(id, scanner);
462 return id;
463 }
464
465
466
467
468
469
470
471 protected synchronized ResultScanner getScanner(int id) {
472 return scannerMap.get(id);
473 }
474
475
476
477
478
479
480
481
482 protected synchronized ResultScanner removeScanner(int id) {
483 return scannerMap.remove(id);
484 }
485
486
487
488
489
490 protected HBaseHandler()
491 throws IOException {
492 this(HBaseConfiguration.create());
493 }
494
495 protected HBaseHandler(final Configuration c) throws IOException {
496 this.conf = c;
497 scannerMap = new HashMap<Integer, ResultScanner>();
498 this.coalescer = new IncrementCoalescer(this);
499 }
500
501
502
503
504 private HBaseAdmin getHBaseAdmin() throws IOException {
505 if (admin == null) {
506 synchronized (this) {
507 if (admin == null) {
508 admin = new HBaseAdmin(conf);
509 }
510 }
511 }
512 return admin;
513 }
514
515 @Override
516 public void enableTable(ByteBuffer tableName) throws IOError {
517 try{
518 getHBaseAdmin().enableTable(getBytes(tableName));
519 } catch (IOException e) {
520 LOG.warn(e.getMessage(), e);
521 throw new IOError(e.getMessage());
522 }
523 }
524
525 @Override
526 public void disableTable(ByteBuffer tableName) throws IOError{
527 try{
528 getHBaseAdmin().disableTable(getBytes(tableName));
529 } catch (IOException e) {
530 LOG.warn(e.getMessage(), e);
531 throw new IOError(e.getMessage());
532 }
533 }
534
535 @Override
536 public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
537 try {
538 return HTable.isTableEnabled(this.conf, getBytes(tableName));
539 } catch (IOException e) {
540 LOG.warn(e.getMessage(), e);
541 throw new IOError(e.getMessage());
542 }
543 }
544
545 @Override
546 public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
547 try{
548 getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
549 } catch (InterruptedException e) {
550 throw new IOError(e.getMessage());
551 } catch (IOException e) {
552 LOG.warn(e.getMessage(), e);
553 throw new IOError(e.getMessage());
554 }
555 }
556
557 @Override
558 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
559 try{
560 getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
561 } catch (InterruptedException e) {
562 LOG.warn(e.getMessage(), e);
563 throw new IOError(e.getMessage());
564 } catch (IOException e) {
565 LOG.warn(e.getMessage(), e);
566 throw new IOError(e.getMessage());
567 }
568 }
569
570 @Override
571 public List<ByteBuffer> getTableNames() throws IOError {
572 try {
573 HTableDescriptor[] tables = this.getHBaseAdmin().listTables();
574 ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tables.length);
575 for (int i = 0; i < tables.length; i++) {
576 list.add(ByteBuffer.wrap(tables[i].getName()));
577 }
578 return list;
579 } catch (IOException e) {
580 LOG.warn(e.getMessage(), e);
581 throw new IOError(e.getMessage());
582 }
583 }
584
585
586
587
588 @Override
589 public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
590 throws IOError {
591 try {
592 HTable table;
593 try {
594 table = getTable(tableName);
595 } catch (TableNotFoundException ex) {
596 return new ArrayList<TRegionInfo>();
597 }
598 Map<HRegionInfo, ServerName> regionLocations =
599 table.getRegionLocations();
600 List<TRegionInfo> results = new ArrayList<TRegionInfo>();
601 for (Map.Entry<HRegionInfo, ServerName> entry :
602 regionLocations.entrySet()) {
603 HRegionInfo info = entry.getKey();
604 ServerName serverName = entry.getValue();
605 TRegionInfo region = new TRegionInfo();
606 region.serverName = ByteBuffer.wrap(
607 Bytes.toBytes(serverName.getHostname()));
608 region.port = serverName.getPort();
609 region.startKey = ByteBuffer.wrap(info.getStartKey());
610 region.endKey = ByteBuffer.wrap(info.getEndKey());
611 region.id = info.getRegionId();
612 region.name = ByteBuffer.wrap(info.getRegionName());
613 region.version = info.getVersion();
614 results.add(region);
615 }
616 return results;
617 } catch (TableNotFoundException e) {
618
619 return Collections.emptyList();
620 } catch (IOException e){
621 LOG.warn(e.getMessage(), e);
622 throw new IOError(e.getMessage());
623 }
624 }
625
626 @Deprecated
627 @Override
628 public List<TCell> get(
629 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
630 Map<ByteBuffer, ByteBuffer> attributes)
631 throws IOError {
632 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
633 if(famAndQf.length == 1) {
634 return get(tableName, row, famAndQf[0], new byte[0], attributes);
635 }
636 return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
637 }
638
639 protected List<TCell> get(ByteBuffer tableName,
640 ByteBuffer row,
641 byte[] family,
642 byte[] qualifier,
643 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
644 try {
645 HTable table = getTable(tableName);
646 Get get = new Get(getBytes(row));
647 addAttributes(get, attributes);
648 if (qualifier == null || qualifier.length == 0) {
649 get.addFamily(family);
650 } else {
651 get.addColumn(family, qualifier);
652 }
653 Result result = table.get(get);
654 return ThriftUtilities.cellFromHBase(result.raw());
655 } catch (IOException e) {
656 LOG.warn(e.getMessage(), e);
657 throw new IOError(e.getMessage());
658 }
659 }
660
661 @Deprecated
662 @Override
663 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
664 ByteBuffer column, int numVersions,
665 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
666 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
667 if(famAndQf.length == 1) {
668 return getVer(tableName, row, famAndQf[0],
669 new byte[0], numVersions, attributes);
670 }
671 return getVer(tableName, row,
672 famAndQf[0], famAndQf[1], numVersions, attributes);
673 }
674
675 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
676 byte[] family,
677 byte[] qualifier, int numVersions,
678 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
679 try {
680 HTable table = getTable(tableName);
681 Get get = new Get(getBytes(row));
682 addAttributes(get, attributes);
683 get.addColumn(family, qualifier);
684 get.setMaxVersions(numVersions);
685 Result result = table.get(get);
686 return ThriftUtilities.cellFromHBase(result.raw());
687 } catch (IOException e) {
688 LOG.warn(e.getMessage(), e);
689 throw new IOError(e.getMessage());
690 }
691 }
692
693 @Deprecated
694 @Override
695 public List<TCell> getVerTs(ByteBuffer tableName,
696 ByteBuffer row,
697 ByteBuffer column,
698 long timestamp,
699 int numVersions,
700 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
701 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
702 if(famAndQf.length == 1) {
703 return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
704 numVersions, attributes);
705 }
706 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
707 numVersions, attributes);
708 }
709
710 protected List<TCell> getVerTs(ByteBuffer tableName,
711 ByteBuffer row, byte [] family,
712 byte [] qualifier, long timestamp, int numVersions,
713 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
714 try {
715 HTable table = getTable(tableName);
716 Get get = new Get(getBytes(row));
717 addAttributes(get, attributes);
718 get.addColumn(family, qualifier);
719 get.setTimeRange(Long.MIN_VALUE, timestamp);
720 get.setMaxVersions(numVersions);
721 Result result = table.get(get);
722 return ThriftUtilities.cellFromHBase(result.raw());
723 } catch (IOException e) {
724 LOG.warn(e.getMessage(), e);
725 throw new IOError(e.getMessage());
726 }
727 }
728
729 @Override
730 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
731 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
732 return getRowWithColumnsTs(tableName, row, null,
733 HConstants.LATEST_TIMESTAMP,
734 attributes);
735 }
736
737 @Override
738 public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
739 ByteBuffer row,
740 List<ByteBuffer> columns,
741 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
742 return getRowWithColumnsTs(tableName, row, columns,
743 HConstants.LATEST_TIMESTAMP,
744 attributes);
745 }
746
747 @Override
748 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
749 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
750 return getRowWithColumnsTs(tableName, row, null,
751 timestamp, attributes);
752 }
753
754 @Override
755 public List<TRowResult> getRowWithColumnsTs(
756 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
757 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
758 try {
759 HTable table = getTable(tableName);
760 if (columns == null) {
761 Get get = new Get(getBytes(row));
762 addAttributes(get, attributes);
763 get.setTimeRange(Long.MIN_VALUE, timestamp);
764 Result result = table.get(get);
765 return ThriftUtilities.rowResultFromHBase(result);
766 }
767 Get get = new Get(getBytes(row));
768 addAttributes(get, attributes);
769 for(ByteBuffer column : columns) {
770 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
771 if (famAndQf.length == 1) {
772 get.addFamily(famAndQf[0]);
773 } else {
774 get.addColumn(famAndQf[0], famAndQf[1]);
775 }
776 }
777 get.setTimeRange(Long.MIN_VALUE, timestamp);
778 Result result = table.get(get);
779 return ThriftUtilities.rowResultFromHBase(result);
780 } catch (IOException e) {
781 LOG.warn(e.getMessage(), e);
782 throw new IOError(e.getMessage());
783 }
784 }
785
786 @Override
787 public List<TRowResult> getRows(ByteBuffer tableName,
788 List<ByteBuffer> rows,
789 Map<ByteBuffer, ByteBuffer> attributes)
790 throws IOError {
791 return getRowsWithColumnsTs(tableName, rows, null,
792 HConstants.LATEST_TIMESTAMP,
793 attributes);
794 }
795
796 @Override
797 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
798 List<ByteBuffer> rows,
799 List<ByteBuffer> columns,
800 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
801 return getRowsWithColumnsTs(tableName, rows, columns,
802 HConstants.LATEST_TIMESTAMP,
803 attributes);
804 }
805
806 @Override
807 public List<TRowResult> getRowsTs(ByteBuffer tableName,
808 List<ByteBuffer> rows,
809 long timestamp,
810 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
811 return getRowsWithColumnsTs(tableName, rows, null,
812 timestamp, attributes);
813 }
814
815 @Override
816 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
817 List<ByteBuffer> rows,
818 List<ByteBuffer> columns, long timestamp,
819 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
820 try {
821 List<Get> gets = new ArrayList<Get>(rows.size());
822 HTable table = getTable(tableName);
823 if (metrics != null) {
824 metrics.incNumRowKeysInBatchGet(rows.size());
825 }
826 for (ByteBuffer row : rows) {
827 Get get = new Get(getBytes(row));
828 addAttributes(get, attributes);
829 if (columns != null) {
830
831 for(ByteBuffer column : columns) {
832 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
833 if (famAndQf.length == 1) {
834 get.addFamily(famAndQf[0]);
835 } else {
836 get.addColumn(famAndQf[0], famAndQf[1]);
837 }
838 }
839 }
840 get.setTimeRange(Long.MIN_VALUE, timestamp);
841 gets.add(get);
842 }
843 Result[] result = table.get(gets);
844 return ThriftUtilities.rowResultFromHBase(result);
845 } catch (IOException e) {
846 LOG.warn(e.getMessage(), e);
847 throw new IOError(e.getMessage());
848 }
849 }
850
851 @Override
852 public void deleteAll(
853 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
854 Map<ByteBuffer, ByteBuffer> attributes)
855 throws IOError {
856 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
857 attributes);
858 }
859
860 @Override
861 public void deleteAllTs(ByteBuffer tableName,
862 ByteBuffer row,
863 ByteBuffer column,
864 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
865 try {
866 HTable table = getTable(tableName);
867 Delete delete = new Delete(getBytes(row));
868 addAttributes(delete, attributes);
869 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
870 if (famAndQf.length == 1) {
871 delete.deleteFamily(famAndQf[0], timestamp);
872 } else {
873 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
874 }
875 table.delete(delete);
876
877 } catch (IOException e) {
878 LOG.warn(e.getMessage(), e);
879 throw new IOError(e.getMessage());
880 }
881 }
882
883 @Override
884 public void deleteAllRow(
885 ByteBuffer tableName, ByteBuffer row,
886 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
887 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
888 }
889
890 @Override
891 public void deleteAllRowTs(
892 ByteBuffer tableName, ByteBuffer row, long timestamp,
893 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
894 try {
895 HTable table = getTable(tableName);
896 Delete delete = new Delete(getBytes(row), timestamp);
897 addAttributes(delete, attributes);
898 table.delete(delete);
899 } catch (IOException e) {
900 LOG.warn(e.getMessage(), e);
901 throw new IOError(e.getMessage());
902 }
903 }
904
905 @Override
906 public void createTable(ByteBuffer in_tableName,
907 List<ColumnDescriptor> columnFamilies) throws IOError,
908 IllegalArgument, AlreadyExists {
909 byte [] tableName = getBytes(in_tableName);
910 try {
911 if (getHBaseAdmin().tableExists(tableName)) {
912 throw new AlreadyExists("table name already in use");
913 }
914 HTableDescriptor desc = new HTableDescriptor(tableName);
915 for (ColumnDescriptor col : columnFamilies) {
916 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
917 desc.addFamily(colDesc);
918 }
919 getHBaseAdmin().createTable(desc);
920 } catch (IOException e) {
921 LOG.warn(e.getMessage(), e);
922 throw new IOError(e.getMessage());
923 } catch (IllegalArgumentException e) {
924 LOG.warn(e.getMessage(), e);
925 throw new IllegalArgument(e.getMessage());
926 }
927 }
928
929 @Override
930 public void deleteTable(ByteBuffer in_tableName) throws IOError {
931 byte [] tableName = getBytes(in_tableName);
932 if (LOG.isDebugEnabled()) {
933 LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
934 }
935 try {
936 if (!getHBaseAdmin().tableExists(tableName)) {
937 throw new IOException("table does not exist");
938 }
939 getHBaseAdmin().deleteTable(tableName);
940 } catch (IOException e) {
941 LOG.warn(e.getMessage(), e);
942 throw new IOError(e.getMessage());
943 }
944 }
945
946 @Override
947 public void mutateRow(ByteBuffer tableName, ByteBuffer row,
948 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
949 throws IOError, IllegalArgument {
950 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
951 attributes);
952 }
953
954 @Override
955 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
956 List<Mutation> mutations, long timestamp,
957 Map<ByteBuffer, ByteBuffer> attributes)
958 throws IOError, IllegalArgument {
959 HTable table = null;
960 try {
961 table = getTable(tableName);
962 Put put = new Put(getBytes(row), timestamp);
963 addAttributes(put, attributes);
964
965 Delete delete = new Delete(getBytes(row));
966 addAttributes(delete, attributes);
967 if (metrics != null) {
968 metrics.incNumRowKeysInBatchMutate(mutations.size());
969 }
970
971
972 for (Mutation m : mutations) {
973 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
974 if (m.isDelete) {
975 if (famAndQf.length == 1) {
976 delete.deleteFamily(famAndQf[0], timestamp);
977 } else {
978 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
979 }
980 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
981 : Durability.SKIP_WAL);
982 } else {
983 if(famAndQf.length == 1) {
984 put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
985 m.value != null ? getBytes(m.value)
986 : HConstants.EMPTY_BYTE_ARRAY);
987 } else {
988 put.add(famAndQf[0], famAndQf[1],
989 m.value != null ? getBytes(m.value)
990 : HConstants.EMPTY_BYTE_ARRAY);
991 }
992 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
993 }
994 }
995 if (!delete.isEmpty())
996 table.delete(delete);
997 if (!put.isEmpty())
998 table.put(put);
999 } catch (IOException e) {
1000 LOG.warn(e.getMessage(), e);
1001 throw new IOError(e.getMessage());
1002 } catch (IllegalArgumentException e) {
1003 LOG.warn(e.getMessage(), e);
1004 throw new IllegalArgument(e.getMessage());
1005 }
1006 }
1007
1008 @Override
1009 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1010 Map<ByteBuffer, ByteBuffer> attributes)
1011 throws IOError, IllegalArgument, TException {
1012 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1013 }
1014
1015 @Override
1016 public void mutateRowsTs(
1017 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1018 Map<ByteBuffer, ByteBuffer> attributes)
1019 throws IOError, IllegalArgument, TException {
1020 List<Put> puts = new ArrayList<Put>();
1021 List<Delete> deletes = new ArrayList<Delete>();
1022
1023 for (BatchMutation batch : rowBatches) {
1024 byte[] row = getBytes(batch.row);
1025 List<Mutation> mutations = batch.mutations;
1026 Delete delete = new Delete(row);
1027 addAttributes(delete, attributes);
1028 Put put = new Put(row, timestamp);
1029 addAttributes(put, attributes);
1030 for (Mutation m : mutations) {
1031 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1032 if (m.isDelete) {
1033
1034 if (famAndQf.length == 1) {
1035 delete.deleteFamily(famAndQf[0], timestamp);
1036 } else {
1037 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1038 }
1039 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1040 : Durability.SKIP_WAL);
1041 } else {
1042 if(famAndQf.length == 1) {
1043 put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
1044 m.value != null ? getBytes(m.value)
1045 : HConstants.EMPTY_BYTE_ARRAY);
1046 } else {
1047 put.add(famAndQf[0], famAndQf[1],
1048 m.value != null ? getBytes(m.value)
1049 : HConstants.EMPTY_BYTE_ARRAY);
1050 }
1051 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1052 }
1053 }
1054 if (!delete.isEmpty())
1055 deletes.add(delete);
1056 if (!put.isEmpty())
1057 puts.add(put);
1058 }
1059
1060 HTable table = null;
1061 try {
1062 table = getTable(tableName);
1063 if (!puts.isEmpty())
1064 table.put(puts);
1065 if (!deletes.isEmpty())
1066 table.delete(deletes);
1067
1068 } catch (IOException e) {
1069 LOG.warn(e.getMessage(), e);
1070 throw new IOError(e.getMessage());
1071 } catch (IllegalArgumentException e) {
1072 LOG.warn(e.getMessage(), e);
1073 throw new IllegalArgument(e.getMessage());
1074 }
1075 }
1076
1077 @Deprecated
1078 @Override
1079 public long atomicIncrement(
1080 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1081 throws IOError, IllegalArgument, TException {
1082 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1083 if(famAndQf.length == 1) {
1084 return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
1085 amount);
1086 }
1087 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1088 }
1089
1090 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1091 byte [] family, byte [] qualifier, long amount)
1092 throws IOError, IllegalArgument, TException {
1093 HTable table;
1094 try {
1095 table = getTable(tableName);
1096 return table.incrementColumnValue(
1097 getBytes(row), family, qualifier, amount);
1098 } catch (IOException e) {
1099 LOG.warn(e.getMessage(), e);
1100 throw new IOError(e.getMessage());
1101 }
1102 }
1103
1104 public void scannerClose(int id) throws IOError, IllegalArgument {
1105 LOG.debug("scannerClose: id=" + id);
1106 ResultScanner scanner = getScanner(id);
1107 if (scanner == null) {
1108 String message = "scanner ID is invalid";
1109 LOG.warn(message);
1110 throw new IllegalArgument("scanner ID is invalid");
1111 }
1112 scanner.close();
1113 removeScanner(id);
1114 }
1115
1116 @Override
1117 public List<TRowResult> scannerGetList(int id,int nbRows)
1118 throws IllegalArgument, IOError {
1119 LOG.debug("scannerGetList: id=" + id);
1120 ResultScanner scanner = getScanner(id);
1121 if (null == scanner) {
1122 String message = "scanner ID is invalid";
1123 LOG.warn(message);
1124 throw new IllegalArgument("scanner ID is invalid");
1125 }
1126
1127 Result [] results = null;
1128 try {
1129 results = scanner.next(nbRows);
1130 if (null == results) {
1131 return new ArrayList<TRowResult>();
1132 }
1133 } catch (IOException e) {
1134 LOG.warn(e.getMessage(), e);
1135 throw new IOError(e.getMessage());
1136 }
1137 return ThriftUtilities.rowResultFromHBase(results);
1138 }
1139
1140 @Override
1141 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1142 return scannerGetList(id,1);
1143 }
1144
1145 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1146 Map<ByteBuffer, ByteBuffer> attributes)
1147 throws IOError {
1148 try {
1149 HTable table = getTable(tableName);
1150 Scan scan = new Scan();
1151 addAttributes(scan, attributes);
1152 if (tScan.isSetStartRow()) {
1153 scan.setStartRow(tScan.getStartRow());
1154 }
1155 if (tScan.isSetStopRow()) {
1156 scan.setStopRow(tScan.getStopRow());
1157 }
1158 if (tScan.isSetTimestamp()) {
1159 scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
1160 }
1161 if (tScan.isSetCaching()) {
1162 scan.setCaching(tScan.getCaching());
1163 }
1164 if (tScan.isSetBatchSize()) {
1165 scan.setBatch(tScan.getBatchSize());
1166 }
1167 if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1168 for(ByteBuffer column : tScan.getColumns()) {
1169 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1170 if(famQf.length == 1) {
1171 scan.addFamily(famQf[0]);
1172 } else {
1173 scan.addColumn(famQf[0], famQf[1]);
1174 }
1175 }
1176 }
1177 if (tScan.isSetFilterString()) {
1178 ParseFilter parseFilter = new ParseFilter();
1179 scan.setFilter(
1180 parseFilter.parseFilterString(tScan.getFilterString()));
1181 }
1182 return addScanner(table.getScanner(scan));
1183 } catch (IOException e) {
1184 LOG.warn(e.getMessage(), e);
1185 throw new IOError(e.getMessage());
1186 }
1187 }
1188
1189 @Override
1190 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1191 List<ByteBuffer> columns,
1192 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1193 try {
1194 HTable table = getTable(tableName);
1195 Scan scan = new Scan(getBytes(startRow));
1196 addAttributes(scan, attributes);
1197 if(columns != null && columns.size() != 0) {
1198 for(ByteBuffer column : columns) {
1199 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1200 if(famQf.length == 1) {
1201 scan.addFamily(famQf[0]);
1202 } else {
1203 scan.addColumn(famQf[0], famQf[1]);
1204 }
1205 }
1206 }
1207 return addScanner(table.getScanner(scan));
1208 } catch (IOException e) {
1209 LOG.warn(e.getMessage(), e);
1210 throw new IOError(e.getMessage());
1211 }
1212 }
1213
1214 @Override
1215 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1216 ByteBuffer stopRow, List<ByteBuffer> columns,
1217 Map<ByteBuffer, ByteBuffer> attributes)
1218 throws IOError, TException {
1219 try {
1220 HTable table = getTable(tableName);
1221 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1222 addAttributes(scan, attributes);
1223 if(columns != null && columns.size() != 0) {
1224 for(ByteBuffer column : columns) {
1225 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1226 if(famQf.length == 1) {
1227 scan.addFamily(famQf[0]);
1228 } else {
1229 scan.addColumn(famQf[0], famQf[1]);
1230 }
1231 }
1232 }
1233 return addScanner(table.getScanner(scan));
1234 } catch (IOException e) {
1235 LOG.warn(e.getMessage(), e);
1236 throw new IOError(e.getMessage());
1237 }
1238 }
1239
1240 @Override
1241 public int scannerOpenWithPrefix(ByteBuffer tableName,
1242 ByteBuffer startAndPrefix,
1243 List<ByteBuffer> columns,
1244 Map<ByteBuffer, ByteBuffer> attributes)
1245 throws IOError, TException {
1246 try {
1247 HTable table = getTable(tableName);
1248 Scan scan = new Scan(getBytes(startAndPrefix));
1249 addAttributes(scan, attributes);
1250 Filter f = new WhileMatchFilter(
1251 new PrefixFilter(getBytes(startAndPrefix)));
1252 scan.setFilter(f);
1253 if (columns != null && columns.size() != 0) {
1254 for(ByteBuffer column : columns) {
1255 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1256 if(famQf.length == 1) {
1257 scan.addFamily(famQf[0]);
1258 } else {
1259 scan.addColumn(famQf[0], famQf[1]);
1260 }
1261 }
1262 }
1263 return addScanner(table.getScanner(scan));
1264 } catch (IOException e) {
1265 LOG.warn(e.getMessage(), e);
1266 throw new IOError(e.getMessage());
1267 }
1268 }
1269
1270 @Override
1271 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1272 List<ByteBuffer> columns, long timestamp,
1273 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1274 try {
1275 HTable table = getTable(tableName);
1276 Scan scan = new Scan(getBytes(startRow));
1277 addAttributes(scan, attributes);
1278 scan.setTimeRange(Long.MIN_VALUE, timestamp);
1279 if (columns != null && columns.size() != 0) {
1280 for (ByteBuffer column : columns) {
1281 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1282 if(famQf.length == 1) {
1283 scan.addFamily(famQf[0]);
1284 } else {
1285 scan.addColumn(famQf[0], famQf[1]);
1286 }
1287 }
1288 }
1289 return addScanner(table.getScanner(scan));
1290 } catch (IOException e) {
1291 LOG.warn(e.getMessage(), e);
1292 throw new IOError(e.getMessage());
1293 }
1294 }
1295
1296 @Override
1297 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1298 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1299 Map<ByteBuffer, ByteBuffer> attributes)
1300 throws IOError, TException {
1301 try {
1302 HTable table = getTable(tableName);
1303 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1304 addAttributes(scan, attributes);
1305 scan.setTimeRange(Long.MIN_VALUE, timestamp);
1306 if (columns != null && columns.size() != 0) {
1307 for (ByteBuffer column : columns) {
1308 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1309 if(famQf.length == 1) {
1310 scan.addFamily(famQf[0]);
1311 } else {
1312 scan.addColumn(famQf[0], famQf[1]);
1313 }
1314 }
1315 }
1316 scan.setTimeRange(Long.MIN_VALUE, timestamp);
1317 return addScanner(table.getScanner(scan));
1318 } catch (IOException e) {
1319 LOG.warn(e.getMessage(), e);
1320 throw new IOError(e.getMessage());
1321 }
1322 }
1323
1324 @Override
1325 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1326 ByteBuffer tableName) throws IOError, TException {
1327 try {
1328 TreeMap<ByteBuffer, ColumnDescriptor> columns =
1329 new TreeMap<ByteBuffer, ColumnDescriptor>();
1330
1331 HTable table = getTable(tableName);
1332 HTableDescriptor desc = table.getTableDescriptor();
1333
1334 for (HColumnDescriptor e : desc.getFamilies()) {
1335 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1336 columns.put(col.name, col);
1337 }
1338 return columns;
1339 } catch (IOException e) {
1340 LOG.warn(e.getMessage(), e);
1341 throw new IOError(e.getMessage());
1342 }
1343 }
1344
1345 @Override
1346 public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1347 ByteBuffer family) throws IOError {
1348 try {
1349 HTable table = getTable(getBytes(tableName));
1350 Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1351 return ThriftUtilities.cellFromHBase(result.raw());
1352 } catch (IOException e) {
1353 LOG.warn(e.getMessage(), e);
1354 throw new IOError(e.getMessage());
1355 }
1356 }
1357
1358 @Override
1359 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1360 try {
1361 HTable table = getTable(HConstants.META_TABLE_NAME);
1362 byte[] row = getBytes(searchRow);
1363 Result startRowResult = table.getRowOrBefore(
1364 row, HConstants.CATALOG_FAMILY);
1365
1366 if (startRowResult == null) {
1367 throw new IOException("Cannot find row in .META., row="
1368 + Bytes.toStringBinary(row));
1369 }
1370
1371
1372 HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1373 if (regionInfo == null) {
1374 throw new IOException("HRegionInfo REGIONINFO was null or " +
1375 " empty in Meta for row="
1376 + Bytes.toStringBinary(row));
1377 }
1378 TRegionInfo region = new TRegionInfo();
1379 region.setStartKey(regionInfo.getStartKey());
1380 region.setEndKey(regionInfo.getEndKey());
1381 region.id = regionInfo.getRegionId();
1382 region.setName(regionInfo.getRegionName());
1383 region.version = regionInfo.getVersion();
1384
1385
1386 ServerName serverName = HRegionInfo.getServerName(startRowResult);
1387 if (serverName != null) {
1388 region.setServerName(Bytes.toBytes(serverName.getHostname()));
1389 region.port = serverName.getPort();
1390 }
1391 return region;
1392 } catch (IOException e) {
1393 LOG.warn(e.getMessage(), e);
1394 throw new IOError(e.getMessage());
1395 }
1396 }
1397
1398 private void initMetrics(ThriftMetrics metrics) {
1399 this.metrics = metrics;
1400 }
1401
1402 @Override
1403 public void increment(TIncrement tincrement) throws IOError, TException {
1404
1405 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1406 throw new TException("Must supply a table and a row key; can't increment");
1407 }
1408
1409 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1410 this.coalescer.queueIncrement(tincrement);
1411 return;
1412 }
1413
1414 try {
1415 HTable table = getTable(tincrement.getTable());
1416 Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1417 table.increment(inc);
1418 } catch (IOException e) {
1419 LOG.warn(e.getMessage(), e);
1420 throw new IOError(e.getMessage());
1421 }
1422 }
1423
1424 @Override
1425 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1426 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1427 this.coalescer.queueIncrements(tincrements);
1428 return;
1429 }
1430 for (TIncrement tinc : tincrements) {
1431 increment(tinc);
1432 }
1433 }
1434 }
1435
1436
1437
1438
1439
1440
1441 private static void addAttributes(OperationWithAttributes op,
1442 Map<ByteBuffer, ByteBuffer> attributes) {
1443 if (attributes == null || attributes.size() == 0) {
1444 return;
1445 }
1446 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1447 String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1448 byte[] value = getBytes(entry.getValue());
1449 op.setAttribute(name, value);
1450 }
1451 }
1452
1453 public static void registerFilters(Configuration conf) {
1454 String[] filters = conf.getStrings("hbase.thrift.filters");
1455 if(filters != null) {
1456 for(String filterClass: filters) {
1457 String[] filterPart = filterClass.split(":");
1458 if(filterPart.length != 2) {
1459 LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1460 } else {
1461 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1462 }
1463 }
1464 }
1465 }
1466 }