View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collections;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.TreeMap;
35  import java.util.concurrent.BlockingQueue;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.ThreadPoolExecutor;
39  import java.util.concurrent.TimeUnit;
40  
41  import org.apache.commons.cli.CommandLine;
42  import org.apache.commons.cli.Option;
43  import org.apache.commons.cli.OptionGroup;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.classification.InterfaceAudience;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HColumnDescriptor;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.KeyValue;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.TableNotFoundException;
57  import org.apache.hadoop.hbase.client.Append;
58  import org.apache.hadoop.hbase.client.Delete;
59  import org.apache.hadoop.hbase.client.Durability;
60  import org.apache.hadoop.hbase.client.Get;
61  import org.apache.hadoop.hbase.client.HBaseAdmin;
62  import org.apache.hadoop.hbase.client.HTable;
63  import org.apache.hadoop.hbase.client.Increment;
64  import org.apache.hadoop.hbase.client.OperationWithAttributes;
65  import org.apache.hadoop.hbase.client.Put;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.ResultScanner;
68  import org.apache.hadoop.hbase.client.Scan;
69  import org.apache.hadoop.hbase.filter.Filter;
70  import org.apache.hadoop.hbase.filter.ParseFilter;
71  import org.apache.hadoop.hbase.filter.PrefixFilter;
72  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
73  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
74  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
75  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
76  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
77  import org.apache.hadoop.hbase.thrift.generated.Hbase;
78  import org.apache.hadoop.hbase.thrift.generated.IOError;
79  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
80  import org.apache.hadoop.hbase.thrift.generated.Mutation;
81  import org.apache.hadoop.hbase.thrift.generated.TAppend;
82  import org.apache.hadoop.hbase.thrift.generated.TCell;
83  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
84  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
85  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
86  import org.apache.hadoop.hbase.thrift.generated.TScan;
87  import org.apache.hadoop.hbase.util.Bytes;
88  import org.apache.thrift.TException;
89  import org.apache.thrift.protocol.TBinaryProtocol;
90  import org.apache.thrift.protocol.TCompactProtocol;
91  import org.apache.thrift.protocol.TProtocolFactory;
92  import org.apache.thrift.server.THsHaServer;
93  import org.apache.thrift.server.TNonblockingServer;
94  import org.apache.thrift.server.TServer;
95  import org.apache.thrift.server.TThreadedSelectorServer;
96  import org.apache.thrift.transport.TFramedTransport;
97  import org.apache.thrift.transport.TNonblockingServerSocket;
98  import org.apache.thrift.transport.TNonblockingServerTransport;
99  import org.apache.thrift.transport.TServerSocket;
100 import org.apache.thrift.transport.TServerTransport;
101 import org.apache.thrift.transport.TTransportFactory;
102 
103 import com.google.common.base.Joiner;
104 import com.google.common.util.concurrent.ThreadFactoryBuilder;
105 
106 /**
107  * ThriftServerRunner - this class starts up a Thrift server which implements
108  * the Hbase API specified in the Hbase.thrift IDL file.
109  */
110 @InterfaceAudience.Private
111 public class ThriftServerRunner implements Runnable {
112 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  // Configuration key selecting the TServer implementation (see ImplType).
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  // Configuration keys controlling bind address, wire protocol/transport,
  // listen port and increment coalescing for the Thrift gateway.
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";

  // Defaults: bind to all interfaces on port 9090.
  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  private final int listenPort;

  private Configuration conf;
  // Written by setupServer() on the runner thread, read by shutdown();
  // volatile so the two threads see a consistent reference.
  volatile TServer tserver;
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
133 
134   /** An enum of server implementation selections */
135   enum ImplType {
136     HS_HA("hsha", true, THsHaServer.class, true),
137     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
138     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
139     THREADED_SELECTOR(
140         "threadedselector", true, TThreadedSelectorServer.class, true);
141 
142     public static final ImplType DEFAULT = THREAD_POOL;
143 
144     final String option;
145     final boolean isAlwaysFramed;
146     final Class<? extends TServer> serverClass;
147     final boolean canSpecifyBindIP;
148 
149     ImplType(String option, boolean isAlwaysFramed,
150         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
151       this.option = option;
152       this.isAlwaysFramed = isAlwaysFramed;
153       this.serverClass = serverClass;
154       this.canSpecifyBindIP = canSpecifyBindIP;
155     }
156 
157     /**
158      * @return <code>-option</code> so we can get the list of options from
159      *         {@link #values()}
160      */
161     @Override
162     public String toString() {
163       return "-" + option;
164     }
165 
166     String getDescription() {
167       StringBuilder sb = new StringBuilder("Use the " +
168           serverClass.getSimpleName());
169       if (isAlwaysFramed) {
170         sb.append(" This implies the framed transport.");
171       }
172       if (this == DEFAULT) {
173         sb.append("This is the default.");
174       }
175       return sb.toString();
176     }
177 
178     static OptionGroup createOptionGroup() {
179       OptionGroup group = new OptionGroup();
180       for (ImplType t : values()) {
181         group.addOption(new Option(t.option, t.getDescription()));
182       }
183       return group;
184     }
185 
186     static ImplType getServerImpl(Configuration conf) {
187       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
188       for (ImplType t : values()) {
189         if (confType.equals(t.option)) {
190           return t;
191         }
192       }
193       throw new AssertionError("Unknown server ImplType.option:" + confType);
194     }
195 
196     static void setServerImpl(CommandLine cmd, Configuration conf) {
197       ImplType chosenType = null;
198       int numChosen = 0;
199       for (ImplType t : values()) {
200         if (cmd.hasOption(t.option)) {
201           chosenType = t;
202           ++numChosen;
203         }
204       }
205       if (numChosen < 1) {
206         LOG.info("Using default thrift server type");
207         chosenType = DEFAULT;
208       } else if (numChosen > 1) {
209         throw new AssertionError("Exactly one option out of " +
210           Arrays.toString(values()) + " has to be specified");
211       }
212       LOG.info("Using thrift server type " + chosenType.option);
213       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
214     }
215 
216     public String simpleClassName() {
217       return serverClass.getSimpleName();
218     }
219 
220     public static List<String> serversThatCannotSpecifyBindIP() {
221       List<String> l = new ArrayList<String>();
222       for (ImplType t : values()) {
223         if (!t.canSpecifyBindIP) {
224           l.add(t.simpleClassName());
225         }
226       }
227       return l;
228     }
229 
230   }
231 
  /**
   * Constructs a runner with a freshly-created {@link HBaseHandler} backed by
   * the given configuration.
   * @throws IOException if the handler cannot be created
   */
  public ThriftServerRunner(Configuration conf) throws IOException {
    this(conf, new ThriftServerRunner.HBaseHandler(conf));
  }
235 
  /**
   * Constructs a runner around an existing handler. Wires up metrics and
   * wraps the handler in a metrics-recording proxy.
   *
   * @param conf configuration; a defensive copy is stored
   * @param handler the Hbase.Iface implementation to serve
   */
  public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
    this.conf = HBaseConfiguration.create(conf);
    // NOTE(review): listenPort is read from the caller's conf, not from the
    // copy made above — later changes to the caller's conf are not seen here.
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
    handler.initMetrics(metrics);
    this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
  }
