View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collections;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.TreeMap;
35  import java.util.concurrent.BlockingQueue;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.ThreadPoolExecutor;
39  import java.util.concurrent.TimeUnit;
40  
41  import org.apache.commons.cli.CommandLine;
42  import org.apache.commons.cli.Option;
43  import org.apache.commons.cli.OptionGroup;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.classification.InterfaceAudience;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HColumnDescriptor;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.KeyValue;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.TableNotFoundException;
57  import org.apache.hadoop.hbase.client.Delete;
58  import org.apache.hadoop.hbase.client.Durability;
59  import org.apache.hadoop.hbase.client.Get;
60  import org.apache.hadoop.hbase.client.HBaseAdmin;
61  import org.apache.hadoop.hbase.client.HTable;
62  import org.apache.hadoop.hbase.client.Increment;
63  import org.apache.hadoop.hbase.client.OperationWithAttributes;
64  import org.apache.hadoop.hbase.client.Put;
65  import org.apache.hadoop.hbase.client.Result;
66  import org.apache.hadoop.hbase.client.ResultScanner;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.filter.Filter;
69  import org.apache.hadoop.hbase.filter.ParseFilter;
70  import org.apache.hadoop.hbase.filter.PrefixFilter;
71  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
72  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
73  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
74  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
75  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
76  import org.apache.hadoop.hbase.thrift.generated.Hbase;
77  import org.apache.hadoop.hbase.thrift.generated.IOError;
78  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
79  import org.apache.hadoop.hbase.thrift.generated.Mutation;
80  import org.apache.hadoop.hbase.thrift.generated.TCell;
81  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
82  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
83  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
84  import org.apache.hadoop.hbase.thrift.generated.TScan;
85  import org.apache.hadoop.hbase.util.Bytes;
86  import org.apache.thrift.TException;
87  import org.apache.thrift.protocol.TBinaryProtocol;
88  import org.apache.thrift.protocol.TCompactProtocol;
89  import org.apache.thrift.protocol.TProtocolFactory;
90  import org.apache.thrift.server.THsHaServer;
91  import org.apache.thrift.server.TNonblockingServer;
92  import org.apache.thrift.server.TServer;
93  import org.apache.thrift.server.TThreadedSelectorServer;
94  import org.apache.thrift.transport.TFramedTransport;
95  import org.apache.thrift.transport.TNonblockingServerSocket;
96  import org.apache.thrift.transport.TNonblockingServerTransport;
97  import org.apache.thrift.transport.TServerSocket;
98  import org.apache.thrift.transport.TServerTransport;
99  import org.apache.thrift.transport.TTransportFactory;
100 
101 import com.google.common.base.Joiner;
102 import com.google.common.util.concurrent.ThreadFactoryBuilder;
103 
104 /**
105  * ThriftServerRunner - this class starts up a Thrift server which implements
106  * the Hbase API specified in the Hbase.thrift IDL file.
107  */
108 @InterfaceAudience.Private
109 public class ThriftServerRunner implements Runnable {
110 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  /** Configuration key selecting the Thrift server implementation type. */
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  /** Configuration key for the address the Thrift server binds to. */
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  /** Configuration key enabling the compact Thrift protocol. */
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  /** Configuration key enabling the framed transport. */
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  /** Configuration key for the Thrift listen port. */
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  // NOTE(review): presumably read by the increment-coalescing path; its use
  // is not visible in this chunk — confirm against the handler/coalescer.
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";

  /** Default: bind to all interfaces when no address is configured. */
  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  private final int listenPort;

  private Configuration conf;
  // volatile: written in setupServer()/shutdown(), which may run on
  // different threads than readers of this field.
  volatile TServer tserver;
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
131   /** An enum of server implementation selections */
132   enum ImplType {
133     HS_HA("hsha", true, THsHaServer.class, true),
134     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
135     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
136     THREADED_SELECTOR(
137         "threadedselector", true, TThreadedSelectorServer.class, true);
138 
139     public static final ImplType DEFAULT = THREAD_POOL;
140 
141     final String option;
142     final boolean isAlwaysFramed;
143     final Class<? extends TServer> serverClass;
144     final boolean canSpecifyBindIP;
145 
146     ImplType(String option, boolean isAlwaysFramed,
147         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
148       this.option = option;
149       this.isAlwaysFramed = isAlwaysFramed;
150       this.serverClass = serverClass;
151       this.canSpecifyBindIP = canSpecifyBindIP;
152     }
153 
154     /**
155      * @return <code>-option</code> so we can get the list of options from
156      *         {@link #values()}
157      */
158     @Override
159     public String toString() {
160       return "-" + option;
161     }
162 
163     String getDescription() {
164       StringBuilder sb = new StringBuilder("Use the " +
165           serverClass.getSimpleName());
166       if (isAlwaysFramed) {
167         sb.append(" This implies the framed transport.");
168       }
169       if (this == DEFAULT) {
170         sb.append("This is the default.");
171       }
172       return sb.toString();
173     }
174 
175     static OptionGroup createOptionGroup() {
176       OptionGroup group = new OptionGroup();
177       for (ImplType t : values()) {
178         group.addOption(new Option(t.option, t.getDescription()));
179       }
180       return group;
181     }
182 
183     static ImplType getServerImpl(Configuration conf) {
184       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
185       for (ImplType t : values()) {
186         if (confType.equals(t.option)) {
187           return t;
188         }
189       }
190       throw new AssertionError("Unknown server ImplType.option:" + confType);
191     }
192 
193     static void setServerImpl(CommandLine cmd, Configuration conf) {
194       ImplType chosenType = null;
195       int numChosen = 0;
196       for (ImplType t : values()) {
197         if (cmd.hasOption(t.option)) {
198           chosenType = t;
199           ++numChosen;
200         }
201       }
202       if (numChosen < 1) {
203         LOG.info("Using default thrift server type");
204         chosenType = DEFAULT;
205       } else if (numChosen > 1) {
206         throw new AssertionError("Exactly one option out of " +
207           Arrays.toString(values()) + " has to be specified");
208       }
209       LOG.info("Using thrift server type " + chosenType.option);
210       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
211     }
212 
213     public String simpleClassName() {
214       return serverClass.getSimpleName();
215     }
216 
217     public static List<String> serversThatCannotSpecifyBindIP() {
218       List<String> l = new ArrayList<String>();
219       for (ImplType t : values()) {
220         if (!t.canSpecifyBindIP) {
221           l.add(t.simpleClassName());
222         }
223       }
224       return l;
225     }
226 
227   }
228 
  /**
   * Creates a runner with a freshly constructed {@link HBaseHandler} backed
   * by the given configuration.
   *
   * @throws IOException if the handler cannot be constructed
   */
  public ThriftServerRunner(Configuration conf) throws IOException {
    this(conf, new ThriftServerRunner.HBaseHandler(conf));
  }
232 
  /**
   * Creates a runner that serves the given handler over Thrift.
   *
   * @param conf configuration; a private copy is kept. Note the port and
   *        metrics are read from the caller's instance — same values, since
   *        the copy is made from it.
   * @param handler the Hbase.Iface implementation to expose
   */
  public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
    this.conf = HBaseConfiguration.create(conf);
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
    handler.initMetrics(metrics);
    // Wrap the handler so every Thrift call is reported to metrics.
    this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
  }