243 
244   /*
245    * Runs the Thrift server
246    */
247   @Override
248   public void run() {
249     try {
250       setupServer();
251       tserver.serve();
252     } catch (Exception e) {
253       LOG.fatal("Cannot run ThriftServer", e);
254       // Crash the process if the ThriftServer is not running
255       System.exit(-1);
256     }
257   }
258 
259   public void shutdown() {
260     if (tserver != null) {
261       tserver.stop();
262       tserver = null;
263     }
264   }
265 
266   /**
267    * Setting up the thrift TServer
268    */
269   private void setupServer() throws Exception {
270     // Construct correct ProtocolFactory
271     TProtocolFactory protocolFactory;
272     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
273       LOG.debug("Using compact protocol");
274       protocolFactory = new TCompactProtocol.Factory();
275     } else {
276       LOG.debug("Using binary protocol");
277       protocolFactory = new TBinaryProtocol.Factory();
278     }
279 
280     Hbase.Processor<Hbase.Iface> processor =
281         new Hbase.Processor<Hbase.Iface>(handler);
282     ImplType implType = ImplType.getServerImpl(conf);
283 
284     // Construct correct TransportFactory
285     TTransportFactory transportFactory;
286     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
287       transportFactory = new TFramedTransport.Factory(
288           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
289       LOG.debug("Using framed transport");
290     } else {
291       transportFactory = new TTransportFactory();
292     }
293 
294     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
295       LOG.error("Server types " + Joiner.on(", ").join(
296           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
297           "address binding at the moment. See " +
298           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
299       throw new RuntimeException(
300           "-" + BIND_CONF_KEY + " not supported with " + implType);
301     }
302 
303     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
304         implType == ImplType.THREADED_SELECTOR) {
305 
306       InetAddress listenAddress = getBindAddress(conf);
307       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
308           new InetSocketAddress(listenAddress, listenPort));
309 
310       if (implType == ImplType.NONBLOCKING) {
311         TNonblockingServer.Args serverArgs =
312             new TNonblockingServer.Args(serverTransport);
313         serverArgs.processor(processor)
314                   .transportFactory(transportFactory)
315                   .protocolFactory(protocolFactory);
316         tserver = new TNonblockingServer(serverArgs);
317       } else if (implType == ImplType.HS_HA) {
318         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
319         CallQueue callQueue =
320             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
321         ExecutorService executorService = createExecutor(
322             callQueue, serverArgs.getWorkerThreads());
323         serverArgs.executorService(executorService)
324                   .processor(processor)
325                   .transportFactory(transportFactory)
326                   .protocolFactory(protocolFactory);
327         tserver = new THsHaServer(serverArgs);
328       } else { // THREADED_SELECTOR
329         TThreadedSelectorServer.Args serverArgs =
330             new HThreadedSelectorServerArgs(serverTransport, conf);
331         CallQueue callQueue =
332             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
333         ExecutorService executorService = createExecutor(
334             callQueue, serverArgs.getWorkerThreads());
335         serverArgs.executorService(executorService)
336                   .processor(processor)
337                   .transportFactory(transportFactory)
338                   .protocolFactory(protocolFactory);
339         tserver = new TThreadedSelectorServer(serverArgs);
340       }
341       LOG.info("starting HBase " + implType.simpleClassName() +
342           " server on " + Integer.toString(listenPort));
343     } else if (implType == ImplType.THREAD_POOL) {
344       // Thread pool server. Get the IP address to bind to.
345       InetAddress listenAddress = getBindAddress(conf);
346 
347       TServerTransport serverTransport = new TServerSocket(
348           new InetSocketAddress(listenAddress, listenPort));
349 
350       TBoundedThreadPoolServer.Args serverArgs =
351           new TBoundedThreadPoolServer.Args(serverTransport, conf);
352       serverArgs.processor(processor)
353                 .transportFactory(transportFactory)
354                 .protocolFactory(protocolFactory);
355       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
356           + listenAddress + ":" + Integer.toString(listenPort)
357           + "; " + serverArgs);
358       TBoundedThreadPoolServer tserver =
359           new TBoundedThreadPoolServer(serverArgs, metrics);
360       this.tserver = tserver;
361     } else {
362       throw new AssertionError("Unsupported Thrift server implementation: " +
363           implType.simpleClassName());
364     }
365 
366     // A sanity check that we instantiated the right type of server.
367     if (tserver.getClass() != implType.serverClass) {
368       throw new AssertionError("Expected to create Thrift server class " +
369           implType.serverClass.getName() + " but got " +
370           tserver.getClass().getName());
371     }
372 
373 
374 
375     registerFilters(conf);
376   }
377 
378   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
379                                  int workerThreads) {
380     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
381     tfb.setDaemon(true);
382     tfb.setNameFormat("thrift-worker-%d");
383     return new ThreadPoolExecutor(workerThreads, workerThreads,
384             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
385   }
386 
387   private InetAddress getBindAddress(Configuration conf)
388       throws UnknownHostException {
389     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
390     return InetAddress.getByName(bindAddressStr);
391   }
392 
393   protected static class ResultScannerWrapper {
394 
395     private final ResultScanner scanner;
396     private final boolean sortColumns;
397     public ResultScannerWrapper(ResultScanner resultScanner,
398                                 boolean sortResultColumns) {
399       scanner = resultScanner;
400       sortColumns = sortResultColumns;
401    }
402 
403     public ResultScanner getScanner() {
404       return scanner;
405     }
406 
407     public boolean isColumnSorted() {
408       return sortColumns;
409     }
410   }
411 
412   /**
413    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
414    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
415    */
416   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    // Lazily created by getHBaseAdmin(); volatile for double-checked locking.
    protected volatile HBaseAdmin admin = null;
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state;
    // both are guarded by synchronizing on this handler instance.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Per-thread cache of HTable instances keyed by table name, so each
    // worker thread reuses its own table handles across calls.
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // Batches increments when COALESCE_INC_KEY is enabled; set in constructor.
    IncrementCoalescer coalescer = null;
435 
436     /**
437      * Returns a list of all the column families for a given htable.
438      *
439      * @param table
440      * @throws IOException
441      */
442     byte[][] getAllColumns(HTable table) throws IOException {
443       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
444       byte[][] columns = new byte[cds.length][];
445       for (int i = 0; i < cds.length; i++) {
446         columns[i] = Bytes.add(cds[i].getName(),
447             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
448       }
449       return columns;
450     }
451 
452     /**
453      * Creates and returns an HTable instance from a given table name.
454      *
455      * @param tableName
456      *          name of table
457      * @return HTable object
458      * @throws IOException
459      * @throws IOError
460      */
461     public HTable getTable(final byte[] tableName) throws
462         IOException {
463       String table = new String(tableName);
464       Map<String, HTable> tables = threadLocalTables.get();
465       if (!tables.containsKey(table)) {
466         tables.put(table, new HTable(conf, tableName));
467       }
468       return tables.get(table);
469     }
470 
    /**
     * ByteBuffer overload of {@link #getTable(byte[])}: extracts the bytes
     * and delegates.
     *
     * @param tableName name of table as a ByteBuffer
     * @return HTable object
     * @throws IOException if the table cannot be opened
     */
    public HTable getTable(final ByteBuffer tableName) throws IOException {
      return getTable(getBytes(tableName));
    }
474 
475     /**
476      * Assigns a unique ID to the scanner and adds the mapping to an internal
477      * hash-map.
478      *
479      * @param scanner
480      * @return integer scanner id
481      */
482     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
483       int id = nextScannerId++;
484       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
485       scannerMap.put(id, resultScannerWrapper);
486       return id;
487     }
488 
489     /**
490      * Returns the scanner associated with the specified ID.
491      *
492      * @param id
493      * @return a Scanner, or null if ID was invalid.
494      */
495     protected synchronized ResultScannerWrapper getScanner(int id) {
496       return scannerMap.get(id);
497     }
498 
499     /**
500      * Removes the scanner associated with the specified ID from the internal
501      * id->scanner hash-map.
502      *
503      * @param id
504      * @return a Scanner, or null if ID was invalid.
505      */
506     protected synchronized ResultScannerWrapper removeScanner(int id) {
507       return scannerMap.remove(id);
508     }
509 
510     /**
511      * Constructs an HBaseHandler object.
512      * @throws IOException
513      */
514     protected HBaseHandler()
515     throws IOException {
516       this(HBaseConfiguration.create());
517     }
518 
    /**
     * Constructs an HBaseHandler with the given configuration, an empty
     * scanner map, and an increment coalescer bound to this handler.
     *
     * @param c configuration to use for all client operations
     * @throws IOException declared for subclasses; not thrown here
     */
    protected HBaseHandler(final Configuration c) throws IOException {
      this.conf = c;
      scannerMap = new HashMap<Integer, ResultScannerWrapper>();
      this.coalescer = new IncrementCoalescer(this);
    }
524 
525     /**
526      * Obtain HBaseAdmin. Creates the instance if it is not already created.
527      */
528     private HBaseAdmin getHBaseAdmin() throws IOException {
529       if (admin == null) {
530         synchronized (this) {
531           if (admin == null) {
532             admin = new HBaseAdmin(conf);
533           }
534         }
535       }
536       return admin;
537     }
538 
539     @Override
540     public void enableTable(ByteBuffer tableName) throws IOError {
541       try{
542         getHBaseAdmin().enableTable(getBytes(tableName));
543       } catch (IOException e) {
544         LOG.warn(e.getMessage(), e);
545         throw new IOError(e.getMessage());
546       }
547     }
548 
549     @Override
550     public void disableTable(ByteBuffer tableName) throws IOError{
551       try{
552         getHBaseAdmin().disableTable(getBytes(tableName));
553       } catch (IOException e) {
554         LOG.warn(e.getMessage(), e);
555         throw new IOError(e.getMessage());
556       }
557     }
558 
    /**
     * Reports whether the named table is currently enabled.
     *
     * @param tableName table to check
     * @throws IOError wrapping any underlying IOException's message
     */
    @Override
    public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
      try {
        return HTable.isTableEnabled(this.conf, getBytes(tableName));
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
568 
569     @Override
570     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
571       try{
572         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
573       } catch (InterruptedException e) {
574         throw new IOError(e.getMessage());
575       } catch (IOException e) {
576         LOG.warn(e.getMessage(), e);
577         throw new IOError(e.getMessage());
578       }
579     }
580 
581     @Override
582     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
583       try{
584         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
585       } catch (InterruptedException e) {
586         LOG.warn(e.getMessage(), e);
587         throw new IOError(e.getMessage());
588       } catch (IOException e) {
589         LOG.warn(e.getMessage(), e);
590         throw new IOError(e.getMessage());
591       }
592     }
593 
594     @Override
595     public List<ByteBuffer> getTableNames() throws IOError {
596       try {
597         TableName[] tableNames = this.getHBaseAdmin().listTableNames();
598         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
599         for (int i = 0; i < tableNames.length; i++) {
600           list.add(ByteBuffer.wrap(tableNames[i].getName()));
601         }
602         return list;
603       } catch (IOException e) {
604         LOG.warn(e.getMessage(), e);
605         throw new IOError(e.getMessage());
606       }
607     }
608 
609     /**
610      * @return the list of regions in the given table, or an empty list if the table does not exist
611      */
612     @Override
613     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
614     throws IOError {
615       try {
616         HTable table;
617         try {
618           table = getTable(tableName);
619         } catch (TableNotFoundException ex) {
620           return new ArrayList<TRegionInfo>();
621         }
622         Map<HRegionInfo, ServerName> regionLocations =
623             table.getRegionLocations();
624         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
625         for (Map.Entry<HRegionInfo, ServerName> entry :
626             regionLocations.entrySet()) {
627           HRegionInfo info = entry.getKey();
628           ServerName serverName = entry.getValue();
629           TRegionInfo region = new TRegionInfo();
630           region.serverName = ByteBuffer.wrap(
631               Bytes.toBytes(serverName.getHostname()));
632           region.port = serverName.getPort();
633           region.startKey = ByteBuffer.wrap(info.getStartKey());
634           region.endKey = ByteBuffer.wrap(info.getEndKey());
635           region.id = info.getRegionId();
636           region.name = ByteBuffer.wrap(info.getRegionName());
637           region.version = info.getVersion();
638           results.add(region);
639         }
640         return results;
641       } catch (TableNotFoundException e) {
642         // Return empty list for non-existing table
643         return Collections.emptyList();
644       } catch (IOException e){
645         LOG.warn(e.getMessage(), e);
646         throw new IOError(e.getMessage());
647       }
648     }
649 
650     @Deprecated
651     @Override
652     public List<TCell> get(
653         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
654         Map<ByteBuffer, ByteBuffer> attributes)
655         throws IOError {
656       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
657       if (famAndQf.length == 1) {
658         return get(tableName, row, famAndQf[0], null, attributes);
659       }
660       if (famAndQf.length == 2) {
661         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
662       }
663       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
664     }
665 
666     /**
667      * Note: this internal interface is slightly different from public APIs in regard to handling
668      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
669      * we respect qual == null as a request for the entire column family. The caller (
670      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
671      * column is parse like normal.
672      */
673     protected List<TCell> get(ByteBuffer tableName,
674                               ByteBuffer row,
675                               byte[] family,
676                               byte[] qualifier,
677                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
678       try {
679         HTable table = getTable(tableName);
680         Get get = new Get(getBytes(row));
681         addAttributes(get, attributes);
682         if (qualifier == null) {
683           get.addFamily(family);
684         } else {
685           get.addColumn(family, qualifier);
686         }
687         Result result = table.get(get);
688         return ThriftUtilities.cellFromHBase(result.rawCells());
689       } catch (IOException e) {
690         LOG.warn(e.getMessage(), e);
691         throw new IOError(e.getMessage());
692       }
693     }
694 
695     @Deprecated
696     @Override
697     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
698         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
699       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
700       if(famAndQf.length == 1) {
701         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
702       }
703       if (famAndQf.length == 2) {
704         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
705       }
706       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
707 
708     }
709 
710     /**
711      * Note: this public interface is slightly different from public Java APIs in regard to
712      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
713      * Rather, we respect qual == null as a request for the entire column family. If you want to
714      * access the entire column family, use
715      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
716      * that lacks a {@code ':'}.
717      */
718     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
719         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
720       try {
721         HTable table = getTable(tableName);
722         Get get = new Get(getBytes(row));
723         addAttributes(get, attributes);
724         if (null == qualifier) {
725           get.addFamily(family);
726         } else {
727           get.addColumn(family, qualifier);
728         }
729         get.setMaxVersions(numVersions);
730         Result result = table.get(get);
731         return ThriftUtilities.cellFromHBase(result.rawCells());
732       } catch (IOException e) {
733         LOG.warn(e.getMessage(), e);
734         throw new IOError(e.getMessage());
735       }
736     }
737 
738     @Deprecated
739     @Override
740     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
741         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
742       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
743       if (famAndQf.length == 1) {
744         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
745       }
746       if (famAndQf.length == 2) {
747         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
748           attributes);
749       }
750       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
751     }
752 
753     /**
754      * Note: this internal interface is slightly different from public APIs in regard to handling
755      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
756      * we respect qual == null as a request for the entire column family. The caller (
757      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
758      * consistent in that the column is parse like normal.
759      */
760     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
761         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
762         throws IOError {
763       try {
764         HTable table = getTable(tableName);
765         Get get = new Get(getBytes(row));
766         addAttributes(get, attributes);
767         if (null == qualifier) {
768           get.addFamily(family);
769         } else {
770           get.addColumn(family, qualifier);
771         }
772         get.setTimeRange(0, timestamp);
773         get.setMaxVersions(numVersions);
774         Result result = table.get(get);
775         return ThriftUtilities.cellFromHBase(result.rawCells());
776       } catch (IOException e) {
777         LOG.warn(e.getMessage(), e);
778         throw new IOError(e.getMessage());
779       }
780     }
781 
    /**
     * Fetches all columns of the given row at the latest timestamp.
     * Delegates to {@link #getRowWithColumnsTs} with no column restriction.
     */
    @Override
    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      return getRowWithColumnsTs(tableName, row, null,
                                 HConstants.LATEST_TIMESTAMP,
                                 attributes);
    }