240 
  /*
   * Runs the Thrift server. serve() blocks until the server is stopped;
   * any failure during setup or serving terminates the whole process.
   */
  @Override
  public void run() {
    try {
      setupServer();
      tserver.serve();
    } catch (Exception e) {
      LOG.fatal("Cannot run ThriftServer", e);
      // Crash the process if the ThriftServer is not running
      System.exit(-1);
    }
  }
255 
256   public void shutdown() {
257     if (tserver != null) {
258       tserver.stop();
259       tserver = null;
260     }
261   }
262 
  /**
   * Setting up the thrift TServer. Reads the protocol (compact vs binary),
   * transport (framed vs plain), and server implementation type from the
   * configuration, builds the matching Thrift server, and stores it in
   * {@link #tserver}.
   */
  private void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory;
    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
      LOG.debug("Using compact protocol");
      protocolFactory = new TCompactProtocol.Factory();
    } else {
      LOG.debug("Using binary protocol");
      protocolFactory = new TBinaryProtocol.Factory();
    }

    Hbase.Processor<Hbase.Iface> processor =
        new Hbase.Processor<Hbase.Iface>(handler);
    ImplType implType = ImplType.getServerImpl(conf);

    // Construct correct TransportFactory. Some server types require the
    // framed transport regardless of configuration.
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
      transportFactory = new TFramedTransport.Factory();
      LOG.debug("Using framed transport");
    } else {
      transportFactory = new TTransportFactory();
    }

    // Fail fast if a bind address was configured for a server type that
    // cannot honor it.
    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types " + Joiner.on(", ").join(
          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
          "address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
      throw new RuntimeException(
          "-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {

      // The three nonblocking server types share a nonblocking socket.
      InetAddress listenAddress = getBindAddress(conf);
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      if (implType == ImplType.NONBLOCKING) {
        TNonblockingServer.Args serverArgs =
            new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TNonblockingServer(serverArgs);
      } else if (implType == ImplType.HS_HA) {
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        // Metrics-instrumented call queue so queue time is reported.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new THsHaServer(serverArgs);
      } else { // THREADED_SELECTOR
        TThreadedSelectorServer.Args serverArgs =
            new HThreadedSelectorServerArgs(serverTransport, conf);
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TThreadedSelectorServer(serverArgs);
      }
      LOG.info("starting HBase " + implType.simpleClassName() +
          " server on " + Integer.toString(listenPort));
    } else if (implType == ImplType.THREAD_POOL) {
      // Thread pool server. Get the IP address to bind to.
      InetAddress listenAddress = getBindAddress(conf);

      TServerTransport serverTransport = new TServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      TBoundedThreadPoolServer.Args serverArgs =
          new TBoundedThreadPoolServer.Args(serverTransport, conf);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
          + listenAddress + ":" + Integer.toString(listenPort)
          + "; " + serverArgs);
      TBoundedThreadPoolServer tserver =
          new TBoundedThreadPoolServer(serverArgs, metrics);
      this.tserver = tserver;
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }

    registerFilters(conf);
  }