789 
790     @Override
791     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
792                                               ByteBuffer row,
793         List<ByteBuffer> columns,
794         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
795       return getRowWithColumnsTs(tableName, row, columns,
796                                  HConstants.LATEST_TIMESTAMP,
797                                  attributes);
798     }
799 
800     @Override
801     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
802         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
803       return getRowWithColumnsTs(tableName, row, null,
804                                  timestamp, attributes);
805     }
806 
807     @Override
808     public List<TRowResult> getRowWithColumnsTs(
809         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
810         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
811       try {
812         HTable table = getTable(tableName);
813         if (columns == null) {
814           Get get = new Get(getBytes(row));
815           addAttributes(get, attributes);
816           get.setTimeRange(0, timestamp);
817           Result result = table.get(get);
818           return ThriftUtilities.rowResultFromHBase(result);
819         }
820         Get get = new Get(getBytes(row));
821         addAttributes(get, attributes);
822         for(ByteBuffer column : columns) {
823           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
824           if (famAndQf.length == 1) {
825               get.addFamily(famAndQf[0]);
826           } else {
827               get.addColumn(famAndQf[0], famAndQf[1]);
828           }
829         }
830         get.setTimeRange(0, timestamp);
831         Result result = table.get(get);
832         return ThriftUtilities.rowResultFromHBase(result);
833       } catch (IOException e) {
834         LOG.warn(e.getMessage(), e);
835         throw new IOError(e.getMessage());
836       }
837     }
838 
839     @Override
840     public List<TRowResult> getRows(ByteBuffer tableName,
841                                     List<ByteBuffer> rows,
842         Map<ByteBuffer, ByteBuffer> attributes)
843         throws IOError {
844       return getRowsWithColumnsTs(tableName, rows, null,
845                                   HConstants.LATEST_TIMESTAMP,
846                                   attributes);
847     }
848 
849     @Override
850     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
851                                                List<ByteBuffer> rows,
852         List<ByteBuffer> columns,
853         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
854       return getRowsWithColumnsTs(tableName, rows, columns,
855                                   HConstants.LATEST_TIMESTAMP,
856                                   attributes);
857     }
858 
859     @Override
860     public List<TRowResult> getRowsTs(ByteBuffer tableName,
861                                       List<ByteBuffer> rows,
862         long timestamp,
863         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
864       return getRowsWithColumnsTs(tableName, rows, null,
865                                   timestamp, attributes);
866     }
867 
868     @Override
869     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
870                                                  List<ByteBuffer> rows,
871         List<ByteBuffer> columns, long timestamp,
872         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
873       try {
874         List<Get> gets = new ArrayList<Get>(rows.size());
875         HTable table = getTable(tableName);
876         if (metrics != null) {
877           metrics.incNumRowKeysInBatchGet(rows.size());
878         }
879         for (ByteBuffer row : rows) {
880           Get get = new Get(getBytes(row));
881           addAttributes(get, attributes);
882           if (columns != null) {
883 
884             for(ByteBuffer column : columns) {
885               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
886               if (famAndQf.length == 1) {
887                 get.addFamily(famAndQf[0]);
888               } else {
889                 get.addColumn(famAndQf[0], famAndQf[1]);
890               }
891             }
892           }
893           get.setTimeRange(0, timestamp);
894           gets.add(get);
895         }
896         Result[] result = table.get(gets);
897         return ThriftUtilities.rowResultFromHBase(result);
898       } catch (IOException e) {
899         LOG.warn(e.getMessage(), e);
900         throw new IOError(e.getMessage());
901       }
902     }
903 
904     @Override
905     public void deleteAll(
906         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
907         Map<ByteBuffer, ByteBuffer> attributes)
908         throws IOError {
909       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
910                   attributes);
911     }
912 
913     @Override
914     public void deleteAllTs(ByteBuffer tableName,
915                             ByteBuffer row,
916                             ByteBuffer column,
917         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
918       try {
919         HTable table = getTable(tableName);
920         Delete delete  = new Delete(getBytes(row));
921         addAttributes(delete, attributes);
922         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
923         if (famAndQf.length == 1) {
924           delete.deleteFamily(famAndQf[0], timestamp);
925         } else {
926           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
927         }
928         table.delete(delete);
929 
930       } catch (IOException e) {
931         LOG.warn(e.getMessage(), e);
932         throw new IOError(e.getMessage());
933       }
934     }
935 
936     @Override
937     public void deleteAllRow(
938         ByteBuffer tableName, ByteBuffer row,
939         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
940       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
941     }
942 
943     @Override
944     public void deleteAllRowTs(
945         ByteBuffer tableName, ByteBuffer row, long timestamp,
946         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
947       try {
948         HTable table = getTable(tableName);
949         Delete delete  = new Delete(getBytes(row), timestamp);
950         addAttributes(delete, attributes);
951         table.delete(delete);
952       } catch (IOException e) {
953         LOG.warn(e.getMessage(), e);
954         throw new IOError(e.getMessage());
955       }
956     }
957 
    /**
     * Creates a new table with the given column families.
     *
     * @param in_tableName  name of the table to create
     * @param columnFamilies Thrift descriptors for each column family
     * @throws AlreadyExists if a table by that name already exists
     * @throws IOError       wrapping any underlying IOException
     * @throws IllegalArgument if HBase rejects the table/family descriptors
     */
    @Override
    public void createTable(ByteBuffer in_tableName,
        List<ColumnDescriptor> columnFamilies) throws IOError,
        IllegalArgument, AlreadyExists {
      byte [] tableName = getBytes(in_tableName);
      try {
        // NOTE(review): exists-then-create is not atomic; a concurrent creator can
        // win the race, in which case createTable below fails with an IOException.
        if (getHBaseAdmin().tableExists(tableName)) {
          throw new AlreadyExists("table name already in use");
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
        for (ColumnDescriptor col : columnFamilies) {
          HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
          desc.addFamily(colDesc);
        }
        getHBaseAdmin().createTable(desc);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
981 
982     @Override
983     public void deleteTable(ByteBuffer in_tableName) throws IOError {
984       byte [] tableName = getBytes(in_tableName);
985       if (LOG.isDebugEnabled()) {
986         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
987       }
988       try {
989         if (!getHBaseAdmin().tableExists(tableName)) {
990           throw new IOException("table does not exist");
991         }
992         getHBaseAdmin().deleteTable(tableName);
993       } catch (IOException e) {
994         LOG.warn(e.getMessage(), e);
995         throw new IOError(e.getMessage());
996       }
997     }
998 
999     @Override
1000     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1001         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1002         throws IOError, IllegalArgument {
1003       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1004                   attributes);
1005     }
1006 
    /**
     * Applies a batch of mutations to a single row at the given timestamp.
     * Delete-type mutations are collected into one Delete, puts into one Put;
     * the Delete is applied to the table before the Put.
     *
     * @throws IOError wrapping any underlying IOException
     * @throws IllegalArgument if a column specification cannot be parsed
     */
    @Override
    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      HTable table = null;
      try {
        table = getTable(tableName);
        Put put = new Put(getBytes(row), timestamp);
        addAttributes(put, attributes);

        Delete delete = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchMutate(mutations.size());
        }

        // I apologize for all this mess :)
        for (Mutation m : mutations) {
          // parseColumn yields [family] or [family, qualifier].
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // Each mutation's WAL flag overwrites the durability of the whole
            // accumulated Delete; the last delete-mutation wins.
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if(famAndQf.length == 1) {
              // A put needs a qualifier; family-only puts are silently skipped
              // (with a warning) rather than rejected.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            } else {
              // A missing value is stored as an empty byte array.
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Deletes are issued before puts.
        if (!delete.isEmpty())
          table.delete(delete);
        if (!put.isEmpty())
          table.put(put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1059 
1060     @Override
1061     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1062         Map<ByteBuffer, ByteBuffer> attributes)
1063         throws IOError, IllegalArgument, TException {
1064       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1065     }
1066 
    /**
     * Applies batched mutations across multiple rows at the given timestamp.
     * For each row, delete-type mutations accumulate into one Delete and puts
     * into one Put; all Puts are sent to the table before all Deletes.
     *
     * <p>NOTE(review): unlike {@link #mutateRowTs}, a family-only put here both
     * logs a warning AND throws IllegalArgumentException — the two methods are
     * intentionally or accidentally inconsistent; confirm before unifying.
     *
     * @throws IOError wrapping any underlying IOException
     * @throws IllegalArgument if a put mutation lacks a qualifier
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          // parseColumn yields [family] or [family, qualifier].
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // The last delete-mutation's WAL flag wins for the whole Delete.
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              // A missing value is stored as an empty byte array.
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        // Puts are issued before deletes across the whole batch.
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1130 
1131     @Deprecated
1132     @Override
1133     public long atomicIncrement(
1134         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1135             throws IOError, IllegalArgument, TException {
1136       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1137       if(famAndQf.length == 1) {
1138         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1139       }
1140       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1141     }
1142 
1143     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1144         byte [] family, byte [] qualifier, long amount)
1145         throws IOError, IllegalArgument, TException {
1146       HTable table;
1147       try {
1148         table = getTable(tableName);
1149         return table.incrementColumnValue(
1150             getBytes(row), family, qualifier, amount);
1151       } catch (IOException e) {
1152         LOG.warn(e.getMessage(), e);
1153         throw new IOError(e.getMessage());
1154       }
1155     }
1156 
1157     @Override
1158     public void scannerClose(int id) throws IOError, IllegalArgument {
1159       LOG.debug("scannerClose: id=" + id);
1160       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1161       if (resultScannerWrapper == null) {
1162         String message = "scanner ID is invalid";
1163         LOG.warn(message);
1164         throw new IllegalArgument("scanner ID is invalid");
1165       }
1166       resultScannerWrapper.getScanner().close();
1167       removeScanner(id);
1168     }
1169 
1170     @Override
1171     public List<TRowResult> scannerGetList(int id,int nbRows)
1172         throws IllegalArgument, IOError {
1173       LOG.debug("scannerGetList: id=" + id);
1174       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1175       if (null == resultScannerWrapper) {
1176         String message = "scanner ID is invalid";
1177         LOG.warn(message);
1178         throw new IllegalArgument("scanner ID is invalid");
1179       }
1180 
1181       Result [] results = null;
1182       try {
1183         results = resultScannerWrapper.getScanner().next(nbRows);
1184         if (null == results) {
1185           return new ArrayList<TRowResult>();
1186         }
1187       } catch (IOException e) {
1188         LOG.warn(e.getMessage(), e);
1189         throw new IOError(e.getMessage());
1190       }
1191       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1192     }
1193 
1194     @Override
1195     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1196       return scannerGetList(id,1);
1197     }
1198 
    /**
     * Opens a scanner configured from a Thrift TScan and returns its id. Only
     * the TScan fields that are explicitly set are copied onto the Scan.
     *
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError {
      try {
        HTable table = getTable(tableName);
        Scan scan = new Scan();
        addAttributes(scan, attributes);
        if (tScan.isSetStartRow()) {
          scan.setStartRow(tScan.getStartRow());
        }
        if (tScan.isSetStopRow()) {
          scan.setStopRow(tScan.getStopRow());
        }
        if (tScan.isSetTimestamp()) {
          // Thrift timestamp acts as an exclusive upper bound on versions.
          scan.setTimeRange(0, tScan.getTimestamp());
        }
        if (tScan.isSetCaching()) {
          scan.setCaching(tScan.getCaching());
        }
        if (tScan.isSetBatchSize()) {
          scan.setBatch(tScan.getBatchSize());
        }
        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
          for(ByteBuffer column : tScan.getColumns()) {
            // parseColumn yields [family] or [family, qualifier].
            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
            if(famQf.length == 1) {
              scan.addFamily(famQf[0]);
            } else {
              scan.addColumn(famQf[0], famQf[1]);
            }
          }
        }
        if (tScan.isSetFilterString()) {
          // Filter string is parsed with HBase's filter language.
          ParseFilter parseFilter = new ParseFilter();
          scan.setFilter(
              parseFilter.parseFilterString(tScan.getFilterString()));
        }
        if (tScan.isSetReversed()) {
          scan.setReversed(tScan.isReversed());
        }
        return addScanner(table.getScanner(scan), tScan.sortColumns);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1246 
1247     @Override
1248     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1249         List<ByteBuffer> columns,
1250         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1251       try {
1252         HTable table = getTable(tableName);
1253         Scan scan = new Scan(getBytes(startRow));
1254         addAttributes(scan, attributes);
1255         if(columns != null && columns.size() != 0) {
1256           for(ByteBuffer column : columns) {
1257             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1258             if(famQf.length == 1) {
1259               scan.addFamily(famQf[0]);
1260             } else {
1261               scan.addColumn(famQf[0], famQf[1]);
1262             }
1263           }
1264         }
1265         return addScanner(table.getScanner(scan), false);
1266       } catch (IOException e) {
1267         LOG.warn(e.getMessage(), e);
1268         throw new IOError(e.getMessage());
1269       }
1270     }
1271 
1272     @Override
1273     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1274         ByteBuffer stopRow, List<ByteBuffer> columns,
1275         Map<ByteBuffer, ByteBuffer> attributes)
1276         throws IOError, TException {
1277       try {
1278         HTable table = getTable(tableName);
1279         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1280         addAttributes(scan, attributes);
1281         if(columns != null && columns.size() != 0) {
1282           for(ByteBuffer column : columns) {
1283             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1284             if(famQf.length == 1) {
1285               scan.addFamily(famQf[0]);
1286             } else {
1287               scan.addColumn(famQf[0], famQf[1]);
1288             }
1289           }
1290         }
1291         return addScanner(table.getScanner(scan), false);
1292       } catch (IOException e) {
1293         LOG.warn(e.getMessage(), e);
1294         throw new IOError(e.getMessage());
1295       }
1296     }
1297 
1298     @Override
1299     public int scannerOpenWithPrefix(ByteBuffer tableName,
1300                                      ByteBuffer startAndPrefix,
1301                                      List<ByteBuffer> columns,
1302         Map<ByteBuffer, ByteBuffer> attributes)
1303         throws IOError, TException {
1304       try {
1305         HTable table = getTable(tableName);
1306         Scan scan = new Scan(getBytes(startAndPrefix));
1307         addAttributes(scan, attributes);
1308         Filter f = new WhileMatchFilter(
1309             new PrefixFilter(getBytes(startAndPrefix)));
1310         scan.setFilter(f);
1311         if (columns != null && columns.size() != 0) {
1312           for(ByteBuffer column : columns) {
1313             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1314             if(famQf.length == 1) {
1315               scan.addFamily(famQf[0]);
1316             } else {
1317               scan.addColumn(famQf[0], famQf[1]);
1318             }
1319           }
1320         }
1321         return addScanner(table.getScanner(scan), false);
1322       } catch (IOException e) {
1323         LOG.warn(e.getMessage(), e);
1324         throw new IOError(e.getMessage());
1325       }
1326     }
1327 
1328     @Override
1329     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1330         List<ByteBuffer> columns, long timestamp,
1331         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1332       try {
1333         HTable table = getTable(tableName);
1334         Scan scan = new Scan(getBytes(startRow));
1335         addAttributes(scan, attributes);
1336         scan.setTimeRange(0, timestamp);
1337         if (columns != null && columns.size() != 0) {
1338           for (ByteBuffer column : columns) {
1339             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1340             if(famQf.length == 1) {
1341               scan.addFamily(famQf[0]);
1342             } else {
1343               scan.addColumn(famQf[0], famQf[1]);
1344             }
1345           }
1346         }
1347         return addScanner(table.getScanner(scan), false);
1348       } catch (IOException e) {
1349         LOG.warn(e.getMessage(), e);
1350         throw new IOError(e.getMessage());
1351       }
1352     }
1353 
1354     @Override
1355     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1356         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1357         Map<ByteBuffer, ByteBuffer> attributes)
1358         throws IOError, TException {
1359       try {
1360         HTable table = getTable(tableName);
1361         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1362         addAttributes(scan, attributes);
1363         scan.setTimeRange(0, timestamp);
1364         if (columns != null && columns.size() != 0) {
1365           for (ByteBuffer column : columns) {
1366             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1367             if(famQf.length == 1) {
1368               scan.addFamily(famQf[0]);
1369             } else {
1370               scan.addColumn(famQf[0], famQf[1]);
1371             }
1372           }
1373         }
1374         scan.setTimeRange(0, timestamp);
1375         return addScanner(table.getScanner(scan), false);
1376       } catch (IOException e) {
1377         LOG.warn(e.getMessage(), e);
1378         throw new IOError(e.getMessage());
1379       }
1380     }
1381 
1382     @Override
1383     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1384         ByteBuffer tableName) throws IOError, TException {
1385       try {
1386         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1387           new TreeMap<ByteBuffer, ColumnDescriptor>();
1388 
1389         HTable table = getTable(tableName);
1390         HTableDescriptor desc = table.getTableDescriptor();
1391 
1392         for (HColumnDescriptor e : desc.getFamilies()) {
1393           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1394           columns.put(col.name, col);
1395         }
1396         return columns;
1397       } catch (IOException e) {
1398         LOG.warn(e.getMessage(), e);
1399         throw new IOError(e.getMessage());
1400       }
1401     }
1402 
    /**
     * Returns the cells of the row matching {@code row}, or of the closest row
     * sorting before it, limited to the given column family.
     *
     * <p>NOTE(review): if no such row exists, table.getRowOrBefore may return
     * null and result.rawCells() would then throw NPE — confirm whether callers
     * can hit this.
     *
     * @deprecated relies on HTable#getRowOrBefore, which is itself deprecated.
     */
    @Deprecated
    @Override
    public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
        ByteBuffer family) throws IOError {
      try {
        HTable table = getTable(getBytes(tableName));
        Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
        return ThriftUtilities.cellFromHBase(result.rawCells());
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1416 
    /**
     * Looks up, in the meta table, the region containing {@code searchRow} and
     * returns its boundaries, name, id, version, and (when known) the hosting
     * server.
     *
     * @throws IOError if the meta row cannot be found or has no region info
     */
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      try {
        HTable table = getTable(TableName.META_TABLE_NAME.getName());
        byte[] row = getBytes(searchRow);
        // The meta row at-or-before the search key describes the containing region.
        Result startRowResult = table.getRowOrBefore(
          row, HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
                                + Bytes.toStringBinary(row));
        }

        // find region start and end keys
        HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
        if (regionInfo == null) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toStringBinary(row));
        }
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = regionInfo.getVersion();

        // find region assignment to server
        ServerName serverName = HRegionInfo.getServerName(startRowResult);
        if (serverName != null) {
          // Server fields stay unset when the region is not currently assigned.
          region.setServerName(Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1456 
    /** Wires in the Thrift metrics sink used for per-call counters. */
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1460 
1461     @Override
1462     public void increment(TIncrement tincrement) throws IOError, TException {
1463 
1464       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1465         throw new TException("Must supply a table and a row key; can't increment");
1466       }
1467 
1468       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1469         this.coalescer.queueIncrement(tincrement);
1470         return;
1471       }
1472 
1473       try {
1474         HTable table = getTable(tincrement.getTable());
1475         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1476         table.increment(inc);
1477       } catch (IOException e) {
1478         LOG.warn(e.getMessage(), e);
1479         throw new IOError(e.getMessage());
1480       }
1481     }
1482 
1483     @Override
1484     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1485       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1486         this.coalescer.queueIncrements(tincrements);
1487         return;
1488       }
1489       for (TIncrement tinc : tincrements) {
1490         increment(tinc);
1491       }
1492     }
1493 
1494     @Override
1495     public List<TCell> append(TAppend tappend) throws IOError, TException {
1496       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1497         throw new TException("Must supply a table and a row key; can't append");
1498       }
1499 
1500       try {
1501         HTable table = getTable(tappend.getTable());
1502         Append append = ThriftUtilities.appendFromThrift(tappend);
1503         Result result = table.append(append);
1504         return ThriftUtilities.cellFromHBase(result.rawCells());
1505       } catch (IOException e) {
1506         LOG.warn(e.getMessage(), e);
1507         throw new IOError(e.getMessage());
1508       }
1509     }
1510 
    /**
     * Atomically applies {@code mput} iff the cell addressed by {@code column}
     * currently holds {@code value} (a null value checks for cell absence/empty).
     *
     * <p>NOTE(review): mput.column is assumed to include a qualifier — a
     * family-only column would make famAndQf[1] throw
     * ArrayIndexOutOfBoundsException, which is NOT caught by the
     * IllegalArgumentException handler below; confirm intended behavior.
     *
     * @return true if the put was applied
     * @throws IllegalArgument if a column specification cannot be parsed
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
        ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
        IllegalArgument, TException {
      Put put;
      try {
        put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
        addAttributes(put, attributes);

        byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));

        // A missing mutation value is stored as an empty byte array.
        put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
            : HConstants.EMPTY_BYTE_ARRAY);

        put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        // The check column is parsed separately from the put column.
        byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
        return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
          value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1545   }
1546 
1547 
1548 
1549   /**
1550    * Adds all the attributes into the Operation object
1551    */
1552   private static void addAttributes(OperationWithAttributes op,
1553     Map<ByteBuffer, ByteBuffer> attributes) {
1554     if (attributes == null || attributes.size() == 0) {
1555       return;
1556     }
1557     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1558       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1559       byte[] value =  getBytes(entry.getValue());
1560       op.setAttribute(name, value);
1561     }
1562   }
1563 
1564   public static void registerFilters(Configuration conf) {
1565     String[] filters = conf.getStrings("hbase.thrift.filters");
1566     if(filters != null) {
1567       for(String filterClass: filters) {
1568         String[] filterPart = filterClass.split(":");
1569         if(filterPart.length != 2) {
1570           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1571         } else {
1572           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1573         }
1574       }
1575     }
1576   }
1577 }