373 
374   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
375                                  int workerThreads) {
376     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
377     tfb.setDaemon(true);
378     tfb.setNameFormat("thrift-worker-%d");
379     return new ThreadPoolExecutor(workerThreads, workerThreads,
380             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
381   }
382 
383   private InetAddress getBindAddress(Configuration conf)
384       throws UnknownHostException {
385     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
386     return InetAddress.getByName(bindAddressStr);
387   }
388 
389   protected static class ResultScannerWrapper {
390 
391     private final ResultScanner scanner;
392     private final boolean sortColumns;
393     public ResultScannerWrapper(ResultScanner resultScanner,
394                                 boolean sortResultColumns) {
395       scanner = resultScanner;
396       sortColumns = sortResultColumns;
397    }
398 
399     public ResultScanner getScanner() {
400       return scanner;
401     }
402 
403     public boolean isColumnSorted() {
404       return sortColumns;
405     }
406   }
407 
408   /**
409    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
410    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
411    */
412   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    // Lazily created; guarded by double-checked locking in getHBaseAdmin().
    protected volatile HBaseAdmin admin = null;
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state;
    // both are touched only inside synchronized methods of this class.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Per-thread cache of HTable instances keyed by table name. HTable is
    // not thread-safe, so instances are confined to the creating thread.
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // NOTE(review): presumably batches increments to reduce RPCs and is
    // toggled by COALESCE_INC_KEY — its use is outside this chunk; confirm.
    IncrementCoalescer coalescer = null;
431 
432     /**
433      * Returns a list of all the column families for a given htable.
434      *
435      * @param table
436      * @throws IOException
437      */
438     byte[][] getAllColumns(HTable table) throws IOException {
439       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
440       byte[][] columns = new byte[cds.length][];
441       for (int i = 0; i < cds.length; i++) {
442         columns[i] = Bytes.add(cds[i].getName(),
443             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
444       }
445       return columns;
446     }
447 
448     /**
449      * Creates and returns an HTable instance from a given table name.
450      *
451      * @param tableName
452      *          name of table
453      * @return HTable object
454      * @throws IOException
455      * @throws IOError
456      */
457     public HTable getTable(final byte[] tableName) throws
458         IOException {
459       String table = new String(tableName);
460       Map<String, HTable> tables = threadLocalTables.get();
461       if (!tables.containsKey(table)) {
462         tables.put(table, new HTable(conf, tableName));
463       }
464       return tables.get(table);
465     }
466 
    /** ByteBuffer variant of {@link #getTable(byte[])}. */
    public HTable getTable(final ByteBuffer tableName) throws IOException {
      return getTable(getBytes(tableName));
    }
470 
471     /**
472      * Assigns a unique ID to the scanner and adds the mapping to an internal
473      * hash-map.
474      *
475      * @param scanner
476      * @return integer scanner id
477      */
478     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
479       int id = nextScannerId++;
480       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
481       scannerMap.put(id, resultScannerWrapper);
482       return id;
483     }
484 
    /**
     * Returns the scanner associated with the specified ID.
     *
     * @param id the scanner id returned by {@link #addScanner}
     * @return a Scanner, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper getScanner(int id) {
      return scannerMap.get(id);
    }
494 
    /**
     * Removes the scanner associated with the specified ID from the internal
     * id->scanner hash-map.
     *
     * @param id the scanner id returned by {@link #addScanner}
     * @return the removed scanner wrapper, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper removeScanner(int id) {
      return scannerMap.remove(id);
    }
505 
    /**
     * Constructs an HBaseHandler object with a default configuration.
     * @throws IOException
     */
    protected HBaseHandler()
    throws IOException {
      this(HBaseConfiguration.create());
    }
514 
515     protected HBaseHandler(final Configuration c) throws IOException {
516       this.conf = c;
517       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
518       this.coalescer = new IncrementCoalescer(this);
519     }
520 
    /**
     * Obtain HBaseAdmin. Creates the instance if it is not already created.
     * Uses double-checked locking on the volatile {@code admin} field, so at
     * most one instance is ever constructed.
     */
    private HBaseAdmin getHBaseAdmin() throws IOException {
      if (admin == null) {
        synchronized (this) {
          if (admin == null) {
            admin = new HBaseAdmin(conf);
          }
        }
      }
      return admin;
    }
534 
535     @Override
536     public void enableTable(ByteBuffer tableName) throws IOError {
537       try{
538         getHBaseAdmin().enableTable(getBytes(tableName));
539       } catch (IOException e) {
540         LOG.warn(e.getMessage(), e);
541         throw new IOError(e.getMessage());
542       }
543     }
544 
545     @Override
546     public void disableTable(ByteBuffer tableName) throws IOError{
547       try{
548         getHBaseAdmin().disableTable(getBytes(tableName));
549       } catch (IOException e) {
550         LOG.warn(e.getMessage(), e);
551         throw new IOError(e.getMessage());
552       }
553     }
554 
555     @Override
556     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
557       try {
558         return HTable.isTableEnabled(this.conf, getBytes(tableName));
559       } catch (IOException e) {
560         LOG.warn(e.getMessage(), e);
561         throw new IOError(e.getMessage());
562       }
563     }
564 
565     @Override
566     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
567       try{
568         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
569       } catch (InterruptedException e) {
570         throw new IOError(e.getMessage());
571       } catch (IOException e) {
572         LOG.warn(e.getMessage(), e);
573         throw new IOError(e.getMessage());
574       }
575     }
576 
577     @Override
578     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
579       try{
580         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
581       } catch (InterruptedException e) {
582         LOG.warn(e.getMessage(), e);
583         throw new IOError(e.getMessage());
584       } catch (IOException e) {
585         LOG.warn(e.getMessage(), e);
586         throw new IOError(e.getMessage());
587       }
588     }
589 
590     @Override
591     public List<ByteBuffer> getTableNames() throws IOError {
592       try {
593         TableName[] tableNames = this.getHBaseAdmin().listTableNames();
594         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
595         for (int i = 0; i < tableNames.length; i++) {
596           list.add(ByteBuffer.wrap(tableNames[i].getName()));
597         }
598         return list;
599       } catch (IOException e) {
600         LOG.warn(e.getMessage(), e);
601         throw new IOError(e.getMessage());
602       }
603     }
604 
605     /**
606      * @return the list of regions in the given table, or an empty list if the table does not exist
607      */
608     @Override
609     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
610     throws IOError {
611       try {
612         HTable table;
613         try {
614           table = getTable(tableName);
615         } catch (TableNotFoundException ex) {
616           return new ArrayList<TRegionInfo>();
617         }
618         Map<HRegionInfo, ServerName> regionLocations =
619             table.getRegionLocations();
620         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
621         for (Map.Entry<HRegionInfo, ServerName> entry :
622             regionLocations.entrySet()) {
623           HRegionInfo info = entry.getKey();
624           ServerName serverName = entry.getValue();
625           TRegionInfo region = new TRegionInfo();
626           region.serverName = ByteBuffer.wrap(
627               Bytes.toBytes(serverName.getHostname()));
628           region.port = serverName.getPort();
629           region.startKey = ByteBuffer.wrap(info.getStartKey());
630           region.endKey = ByteBuffer.wrap(info.getEndKey());
631           region.id = info.getRegionId();
632           region.name = ByteBuffer.wrap(info.getRegionName());
633           region.version = info.getVersion();
634           results.add(region);
635         }
636         return results;
637       } catch (TableNotFoundException e) {
638         // Return empty list for non-existing table
639         return Collections.emptyList();
640       } catch (IOException e){
641         LOG.warn(e.getMessage(), e);
642         throw new IOError(e.getMessage());
643       }
644     }
645 
646     @Deprecated
647     @Override
648     public List<TCell> get(
649         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
650         Map<ByteBuffer, ByteBuffer> attributes)
651         throws IOError {
652       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
653       if (famAndQf.length == 1) {
654         return get(tableName, row, famAndQf[0], null, attributes);
655       }
656       if (famAndQf.length == 2) {
657         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
658       }
659       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
660     }
661 
662     /**
663      * Note: this internal interface is slightly different from public APIs in regard to handling
664      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
665      * we respect qual == null as a request for the entire column family. The caller (
666      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
667      * column is parse like normal.
668      */
669     protected List<TCell> get(ByteBuffer tableName,
670                               ByteBuffer row,
671                               byte[] family,
672                               byte[] qualifier,
673                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
674       try {
675         HTable table = getTable(tableName);
676         Get get = new Get(getBytes(row));
677         addAttributes(get, attributes);
678         if (qualifier == null) {
679           get.addFamily(family);
680         } else {
681           get.addColumn(family, qualifier);
682         }
683         Result result = table.get(get);
684         return ThriftUtilities.cellFromHBase(result.rawCells());
685       } catch (IOException e) {
686         LOG.warn(e.getMessage(), e);
687         throw new IOError(e.getMessage());
688       }
689     }
690 
691     @Deprecated
692     @Override
693     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
694         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
695       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
696       if(famAndQf.length == 1) {
697         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
698       }
699       if (famAndQf.length == 2) {
700         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
701       }
702       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
703 
704     }
705 
706     /**
707      * Note: this public interface is slightly different from public Java APIs in regard to
708      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
709      * Rather, we respect qual == null as a request for the entire column family. If you want to
710      * access the entire column family, use
711      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
712      * that lacks a {@code ':'}.
713      */
714     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
715         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
716       try {
717         HTable table = getTable(tableName);
718         Get get = new Get(getBytes(row));
719         addAttributes(get, attributes);
720         if (null == qualifier) {
721           get.addFamily(family);
722         } else {
723           get.addColumn(family, qualifier);
724         }
725         get.setMaxVersions(numVersions);
726         Result result = table.get(get);
727         return ThriftUtilities.cellFromHBase(result.rawCells());
728       } catch (IOException e) {
729         LOG.warn(e.getMessage(), e);
730         throw new IOError(e.getMessage());
731       }
732     }
733 
734     @Deprecated
735     @Override
736     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
737         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
738       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
739       if (famAndQf.length == 1) {
740         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
741       }
742       if (famAndQf.length == 2) {
743         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
744           attributes);
745       }
746       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
747     }
748 
749     /**
750      * Note: this internal interface is slightly different from public APIs in regard to handling
751      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
752      * we respect qual == null as a request for the entire column family. The caller (
753      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
754      * consistent in that the column is parse like normal.
755      */
756     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
757         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
758         throws IOError {
759       try {
760         HTable table = getTable(tableName);
761         Get get = new Get(getBytes(row));
762         addAttributes(get, attributes);
763         if (null == qualifier) {
764           get.addFamily(family);
765         } else {
766           get.addColumn(family, qualifier);
767         }
768         get.setTimeRange(0, timestamp);
769         get.setMaxVersions(numVersions);
770         Result result = table.get(get);
771         return ThriftUtilities.cellFromHBase(result.rawCells());
772       } catch (IOException e) {
773         LOG.warn(e.getMessage(), e);
774         throw new IOError(e.getMessage());
775       }
776     }
777 
    /** Fetches all columns of the given row at the latest timestamp. */
    @Override
    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      // columns == null means "all columns".
      return getRowWithColumnsTs(tableName, row, null,
                                 HConstants.LATEST_TIMESTAMP,
                                 attributes);
    }
785 
    /** Fetches the given columns of a row at the latest timestamp. */
    @Override
    public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
                                              ByteBuffer row,
        List<ByteBuffer> columns,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      return getRowWithColumnsTs(tableName, row, columns,
                                 HConstants.LATEST_TIMESTAMP,
                                 attributes);
    }
795 
796     @Override
797     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
798         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
799       return getRowWithColumnsTs(tableName, row, null,
800                                  timestamp, attributes);
801     }
802 
803     @Override
804     public List<TRowResult> getRowWithColumnsTs(
805         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
806         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
807       try {
808         HTable table = getTable(tableName);
809         if (columns == null) {
810           Get get = new Get(getBytes(row));
811           addAttributes(get, attributes);
812           get.setTimeRange(0, timestamp);
813           Result result = table.get(get);
814           return ThriftUtilities.rowResultFromHBase(result);
815         }
816         Get get = new Get(getBytes(row));
817         addAttributes(get, attributes);
818         for(ByteBuffer column : columns) {
819           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
820           if (famAndQf.length == 1) {
821               get.addFamily(famAndQf[0]);
822           } else {
823               get.addColumn(famAndQf[0], famAndQf[1]);
824           }
825         }
826         get.setTimeRange(0, timestamp);
827         Result result = table.get(get);
828         return ThriftUtilities.rowResultFromHBase(result);
829       } catch (IOException e) {
830         LOG.warn(e.getMessage(), e);
831         throw new IOError(e.getMessage());
832       }
833     }
834 
835     @Override
836     public List<TRowResult> getRows(ByteBuffer tableName,
837                                     List<ByteBuffer> rows,
838         Map<ByteBuffer, ByteBuffer> attributes)
839         throws IOError {
840       return getRowsWithColumnsTs(tableName, rows, null,
841                                   HConstants.LATEST_TIMESTAMP,
842                                   attributes);
843     }
844 
845     @Override
846     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
847                                                List<ByteBuffer> rows,
848         List<ByteBuffer> columns,
849         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
850       return getRowsWithColumnsTs(tableName, rows, columns,
851                                   HConstants.LATEST_TIMESTAMP,
852                                   attributes);
853     }
854 
855     @Override
856     public List<TRowResult> getRowsTs(ByteBuffer tableName,
857                                       List<ByteBuffer> rows,
858         long timestamp,
859         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
860       return getRowsWithColumnsTs(tableName, rows, null,
861                                   timestamp, attributes);
862     }
863 
864     @Override
865     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
866                                                  List<ByteBuffer> rows,
867         List<ByteBuffer> columns, long timestamp,
868         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
869       try {
870         List<Get> gets = new ArrayList<Get>(rows.size());
871         HTable table = getTable(tableName);
872         if (metrics != null) {
873           metrics.incNumRowKeysInBatchGet(rows.size());
874         }
875         for (ByteBuffer row : rows) {
876           Get get = new Get(getBytes(row));
877           addAttributes(get, attributes);
878           if (columns != null) {
879 
880             for(ByteBuffer column : columns) {
881               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
882               if (famAndQf.length == 1) {
883                 get.addFamily(famAndQf[0]);
884               } else {
885                 get.addColumn(famAndQf[0], famAndQf[1]);
886               }
887             }
888           }
889           get.setTimeRange(0, timestamp);
890           gets.add(get);
891         }
892         Result[] result = table.get(gets);
893         return ThriftUtilities.rowResultFromHBase(result);
894       } catch (IOException e) {
895         LOG.warn(e.getMessage(), e);
896         throw new IOError(e.getMessage());
897       }
898     }
899 
900     @Override
901     public void deleteAll(
902         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
903         Map<ByteBuffer, ByteBuffer> attributes)
904         throws IOError {
905       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
906                   attributes);
907     }
908 
909     @Override
910     public void deleteAllTs(ByteBuffer tableName,
911                             ByteBuffer row,
912                             ByteBuffer column,
913         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
914       try {
915         HTable table = getTable(tableName);
916         Delete delete  = new Delete(getBytes(row));
917         addAttributes(delete, attributes);
918         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
919         if (famAndQf.length == 1) {
920           delete.deleteFamily(famAndQf[0], timestamp);
921         } else {
922           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
923         }
924         table.delete(delete);
925 
926       } catch (IOException e) {
927         LOG.warn(e.getMessage(), e);
928         throw new IOError(e.getMessage());
929       }
930     }
931 
932     @Override
933     public void deleteAllRow(
934         ByteBuffer tableName, ByteBuffer row,
935         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
936       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
937     }
938 
939     @Override
940     public void deleteAllRowTs(
941         ByteBuffer tableName, ByteBuffer row, long timestamp,
942         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
943       try {
944         HTable table = getTable(tableName);
945         Delete delete  = new Delete(getBytes(row), timestamp);
946         addAttributes(delete, attributes);
947         table.delete(delete);
948       } catch (IOException e) {
949         LOG.warn(e.getMessage(), e);
950         throw new IOError(e.getMessage());
951       }
952     }
953 
954     @Override
955     public void createTable(ByteBuffer in_tableName,
956         List<ColumnDescriptor> columnFamilies) throws IOError,
957         IllegalArgument, AlreadyExists {
958       byte [] tableName = getBytes(in_tableName);
959       try {
960         if (getHBaseAdmin().tableExists(tableName)) {
961           throw new AlreadyExists("table name already in use");
962         }
963         HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
964         for (ColumnDescriptor col : columnFamilies) {
965           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
966           desc.addFamily(colDesc);
967         }
968         getHBaseAdmin().createTable(desc);
969       } catch (IOException e) {
970         LOG.warn(e.getMessage(), e);
971         throw new IOError(e.getMessage());
972       } catch (IllegalArgumentException e) {
973         LOG.warn(e.getMessage(), e);
974         throw new IllegalArgument(e.getMessage());
975       }
976     }
977 
978     @Override
979     public void deleteTable(ByteBuffer in_tableName) throws IOError {
980       byte [] tableName = getBytes(in_tableName);
981       if (LOG.isDebugEnabled()) {
982         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
983       }
984       try {
985         if (!getHBaseAdmin().tableExists(tableName)) {
986           throw new IOException("table does not exist");
987         }
988         getHBaseAdmin().deleteTable(tableName);
989       } catch (IOException e) {
990         LOG.warn(e.getMessage(), e);
991         throw new IOError(e.getMessage());
992       }
993     }
994 
995     @Override
996     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
997         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
998         throws IOError, IllegalArgument {
999       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1000                   attributes);
1001     }
1002 
    /**
     * Applies a batch of mutations (puts and deletes) to a single row, all
     * stamped with {@code timestamp}. Deletes are accumulated into one Delete
     * and puts into one Put; the Delete is applied BEFORE the Put, so the two
     * table calls are not atomic with respect to each other.
     *
     * @param mutations each entry names a "family" or "family:qualifier"
     *        column; {@code isDelete} selects delete vs. put semantics
     * @throws IOError on any underlying IOException
     * @throws IllegalArgument if a column specifier cannot be parsed
     */
    @Override
    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      HTable table = null;
      try {
        table = getTable(tableName);
        Put put = new Put(getBytes(row), timestamp);
        addAttributes(put, attributes);

        Delete delete = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchMutate(mutations.size());
        }

        // I apologize for all this mess :)
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // Family-only specifier deletes the whole family at this timestamp.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // NOTE: the last mutation's writeToWAL flag wins for the whole
            // combined Delete (and likewise for the Put below).
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            // A put needs a qualifier; family-only puts are warned and skipped.
            if(famAndQf.length == 1) {
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            } else {
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Deletes are issued first; empty operations are skipped entirely.
        if (!delete.isEmpty())
          table.delete(delete);
        if (!put.isEmpty())
          table.put(put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1055 
1056     @Override
1057     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1058         Map<ByteBuffer, ByteBuffer> attributes)
1059         throws IOError, IllegalArgument, TException {
1060       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1061     }
1062 
    /**
     * Applies batches of mutations to several rows, all stamped with
     * {@code timestamp}. Per row, deletes are folded into one Delete and puts
     * into one Put; all Puts are issued before all Deletes, and the two batch
     * calls are not atomic with respect to each other.
     *
     * @throws IOError on any underlying IOException
     * @throws IllegalArgument if a column specifier cannot be parsed during
     *         the table calls
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // NOTE: the last mutation's writeToWAL flag wins for the whole
            // combined Delete (and likewise for the Put below).
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              // NOTE(review): this unchecked exception is thrown OUTSIDE the
              // try/catch below, so it escapes as a RuntimeException instead
              // of the declared IllegalArgument — and it is unreachable after
              // the length==1 warning above only if parseColumn returns 1- or
              // 2-element arrays; confirm before relying on it. Contrast with
              // mutateRowTs, which warns and skips family-only puts.
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Only queue non-empty operations for the batch calls below.
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        // Puts are issued before deletes across the whole batch.
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1126 
1127     @Deprecated
1128     @Override
1129     public long atomicIncrement(
1130         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1131             throws IOError, IllegalArgument, TException {
1132       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1133       if(famAndQf.length == 1) {
1134         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1135       }
1136       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1137     }
1138 
1139     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1140         byte [] family, byte [] qualifier, long amount)
1141         throws IOError, IllegalArgument, TException {
1142       HTable table;
1143       try {
1144         table = getTable(tableName);
1145         return table.incrementColumnValue(
1146             getBytes(row), family, qualifier, amount);
1147       } catch (IOException e) {
1148         LOG.warn(e.getMessage(), e);
1149         throw new IOError(e.getMessage());
1150       }
1151     }
1152 
1153     @Override
1154     public void scannerClose(int id) throws IOError, IllegalArgument {
1155       LOG.debug("scannerClose: id=" + id);
1156       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1157       if (resultScannerWrapper == null) {
1158         String message = "scanner ID is invalid";
1159         LOG.warn(message);
1160         throw new IllegalArgument("scanner ID is invalid");
1161       }
1162       resultScannerWrapper.getScanner().close();
1163       removeScanner(id);
1164     }
1165 
1166     @Override
1167     public List<TRowResult> scannerGetList(int id,int nbRows)
1168         throws IllegalArgument, IOError {
1169       LOG.debug("scannerGetList: id=" + id);
1170       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1171       if (null == resultScannerWrapper) {
1172         String message = "scanner ID is invalid";
1173         LOG.warn(message);
1174         throw new IllegalArgument("scanner ID is invalid");
1175       }
1176 
1177       Result [] results = null;
1178       try {
1179         results = resultScannerWrapper.getScanner().next(nbRows);
1180         if (null == results) {
1181           return new ArrayList<TRowResult>();
1182         }
1183       } catch (IOException e) {
1184         LOG.warn(e.getMessage(), e);
1185         throw new IOError(e.getMessage());
1186       }
1187       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1188     }
1189 
1190     @Override
1191     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1192       return scannerGetList(id,1);
1193     }
1194 
1195     @Override
1196     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1197         Map<ByteBuffer, ByteBuffer> attributes)
1198         throws IOError {
1199       try {
1200         HTable table = getTable(tableName);
1201         Scan scan = new Scan();
1202         addAttributes(scan, attributes);
1203         if (tScan.isSetStartRow()) {
1204           scan.setStartRow(tScan.getStartRow());
1205         }
1206         if (tScan.isSetStopRow()) {
1207           scan.setStopRow(tScan.getStopRow());
1208         }
1209         if (tScan.isSetTimestamp()) {
1210           scan.setTimeRange(0, tScan.getTimestamp());
1211         }
1212         if (tScan.isSetCaching()) {
1213           scan.setCaching(tScan.getCaching());
1214         }
1215         if (tScan.isSetBatchSize()) {
1216           scan.setBatch(tScan.getBatchSize());
1217         }
1218         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1219           for(ByteBuffer column : tScan.getColumns()) {
1220             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1221             if(famQf.length == 1) {
1222               scan.addFamily(famQf[0]);
1223             } else {
1224               scan.addColumn(famQf[0], famQf[1]);
1225             }
1226           }
1227         }
1228         if (tScan.isSetFilterString()) {
1229           ParseFilter parseFilter = new ParseFilter();
1230           scan.setFilter(
1231               parseFilter.parseFilterString(tScan.getFilterString()));
1232         }
1233         return addScanner(table.getScanner(scan), tScan.sortColumns);
1234       } catch (IOException e) {
1235         LOG.warn(e.getMessage(), e);
1236         throw new IOError(e.getMessage());
1237       }
1238     }
1239 
1240     @Override
1241     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1242         List<ByteBuffer> columns,
1243         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1244       try {
1245         HTable table = getTable(tableName);
1246         Scan scan = new Scan(getBytes(startRow));
1247         addAttributes(scan, attributes);
1248         if(columns != null && columns.size() != 0) {
1249           for(ByteBuffer column : columns) {
1250             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1251             if(famQf.length == 1) {
1252               scan.addFamily(famQf[0]);
1253             } else {
1254               scan.addColumn(famQf[0], famQf[1]);
1255             }
1256           }
1257         }
1258         return addScanner(table.getScanner(scan), false);
1259       } catch (IOException e) {
1260         LOG.warn(e.getMessage(), e);
1261         throw new IOError(e.getMessage());
1262       }
1263     }
1264 
1265     @Override
1266     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1267         ByteBuffer stopRow, List<ByteBuffer> columns,
1268         Map<ByteBuffer, ByteBuffer> attributes)
1269         throws IOError, TException {
1270       try {
1271         HTable table = getTable(tableName);
1272         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1273         addAttributes(scan, attributes);
1274         if(columns != null && columns.size() != 0) {
1275           for(ByteBuffer column : columns) {
1276             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1277             if(famQf.length == 1) {
1278               scan.addFamily(famQf[0]);
1279             } else {
1280               scan.addColumn(famQf[0], famQf[1]);
1281             }
1282           }
1283         }
1284         return addScanner(table.getScanner(scan), false);
1285       } catch (IOException e) {
1286         LOG.warn(e.getMessage(), e);
1287         throw new IOError(e.getMessage());
1288       }
1289     }
1290 
1291     @Override
1292     public int scannerOpenWithPrefix(ByteBuffer tableName,
1293                                      ByteBuffer startAndPrefix,
1294                                      List<ByteBuffer> columns,
1295         Map<ByteBuffer, ByteBuffer> attributes)
1296         throws IOError, TException {
1297       try {
1298         HTable table = getTable(tableName);
1299         Scan scan = new Scan(getBytes(startAndPrefix));
1300         addAttributes(scan, attributes);
1301         Filter f = new WhileMatchFilter(
1302             new PrefixFilter(getBytes(startAndPrefix)));
1303         scan.setFilter(f);
1304         if (columns != null && columns.size() != 0) {
1305           for(ByteBuffer column : columns) {
1306             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1307             if(famQf.length == 1) {
1308               scan.addFamily(famQf[0]);
1309             } else {
1310               scan.addColumn(famQf[0], famQf[1]);
1311             }
1312           }
1313         }
1314         return addScanner(table.getScanner(scan), false);
1315       } catch (IOException e) {
1316         LOG.warn(e.getMessage(), e);
1317         throw new IOError(e.getMessage());
1318       }
1319     }
1320 
1321     @Override
1322     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1323         List<ByteBuffer> columns, long timestamp,
1324         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1325       try {
1326         HTable table = getTable(tableName);
1327         Scan scan = new Scan(getBytes(startRow));
1328         addAttributes(scan, attributes);
1329         scan.setTimeRange(0, timestamp);
1330         if (columns != null && columns.size() != 0) {
1331           for (ByteBuffer column : columns) {
1332             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1333             if(famQf.length == 1) {
1334               scan.addFamily(famQf[0]);
1335             } else {
1336               scan.addColumn(famQf[0], famQf[1]);
1337             }
1338           }
1339         }
1340         return addScanner(table.getScanner(scan), false);
1341       } catch (IOException e) {
1342         LOG.warn(e.getMessage(), e);
1343         throw new IOError(e.getMessage());
1344       }
1345     }
1346 
1347     @Override
1348     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1349         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1350         Map<ByteBuffer, ByteBuffer> attributes)
1351         throws IOError, TException {
1352       try {
1353         HTable table = getTable(tableName);
1354         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1355         addAttributes(scan, attributes);
1356         scan.setTimeRange(0, timestamp);
1357         if (columns != null && columns.size() != 0) {
1358           for (ByteBuffer column : columns) {
1359             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1360             if(famQf.length == 1) {
1361               scan.addFamily(famQf[0]);
1362             } else {
1363               scan.addColumn(famQf[0], famQf[1]);
1364             }
1365           }
1366         }
1367         scan.setTimeRange(0, timestamp);
1368         return addScanner(table.getScanner(scan), false);
1369       } catch (IOException e) {
1370         LOG.warn(e.getMessage(), e);
1371         throw new IOError(e.getMessage());
1372       }
1373     }
1374 
1375     @Override
1376     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1377         ByteBuffer tableName) throws IOError, TException {
1378       try {
1379         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1380           new TreeMap<ByteBuffer, ColumnDescriptor>();
1381 
1382         HTable table = getTable(tableName);
1383         HTableDescriptor desc = table.getTableDescriptor();
1384 
1385         for (HColumnDescriptor e : desc.getFamilies()) {
1386           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1387           columns.put(col.name, col);
1388         }
1389         return columns;
1390       } catch (IOException e) {
1391         LOG.warn(e.getMessage(), e);
1392         throw new IOError(e.getMessage());
1393       }
1394     }
1395 
1396     @Override
1397     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1398         ByteBuffer family) throws IOError {
1399       try {
1400         HTable table = getTable(getBytes(tableName));
1401         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1402         return ThriftUtilities.cellFromHBase(result.rawCells());
1403       } catch (IOException e) {
1404         LOG.warn(e.getMessage(), e);
1405         throw new IOError(e.getMessage());
1406       }
1407     }
1408 
    /**
     * Looks up the region containing {@code searchRow} by scanning hbase:meta
     * backwards from the row, and returns its key range, name, id, version,
     * and (when assigned) hosting server.
     *
     * @throws IOError if the meta row cannot be found or carries no
     *         HRegionInfo, or on any underlying IOException
     */
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      try {
        HTable table = getTable(TableName.META_TABLE_NAME.getName());
        byte[] row = getBytes(searchRow);
        // Closest meta row at or before the search row, in the catalog family.
        Result startRowResult = table.getRowOrBefore(
          row, HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
                                + Bytes.toStringBinary(row));
        }

        // find region start and end keys
        HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
        if (regionInfo == null) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toStringBinary(row));
        }
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = regionInfo.getVersion();

        // find region assignment to server
        // Server fields stay unset when the region is currently unassigned.
        ServerName serverName = HRegionInfo.getServerName(startRowResult);
        if (serverName != null) {
          region.setServerName(Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1448 
1449     private void initMetrics(ThriftMetrics metrics) {
1450       this.metrics = metrics;
1451     }
1452 
1453     @Override
1454     public void increment(TIncrement tincrement) throws IOError, TException {
1455 
1456       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1457         throw new TException("Must supply a table and a row key; can't increment");
1458       }
1459 
1460       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1461         this.coalescer.queueIncrement(tincrement);
1462         return;
1463       }
1464 
1465       try {
1466         HTable table = getTable(tincrement.getTable());
1467         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1468         table.increment(inc);
1469       } catch (IOException e) {
1470         LOG.warn(e.getMessage(), e);
1471         throw new IOError(e.getMessage());
1472       }
1473     }
1474 
1475     @Override
1476     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1477       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1478         this.coalescer.queueIncrements(tincrements);
1479         return;
1480       }
1481       for (TIncrement tinc : tincrements) {
1482         increment(tinc);
1483       }
1484     }
1485   }
1486 
1487 
1488 
1489   /**
1490    * Adds all the attributes into the Operation object
1491    */
1492   private static void addAttributes(OperationWithAttributes op,
1493     Map<ByteBuffer, ByteBuffer> attributes) {
1494     if (attributes == null || attributes.size() == 0) {
1495       return;
1496     }
1497     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1498       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1499       byte[] value =  getBytes(entry.getValue());
1500       op.setAttribute(name, value);
1501     }
1502   }
1503 
1504   public static void registerFilters(Configuration conf) {
1505     String[] filters = conf.getStrings("hbase.thrift.filters");
1506     if(filters != null) {
1507       for(String filterClass: filters) {
1508         String[] filterPart = filterClass.split(":");
1509         if(filterPart.length != 2) {
1510           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1511         } else {
1512           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1513         }
1514       }
1515     }
1516   }
1517 }