1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collections;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.TreeMap;
35  import java.util.concurrent.BlockingQueue;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.ThreadPoolExecutor;
39  import java.util.concurrent.TimeUnit;
40  
41  import org.apache.commons.cli.CommandLine;
42  import org.apache.commons.cli.Option;
43  import org.apache.commons.cli.OptionGroup;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.classification.InterfaceAudience;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HColumnDescriptor;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.KeyValue;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
56  import org.apache.hadoop.hbase.client.Delete;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.HBaseAdmin;
59  import org.apache.hadoop.hbase.client.HTable;
60  import org.apache.hadoop.hbase.client.Increment;
61  import org.apache.hadoop.hbase.client.OperationWithAttributes;
62  import org.apache.hadoop.hbase.client.Put;
63  import org.apache.hadoop.hbase.client.Result;
64  import org.apache.hadoop.hbase.client.ResultScanner;
65  import org.apache.hadoop.hbase.client.Scan;
66  import org.apache.hadoop.hbase.client.Durability;
67  import org.apache.hadoop.hbase.filter.Filter;
68  import org.apache.hadoop.hbase.filter.ParseFilter;
69  import org.apache.hadoop.hbase.filter.PrefixFilter;
70  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
71  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
72  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
73  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
74  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
75  import org.apache.hadoop.hbase.thrift.generated.Hbase;
76  import org.apache.hadoop.hbase.thrift.generated.IOError;
77  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
78  import org.apache.hadoop.hbase.thrift.generated.Mutation;
79  import org.apache.hadoop.hbase.thrift.generated.TCell;
80  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
81  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
82  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
83  import org.apache.hadoop.hbase.thrift.generated.TScan;
84  import org.apache.hadoop.hbase.util.Bytes;
85  import org.apache.thrift.TException;
86  import org.apache.thrift.protocol.TBinaryProtocol;
87  import org.apache.thrift.protocol.TCompactProtocol;
88  import org.apache.thrift.protocol.TProtocolFactory;
89  import org.apache.thrift.server.THsHaServer;
90  import org.apache.thrift.server.TNonblockingServer;
91  import org.apache.thrift.server.TServer;
92  import org.apache.thrift.server.TThreadedSelectorServer;
93  import org.apache.thrift.transport.TFramedTransport;
94  import org.apache.thrift.transport.TNonblockingServerSocket;
95  import org.apache.thrift.transport.TNonblockingServerTransport;
96  import org.apache.thrift.transport.TServerSocket;
97  import org.apache.thrift.transport.TServerTransport;
98  import org.apache.thrift.transport.TTransportFactory;
99  
100 import com.google.common.base.Joiner;
101 import com.google.common.util.concurrent.ThreadFactoryBuilder;
102 
103 /**
104  * ThriftServerRunner - this class starts up a Thrift server which implements
105  * the Hbase API specified in the Hbase.thrift IDL file.
106  */
107 @InterfaceAudience.Private
108 public class ThriftServerRunner implements Runnable {
109 
110   private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
111 
112   static final String SERVER_TYPE_CONF_KEY =
113       "hbase.regionserver.thrift.server.type";
114 
115   static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
116   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
117   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
118   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
119   static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
120 
121   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
122   public static final int DEFAULT_LISTEN_PORT = 9090;
123   private final int listenPort;
124 
125   private Configuration conf;
126   volatile TServer tserver;
127   private final Hbase.Iface handler;
128   private final ThriftMetrics metrics;
129 
130   /** An enum of server implementation selections */
131   enum ImplType {
132     HS_HA("hsha", true, THsHaServer.class, true),
133     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
134     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
135     THREADED_SELECTOR(
136         "threadedselector", true, TThreadedSelectorServer.class, true);
137 
138     public static final ImplType DEFAULT = THREAD_POOL;
139 
140     final String option;
141     final boolean isAlwaysFramed;
142     final Class<? extends TServer> serverClass;
143     final boolean canSpecifyBindIP;
144 
145     ImplType(String option, boolean isAlwaysFramed,
146         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
147       this.option = option;
148       this.isAlwaysFramed = isAlwaysFramed;
149       this.serverClass = serverClass;
150       this.canSpecifyBindIP = canSpecifyBindIP;
151     }
152 
153     /**
154      * @return <code>-option</code> so we can get the list of options from
155      *         {@link #values()}
156      */
157     @Override
158     public String toString() {
159       return "-" + option;
160     }
161 
162     String getDescription() {
163       StringBuilder sb = new StringBuilder("Use the " +
164           serverClass.getSimpleName());
165       if (isAlwaysFramed) {
166         sb.append(" This implies the framed transport.");
167       }
168       if (this == DEFAULT) {
169         sb.append("This is the default.");
170       }
171       return sb.toString();
172     }
173 
174     static OptionGroup createOptionGroup() {
175       OptionGroup group = new OptionGroup();
176       for (ImplType t : values()) {
177         group.addOption(new Option(t.option, t.getDescription()));
178       }
179       return group;
180     }
181 
182     static ImplType getServerImpl(Configuration conf) {
183       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
184       for (ImplType t : values()) {
185         if (confType.equals(t.option)) {
186           return t;
187         }
188       }
189       throw new AssertionError("Unknown server ImplType.option:" + confType);
190     }
191 
192     static void setServerImpl(CommandLine cmd, Configuration conf) {
193       ImplType chosenType = null;
194       int numChosen = 0;
195       for (ImplType t : values()) {
196         if (cmd.hasOption(t.option)) {
197           chosenType = t;
198           ++numChosen;
199         }
200       }
201       if (numChosen < 1) {
202         LOG.info("Using default thrift server type");
203         chosenType = DEFAULT;
204       } else if (numChosen > 1) {
205         throw new AssertionError("Exactly one option out of " +
206           Arrays.toString(values()) + " has to be specified");
207       }
208       LOG.info("Using thrift server type " + chosenType.option);
209       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
210     }
211 
212     public String simpleClassName() {
213       return serverClass.getSimpleName();
214     }
215 
216     public static List<String> serversThatCannotSpecifyBindIP() {
217       List<String> l = new ArrayList<String>();
218       for (ImplType t : values()) {
219         if (!t.canSpecifyBindIP) {
220           l.add(t.simpleClassName());
221         }
222       }
223       return l;
224     }
225 
226   }
227 
228   public ThriftServerRunner(Configuration conf) throws IOException {
229     this(conf, new ThriftServerRunner.HBaseHandler(conf));
230   }
231 
232   public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
233     this.conf = HBaseConfiguration.create(conf);
234     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
235     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
236     handler.initMetrics(metrics);
237     this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
238   }
239 
240   /*
241    * Runs the Thrift server
242    */
243   @Override
244   public void run() {
245     try {
246       setupServer();
247       tserver.serve();
248     } catch (Exception e) {
249       LOG.fatal("Cannot run ThriftServer", e);
250       // Crash the process if the ThriftServer is not running
251       System.exit(-1);
252     }
253   }
254 
255   public void shutdown() {
256     if (tserver != null) {
257       tserver.stop();
258       tserver = null;
259     }
260   }
261 
262   /**
263    * Setting up the thrift TServer
264    */
265   private void setupServer() throws Exception {
266     // Construct correct ProtocolFactory
267     TProtocolFactory protocolFactory;
268     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
269       LOG.debug("Using compact protocol");
270       protocolFactory = new TCompactProtocol.Factory();
271     } else {
272       LOG.debug("Using binary protocol");
273       protocolFactory = new TBinaryProtocol.Factory();
274     }
275 
276     Hbase.Processor<Hbase.Iface> processor =
277         new Hbase.Processor<Hbase.Iface>(handler);
278     ImplType implType = ImplType.getServerImpl(conf);
279 
280     // Construct correct TransportFactory
281     TTransportFactory transportFactory;
282     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
283       transportFactory = new TFramedTransport.Factory();
284       LOG.debug("Using framed transport");
285     } else {
286       transportFactory = new TTransportFactory();
287     }
288 
289     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
290       LOG.error("Server types " + Joiner.on(", ").join(
291           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
292           "address binding at the moment. See " +
293           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
294       throw new RuntimeException(
295           "-" + BIND_CONF_KEY + " not supported with " + implType);
296     }
297 
298     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
299         implType == ImplType.THREADED_SELECTOR) {
300 
301       InetAddress listenAddress = getBindAddress(conf); 
302       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
303           new InetSocketAddress(listenAddress, listenPort));
304 
305       if (implType == ImplType.NONBLOCKING) {
306         TNonblockingServer.Args serverArgs =
307             new TNonblockingServer.Args(serverTransport);
308         serverArgs.processor(processor)
309                   .transportFactory(transportFactory)
310                   .protocolFactory(protocolFactory);
311         tserver = new TNonblockingServer(serverArgs);
312       } else if (implType == ImplType.HS_HA) {
313         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
314         CallQueue callQueue =
315             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
316         ExecutorService executorService = createExecutor(
317             callQueue, serverArgs.getWorkerThreads());
318         serverArgs.executorService(executorService)
319                   .processor(processor)
320                   .transportFactory(transportFactory)
321                   .protocolFactory(protocolFactory);
322         tserver = new THsHaServer(serverArgs);
323       } else { // THREADED_SELECTOR
324         TThreadedSelectorServer.Args serverArgs =
325             new HThreadedSelectorServerArgs(serverTransport, conf);
326         CallQueue callQueue =
327             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
328         ExecutorService executorService = createExecutor(
329             callQueue, serverArgs.getWorkerThreads());
330         serverArgs.executorService(executorService)
331                   .processor(processor)
332                   .transportFactory(transportFactory)
333                   .protocolFactory(protocolFactory);
334         tserver = new TThreadedSelectorServer(serverArgs);
335       }
336       LOG.info("starting HBase " + implType.simpleClassName() +
337           " server on " + Integer.toString(listenPort));
338     } else if (implType == ImplType.THREAD_POOL) {
339       // Thread pool server. Get the IP address to bind to.
340       InetAddress listenAddress = getBindAddress(conf);
341 
342       TServerTransport serverTransport = new TServerSocket(
343           new InetSocketAddress(listenAddress, listenPort));
344 
345       TBoundedThreadPoolServer.Args serverArgs =
346           new TBoundedThreadPoolServer.Args(serverTransport, conf);
347       serverArgs.processor(processor)
348                 .transportFactory(transportFactory)
349                 .protocolFactory(protocolFactory);
350       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
351           + listenAddress + ":" + Integer.toString(listenPort)
352           + "; " + serverArgs);
353       TBoundedThreadPoolServer tserver =
354           new TBoundedThreadPoolServer(serverArgs, metrics);
355       this.tserver = tserver;
356     } else {
357       throw new AssertionError("Unsupported Thrift server implementation: " +
358           implType.simpleClassName());
359     }
360 
361     // A sanity check that we instantiated the right type of server.
362     if (tserver.getClass() != implType.serverClass) {
363       throw new AssertionError("Expected to create Thrift server class " +
364           implType.serverClass.getName() + " but got " +
365           tserver.getClass().getName());
366     }
367 
368    
369 
370     registerFilters(conf);
371   }
372 
373   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
374                                  int workerThreads) {
375     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
376     tfb.setDaemon(true);
377     tfb.setNameFormat("thrift-worker-%d");
378     return new ThreadPoolExecutor(workerThreads, workerThreads,
379             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
380   }
381 
382   private InetAddress getBindAddress(Configuration conf)
383       throws UnknownHostException {
384     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
385     return InetAddress.getByName(bindAddressStr);
386   }
387 
388   /**
389    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
390    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
391    */
392   public static class HBaseHandler implements Hbase.Iface {
393     protected Configuration conf;
394     protected volatile HBaseAdmin admin = null;
395     protected final Log LOG = LogFactory.getLog(this.getClass().getName());
396 
397     // nextScannerId and scannerMap are used to manage scanner state
398     protected int nextScannerId = 0;
399     protected HashMap<Integer, ResultScanner> scannerMap = null;
400     private ThriftMetrics metrics = null;
401 
402     private static ThreadLocal<Map<String, HTable>> threadLocalTables =
403         new ThreadLocal<Map<String, HTable>>() {
404       @Override
405       protected Map<String, HTable> initialValue() {
406         return new TreeMap<String, HTable>();
407       }
408     };
409 
410     IncrementCoalescer coalescer = null;
411 
412     /**
413      * Returns a list of all the column families for a given htable.
414      *
415      * @param table
416      * @return
417      * @throws IOException
418      */
419     byte[][] getAllColumns(HTable table) throws IOException {
420       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
421       byte[][] columns = new byte[cds.length][];
422       for (int i = 0; i < cds.length; i++) {
423         columns[i] = Bytes.add(cds[i].getName(),
424             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
425       }
426       return columns;
427     }
428 
429     /**
430      * Creates and returns an HTable instance from a given table name.
431      *
432      * @param tableName
433      *          name of table
434      * @return HTable object
435      * @throws IOException
436      * @throws IOError
437      */
438     public HTable getTable(final byte[] tableName) throws
439         IOException {
440       String table = new String(tableName);
441       Map<String, HTable> tables = threadLocalTables.get();
442       if (!tables.containsKey(table)) {
443         tables.put(table, new HTable(conf, tableName));
444       }
445       return tables.get(table);
446     }
447 
448     public HTable getTable(final ByteBuffer tableName) throws IOException {
449       return getTable(getBytes(tableName));
450     }
451 
452     /**
453      * Assigns a unique ID to the scanner and adds the mapping to an internal
454      * hash-map.
455      *
456      * @param scanner
457      * @return integer scanner id
458      */
459     protected synchronized int addScanner(ResultScanner scanner) {
460       int id = nextScannerId++;
461       scannerMap.put(id, scanner);
462       return id;
463     }
464 
465     /**
466      * Returns the scanner associated with the specified ID.
467      *
468      * @param id
469      * @return a Scanner, or null if ID was invalid.
470      */
471     protected synchronized ResultScanner getScanner(int id) {
472       return scannerMap.get(id);
473     }
474 
475     /**
476      * Removes the scanner associated with the specified ID from the internal
477      * id->scanner hash-map.
478      *
479      * @param id
480      * @return a Scanner, or null if ID was invalid.
481      */
482     protected synchronized ResultScanner removeScanner(int id) {
483       return scannerMap.remove(id);
484     }
485 
486     /**
487      * Constructs an HBaseHandler object.
488      * @throws IOException
489      */
490     protected HBaseHandler()
491     throws IOException {
492       this(HBaseConfiguration.create());
493     }
494 
495     protected HBaseHandler(final Configuration c) throws IOException {
496       this.conf = c;
497       scannerMap = new HashMap<Integer, ResultScanner>();
498       this.coalescer = new IncrementCoalescer(this);
499     }
500 
501     /**
502      * Obtain HBaseAdmin. Creates the instance if it is not already created.
503      */
504     private HBaseAdmin getHBaseAdmin() throws IOException {
505       if (admin == null) {
506         synchronized (this) {
507           if (admin == null) {
508             admin = new HBaseAdmin(conf);
509           }
510         }
511       }
512       return admin;
513     }
514 
515     @Override
516     public void enableTable(ByteBuffer tableName) throws IOError {
517       try{
518         getHBaseAdmin().enableTable(getBytes(tableName));
519       } catch (IOException e) {
520         LOG.warn(e.getMessage(), e);
521         throw new IOError(e.getMessage());
522       }
523     }
524 
525     @Override
526     public void disableTable(ByteBuffer tableName) throws IOError{
527       try{
528         getHBaseAdmin().disableTable(getBytes(tableName));
529       } catch (IOException e) {
530         LOG.warn(e.getMessage(), e);
531         throw new IOError(e.getMessage());
532       }
533     }
534 
535     @Override
536     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
537       try {
538         return HTable.isTableEnabled(this.conf, getBytes(tableName));
539       } catch (IOException e) {
540         LOG.warn(e.getMessage(), e);
541         throw new IOError(e.getMessage());
542       }
543     }
544 
545     @Override
546     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
547       try{
548         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
549       } catch (InterruptedException e) {
550         throw new IOError(e.getMessage());
551       } catch (IOException e) {
552         LOG.warn(e.getMessage(), e);
553         throw new IOError(e.getMessage());
554       }
555     }
556 
557     @Override
558     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
559       try{
560         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
561       } catch (InterruptedException e) {
562         LOG.warn(e.getMessage(), e);
563         throw new IOError(e.getMessage());
564       } catch (IOException e) {
565         LOG.warn(e.getMessage(), e);
566         throw new IOError(e.getMessage());
567       }
568     }
569 
570     @Override
571     public List<ByteBuffer> getTableNames() throws IOError {
572       try {
573         HTableDescriptor[] tables = this.getHBaseAdmin().listTables();
574         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tables.length);
575         for (int i = 0; i < tables.length; i++) {
576           list.add(ByteBuffer.wrap(tables[i].getName()));
577         }
578         return list;
579       } catch (IOException e) {
580         LOG.warn(e.getMessage(), e);
581         throw new IOError(e.getMessage());
582       }
583     }
584 
585     /**
586      * @return the list of regions in the given table, or an empty list if the table does not exist
587      */
588     @Override
589     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
590     throws IOError {
591       try {
592         HTable table;
593         try {
594           table = getTable(tableName);
595         } catch (TableNotFoundException ex) {
596           return new ArrayList<TRegionInfo>();
597         }
598         Map<HRegionInfo, ServerName> regionLocations =
599             table.getRegionLocations();
600         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
601         for (Map.Entry<HRegionInfo, ServerName> entry :
602             regionLocations.entrySet()) {
603           HRegionInfo info = entry.getKey();
604           ServerName serverName = entry.getValue();
605           TRegionInfo region = new TRegionInfo();
606           region.serverName = ByteBuffer.wrap(
607               Bytes.toBytes(serverName.getHostname()));
608           region.port = serverName.getPort();
609           region.startKey = ByteBuffer.wrap(info.getStartKey());
610           region.endKey = ByteBuffer.wrap(info.getEndKey());
611           region.id = info.getRegionId();
612           region.name = ByteBuffer.wrap(info.getRegionName());
613           region.version = info.getVersion();
614           results.add(region);
615         }
616         return results;
617       } catch (TableNotFoundException e) {
618         // Return empty list for non-existing table
619         return Collections.emptyList();
620       } catch (IOException e){
621         LOG.warn(e.getMessage(), e);
622         throw new IOError(e.getMessage());
623       }
624     }
625 
626     @Deprecated
627     @Override
628     public List<TCell> get(
629         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
630         Map<ByteBuffer, ByteBuffer> attributes)
631         throws IOError {
632       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
633       if(famAndQf.length == 1) {
634         return get(tableName, row, famAndQf[0], new byte[0], attributes);
635       }
636       return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
637     }
638 
639     protected List<TCell> get(ByteBuffer tableName,
640                               ByteBuffer row,
641                               byte[] family,
642                               byte[] qualifier,
643                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
644       try {
645         HTable table = getTable(tableName);
646         Get get = new Get(getBytes(row));
647         addAttributes(get, attributes);
648         if (qualifier == null || qualifier.length == 0) {
649           get.addFamily(family);
650         } else {
651           get.addColumn(family, qualifier);
652         }
653         Result result = table.get(get);
654         return ThriftUtilities.cellFromHBase(result.raw());
655       } catch (IOException e) {
656         LOG.warn(e.getMessage(), e);
657         throw new IOError(e.getMessage());
658       }
659     }
660 
661     @Deprecated
662     @Override
663     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
664         ByteBuffer column, int numVersions,
665         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
666       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
667       if(famAndQf.length == 1) {
668         return getVer(tableName, row, famAndQf[0],
669             new byte[0], numVersions, attributes);
670       }
671       return getVer(tableName, row,
672           famAndQf[0], famAndQf[1], numVersions, attributes);
673     }
674 
675     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
676                               byte[] family,
677         byte[] qualifier, int numVersions,
678         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
679       try {
680         HTable table = getTable(tableName);
681         Get get = new Get(getBytes(row));
682         addAttributes(get, attributes);
683         get.addColumn(family, qualifier);
684         get.setMaxVersions(numVersions);
685         Result result = table.get(get);
686         return ThriftUtilities.cellFromHBase(result.raw());
687       } catch (IOException e) {
688         LOG.warn(e.getMessage(), e);
689         throw new IOError(e.getMessage());
690       }
691     }
692 
693     @Deprecated
694     @Override
695     public List<TCell> getVerTs(ByteBuffer tableName,
696                                    ByteBuffer row,
697         ByteBuffer column,
698         long timestamp,
699         int numVersions,
700         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
701       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
702       if(famAndQf.length == 1) {
703         return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
704             numVersions, attributes);
705       }
706       return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
707           numVersions, attributes);
708     }
709 
710     protected List<TCell> getVerTs(ByteBuffer tableName,
711                                    ByteBuffer row, byte [] family,
712         byte [] qualifier, long timestamp, int numVersions,
713         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
714       try {
715         HTable table = getTable(tableName);
716         Get get = new Get(getBytes(row));
717         addAttributes(get, attributes);
718         get.addColumn(family, qualifier);
719         get.setTimeRange(Long.MIN_VALUE, timestamp);
720         get.setMaxVersions(numVersions);
721         Result result = table.get(get);
722         return ThriftUtilities.cellFromHBase(result.raw());
723       } catch (IOException e) {
724         LOG.warn(e.getMessage(), e);
725         throw new IOError(e.getMessage());
726       }
727     }
728 
729     @Override
730     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
731         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
732       return getRowWithColumnsTs(tableName, row, null,
733                                  HConstants.LATEST_TIMESTAMP,
734                                  attributes);
735     }
736 
737     @Override
738     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
739                                               ByteBuffer row,
740         List<ByteBuffer> columns,
741         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
742       return getRowWithColumnsTs(tableName, row, columns,
743                                  HConstants.LATEST_TIMESTAMP,
744                                  attributes);
745     }
746 
747     @Override
748     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
749         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
750       return getRowWithColumnsTs(tableName, row, null,
751                                  timestamp, attributes);
752     }
753 
754     @Override
755     public List<TRowResult> getRowWithColumnsTs(
756         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
757         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
758       try {
759         HTable table = getTable(tableName);
760         if (columns == null) {
761           Get get = new Get(getBytes(row));
762           addAttributes(get, attributes);
763           get.setTimeRange(Long.MIN_VALUE, timestamp);
764           Result result = table.get(get);
765           return ThriftUtilities.rowResultFromHBase(result);
766         }
767         Get get = new Get(getBytes(row));
768         addAttributes(get, attributes);
769         for(ByteBuffer column : columns) {
770           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
771           if (famAndQf.length == 1) {
772               get.addFamily(famAndQf[0]);
773           } else {
774               get.addColumn(famAndQf[0], famAndQf[1]);
775           }
776         }
777         get.setTimeRange(Long.MIN_VALUE, timestamp);
778         Result result = table.get(get);
779         return ThriftUtilities.rowResultFromHBase(result);
780       } catch (IOException e) {
781         LOG.warn(e.getMessage(), e);
782         throw new IOError(e.getMessage());
783       }
784     }
785 
786     @Override
787     public List<TRowResult> getRows(ByteBuffer tableName,
788                                     List<ByteBuffer> rows,
789         Map<ByteBuffer, ByteBuffer> attributes)
790         throws IOError {
791       return getRowsWithColumnsTs(tableName, rows, null,
792                                   HConstants.LATEST_TIMESTAMP,
793                                   attributes);
794     }
795 
796     @Override
797     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
798                                                List<ByteBuffer> rows,
799         List<ByteBuffer> columns,
800         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
801       return getRowsWithColumnsTs(tableName, rows, columns,
802                                   HConstants.LATEST_TIMESTAMP,
803                                   attributes);
804     }
805 
806     @Override
807     public List<TRowResult> getRowsTs(ByteBuffer tableName,
808                                       List<ByteBuffer> rows,
809         long timestamp,
810         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
811       return getRowsWithColumnsTs(tableName, rows, null,
812                                   timestamp, attributes);
813     }
814 
815     @Override
816     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
817                                                  List<ByteBuffer> rows,
818         List<ByteBuffer> columns, long timestamp,
819         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
820       try {
821         List<Get> gets = new ArrayList<Get>(rows.size());
822         HTable table = getTable(tableName);
823         if (metrics != null) {
824           metrics.incNumRowKeysInBatchGet(rows.size());
825         }
826         for (ByteBuffer row : rows) {
827           Get get = new Get(getBytes(row));
828           addAttributes(get, attributes);
829           if (columns != null) {
830 
831             for(ByteBuffer column : columns) {
832               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
833               if (famAndQf.length == 1) {
834                 get.addFamily(famAndQf[0]);
835               } else {
836                 get.addColumn(famAndQf[0], famAndQf[1]);
837               }
838             }
839           }
840           get.setTimeRange(Long.MIN_VALUE, timestamp);
841           gets.add(get);
842         }
843         Result[] result = table.get(gets);
844         return ThriftUtilities.rowResultFromHBase(result);
845       } catch (IOException e) {
846         LOG.warn(e.getMessage(), e);
847         throw new IOError(e.getMessage());
848       }
849     }
850 
851     @Override
852     public void deleteAll(
853         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
854         Map<ByteBuffer, ByteBuffer> attributes)
855         throws IOError {
856       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
857                   attributes);
858     }
859 
860     @Override
861     public void deleteAllTs(ByteBuffer tableName,
862                             ByteBuffer row,
863                             ByteBuffer column,
864         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
865       try {
866         HTable table = getTable(tableName);
867         Delete delete  = new Delete(getBytes(row));
868         addAttributes(delete, attributes);
869         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
870         if (famAndQf.length == 1) {
871           delete.deleteFamily(famAndQf[0], timestamp);
872         } else {
873           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
874         }
875         table.delete(delete);
876 
877       } catch (IOException e) {
878         LOG.warn(e.getMessage(), e);
879         throw new IOError(e.getMessage());
880       }
881     }
882 
883     @Override
884     public void deleteAllRow(
885         ByteBuffer tableName, ByteBuffer row,
886         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
887       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
888     }
889 
890     @Override
891     public void deleteAllRowTs(
892         ByteBuffer tableName, ByteBuffer row, long timestamp,
893         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
894       try {
895         HTable table = getTable(tableName);
896         Delete delete  = new Delete(getBytes(row), timestamp);
897         addAttributes(delete, attributes);
898         table.delete(delete);
899       } catch (IOException e) {
900         LOG.warn(e.getMessage(), e);
901         throw new IOError(e.getMessage());
902       }
903     }
904 
905     @Override
906     public void createTable(ByteBuffer in_tableName,
907         List<ColumnDescriptor> columnFamilies) throws IOError,
908         IllegalArgument, AlreadyExists {
909       byte [] tableName = getBytes(in_tableName);
910       try {
911         if (getHBaseAdmin().tableExists(tableName)) {
912           throw new AlreadyExists("table name already in use");
913         }
914         HTableDescriptor desc = new HTableDescriptor(tableName);
915         for (ColumnDescriptor col : columnFamilies) {
916           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
917           desc.addFamily(colDesc);
918         }
919         getHBaseAdmin().createTable(desc);
920       } catch (IOException e) {
921         LOG.warn(e.getMessage(), e);
922         throw new IOError(e.getMessage());
923       } catch (IllegalArgumentException e) {
924         LOG.warn(e.getMessage(), e);
925         throw new IllegalArgument(e.getMessage());
926       }
927     }
928 
929     @Override
930     public void deleteTable(ByteBuffer in_tableName) throws IOError {
931       byte [] tableName = getBytes(in_tableName);
932       if (LOG.isDebugEnabled()) {
933         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
934       }
935       try {
936         if (!getHBaseAdmin().tableExists(tableName)) {
937           throw new IOException("table does not exist");
938         }
939         getHBaseAdmin().deleteTable(tableName);
940       } catch (IOException e) {
941         LOG.warn(e.getMessage(), e);
942         throw new IOError(e.getMessage());
943       }
944     }
945 
946     @Override
947     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
948         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
949         throws IOError, IllegalArgument {
950       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
951                   attributes);
952     }
953 
954     @Override
955     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
956         List<Mutation> mutations, long timestamp,
957         Map<ByteBuffer, ByteBuffer> attributes)
958         throws IOError, IllegalArgument {
959       HTable table = null;
960       try {
961         table = getTable(tableName);
962         Put put = new Put(getBytes(row), timestamp);
963         addAttributes(put, attributes);
964 
965         Delete delete = new Delete(getBytes(row));
966         addAttributes(delete, attributes);
967         if (metrics != null) {
968           metrics.incNumRowKeysInBatchMutate(mutations.size());
969         }
970 
971         // I apologize for all this mess :)
972         for (Mutation m : mutations) {
973           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
974           if (m.isDelete) {
975             if (famAndQf.length == 1) {
976               delete.deleteFamily(famAndQf[0], timestamp);
977             } else {
978               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
979             }
980             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
981                 : Durability.SKIP_WAL);
982           } else {
983             if(famAndQf.length == 1) {
984               put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
985                   m.value != null ? getBytes(m.value)
986                       : HConstants.EMPTY_BYTE_ARRAY);
987             } else {
988               put.add(famAndQf[0], famAndQf[1],
989                   m.value != null ? getBytes(m.value)
990                       : HConstants.EMPTY_BYTE_ARRAY);
991             }
992             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
993           }
994         }
995         if (!delete.isEmpty())
996           table.delete(delete);
997         if (!put.isEmpty())
998           table.put(put);
999       } catch (IOException e) {
1000         LOG.warn(e.getMessage(), e);
1001         throw new IOError(e.getMessage());
1002       } catch (IllegalArgumentException e) {
1003         LOG.warn(e.getMessage(), e);
1004         throw new IllegalArgument(e.getMessage());
1005       }
1006     }
1007 
1008     @Override
1009     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1010         Map<ByteBuffer, ByteBuffer> attributes)
1011         throws IOError, IllegalArgument, TException {
1012       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1013     }
1014 
1015     @Override
1016     public void mutateRowsTs(
1017         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1018         Map<ByteBuffer, ByteBuffer> attributes)
1019         throws IOError, IllegalArgument, TException {
1020       List<Put> puts = new ArrayList<Put>();
1021       List<Delete> deletes = new ArrayList<Delete>();
1022 
1023       for (BatchMutation batch : rowBatches) {
1024         byte[] row = getBytes(batch.row);
1025         List<Mutation> mutations = batch.mutations;
1026         Delete delete = new Delete(row);
1027         addAttributes(delete, attributes);
1028         Put put = new Put(row, timestamp);
1029         addAttributes(put, attributes);
1030         for (Mutation m : mutations) {
1031           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1032           if (m.isDelete) {
1033             // no qualifier, family only.
1034             if (famAndQf.length == 1) {
1035               delete.deleteFamily(famAndQf[0], timestamp);
1036             } else {
1037               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1038             }
1039             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1040                 : Durability.SKIP_WAL);
1041           } else {
1042             if(famAndQf.length == 1) {
1043               put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
1044                   m.value != null ? getBytes(m.value)
1045                       : HConstants.EMPTY_BYTE_ARRAY);
1046             } else {
1047               put.add(famAndQf[0], famAndQf[1],
1048                   m.value != null ? getBytes(m.value)
1049                       : HConstants.EMPTY_BYTE_ARRAY);
1050             }
1051             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1052           }
1053         }
1054         if (!delete.isEmpty())
1055           deletes.add(delete);
1056         if (!put.isEmpty())
1057           puts.add(put);
1058       }
1059 
1060       HTable table = null;
1061       try {
1062         table = getTable(tableName);
1063         if (!puts.isEmpty())
1064           table.put(puts);
1065         if (!deletes.isEmpty())
1066           table.delete(deletes);
1067         
1068       } catch (IOException e) {
1069         LOG.warn(e.getMessage(), e);
1070         throw new IOError(e.getMessage());
1071       } catch (IllegalArgumentException e) {
1072         LOG.warn(e.getMessage(), e);
1073         throw new IllegalArgument(e.getMessage());
1074       }
1075     }
1076 
1077     @Deprecated
1078     @Override
1079     public long atomicIncrement(
1080         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1081             throws IOError, IllegalArgument, TException {
1082       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1083       if(famAndQf.length == 1) {
1084         return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
1085             amount);
1086       }
1087       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1088     }
1089 
1090     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1091         byte [] family, byte [] qualifier, long amount)
1092         throws IOError, IllegalArgument, TException {
1093       HTable table;
1094       try {
1095         table = getTable(tableName);
1096         return table.incrementColumnValue(
1097             getBytes(row), family, qualifier, amount);
1098       } catch (IOException e) {
1099         LOG.warn(e.getMessage(), e);
1100         throw new IOError(e.getMessage());
1101       }
1102     }
1103 
1104     public void scannerClose(int id) throws IOError, IllegalArgument {
1105       LOG.debug("scannerClose: id=" + id);
1106       ResultScanner scanner = getScanner(id);
1107       if (scanner == null) {
1108         String message = "scanner ID is invalid";
1109         LOG.warn(message);
1110         throw new IllegalArgument("scanner ID is invalid");
1111       }
1112       scanner.close();
1113       removeScanner(id);
1114     }
1115 
1116     @Override
1117     public List<TRowResult> scannerGetList(int id,int nbRows)
1118         throws IllegalArgument, IOError {
1119       LOG.debug("scannerGetList: id=" + id);
1120       ResultScanner scanner = getScanner(id);
1121       if (null == scanner) {
1122         String message = "scanner ID is invalid";
1123         LOG.warn(message);
1124         throw new IllegalArgument("scanner ID is invalid");
1125       }
1126 
1127       Result [] results = null;
1128       try {
1129         results = scanner.next(nbRows);
1130         if (null == results) {
1131           return new ArrayList<TRowResult>();
1132         }
1133       } catch (IOException e) {
1134         LOG.warn(e.getMessage(), e);
1135         throw new IOError(e.getMessage());
1136       }
1137       return ThriftUtilities.rowResultFromHBase(results);
1138     }
1139 
1140     @Override
1141     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1142       return scannerGetList(id,1);
1143     }
1144 
1145     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1146         Map<ByteBuffer, ByteBuffer> attributes)
1147         throws IOError {
1148       try {
1149         HTable table = getTable(tableName);
1150         Scan scan = new Scan();
1151         addAttributes(scan, attributes);
1152         if (tScan.isSetStartRow()) {
1153           scan.setStartRow(tScan.getStartRow());
1154         }
1155         if (tScan.isSetStopRow()) {
1156           scan.setStopRow(tScan.getStopRow());
1157         }
1158         if (tScan.isSetTimestamp()) {
1159           scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
1160         }
1161         if (tScan.isSetCaching()) {
1162           scan.setCaching(tScan.getCaching());
1163         }
1164         if (tScan.isSetBatchSize()) {
1165           scan.setBatch(tScan.getBatchSize());
1166         }
1167         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1168           for(ByteBuffer column : tScan.getColumns()) {
1169             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1170             if(famQf.length == 1) {
1171               scan.addFamily(famQf[0]);
1172             } else {
1173               scan.addColumn(famQf[0], famQf[1]);
1174             }
1175           }
1176         }
1177         if (tScan.isSetFilterString()) {
1178           ParseFilter parseFilter = new ParseFilter();
1179           scan.setFilter(
1180               parseFilter.parseFilterString(tScan.getFilterString()));
1181         }
1182         return addScanner(table.getScanner(scan));
1183       } catch (IOException e) {
1184         LOG.warn(e.getMessage(), e);
1185         throw new IOError(e.getMessage());
1186       }
1187     }
1188 
1189     @Override
1190     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1191         List<ByteBuffer> columns,
1192         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1193       try {
1194         HTable table = getTable(tableName);
1195         Scan scan = new Scan(getBytes(startRow));
1196         addAttributes(scan, attributes);
1197         if(columns != null && columns.size() != 0) {
1198           for(ByteBuffer column : columns) {
1199             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1200             if(famQf.length == 1) {
1201               scan.addFamily(famQf[0]);
1202             } else {
1203               scan.addColumn(famQf[0], famQf[1]);
1204             }
1205           }
1206         }
1207         return addScanner(table.getScanner(scan));
1208       } catch (IOException e) {
1209         LOG.warn(e.getMessage(), e);
1210         throw new IOError(e.getMessage());
1211       }
1212     }
1213 
1214     @Override
1215     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1216         ByteBuffer stopRow, List<ByteBuffer> columns,
1217         Map<ByteBuffer, ByteBuffer> attributes)
1218         throws IOError, TException {
1219       try {
1220         HTable table = getTable(tableName);
1221         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1222         addAttributes(scan, attributes);
1223         if(columns != null && columns.size() != 0) {
1224           for(ByteBuffer column : columns) {
1225             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1226             if(famQf.length == 1) {
1227               scan.addFamily(famQf[0]);
1228             } else {
1229               scan.addColumn(famQf[0], famQf[1]);
1230             }
1231           }
1232         }
1233         return addScanner(table.getScanner(scan));
1234       } catch (IOException e) {
1235         LOG.warn(e.getMessage(), e);
1236         throw new IOError(e.getMessage());
1237       }
1238     }
1239 
1240     @Override
1241     public int scannerOpenWithPrefix(ByteBuffer tableName,
1242                                      ByteBuffer startAndPrefix,
1243                                      List<ByteBuffer> columns,
1244         Map<ByteBuffer, ByteBuffer> attributes)
1245         throws IOError, TException {
1246       try {
1247         HTable table = getTable(tableName);
1248         Scan scan = new Scan(getBytes(startAndPrefix));
1249         addAttributes(scan, attributes);
1250         Filter f = new WhileMatchFilter(
1251             new PrefixFilter(getBytes(startAndPrefix)));
1252         scan.setFilter(f);
1253         if (columns != null && columns.size() != 0) {
1254           for(ByteBuffer column : columns) {
1255             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1256             if(famQf.length == 1) {
1257               scan.addFamily(famQf[0]);
1258             } else {
1259               scan.addColumn(famQf[0], famQf[1]);
1260             }
1261           }
1262         }
1263         return addScanner(table.getScanner(scan));
1264       } catch (IOException e) {
1265         LOG.warn(e.getMessage(), e);
1266         throw new IOError(e.getMessage());
1267       }
1268     }
1269 
1270     @Override
1271     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1272         List<ByteBuffer> columns, long timestamp,
1273         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1274       try {
1275         HTable table = getTable(tableName);
1276         Scan scan = new Scan(getBytes(startRow));
1277         addAttributes(scan, attributes);
1278         scan.setTimeRange(Long.MIN_VALUE, timestamp);
1279         if (columns != null && columns.size() != 0) {
1280           for (ByteBuffer column : columns) {
1281             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1282             if(famQf.length == 1) {
1283               scan.addFamily(famQf[0]);
1284             } else {
1285               scan.addColumn(famQf[0], famQf[1]);
1286             }
1287           }
1288         }
1289         return addScanner(table.getScanner(scan));
1290       } catch (IOException e) {
1291         LOG.warn(e.getMessage(), e);
1292         throw new IOError(e.getMessage());
1293       }
1294     }
1295 
1296     @Override
1297     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1298         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1299         Map<ByteBuffer, ByteBuffer> attributes)
1300         throws IOError, TException {
1301       try {
1302         HTable table = getTable(tableName);
1303         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1304         addAttributes(scan, attributes);
1305         scan.setTimeRange(Long.MIN_VALUE, timestamp);
1306         if (columns != null && columns.size() != 0) {
1307           for (ByteBuffer column : columns) {
1308             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1309             if(famQf.length == 1) {
1310               scan.addFamily(famQf[0]);
1311             } else {
1312               scan.addColumn(famQf[0], famQf[1]);
1313             }
1314           }
1315         }
1316         scan.setTimeRange(Long.MIN_VALUE, timestamp);
1317         return addScanner(table.getScanner(scan));
1318       } catch (IOException e) {
1319         LOG.warn(e.getMessage(), e);
1320         throw new IOError(e.getMessage());
1321       }
1322     }
1323 
1324     @Override
1325     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1326         ByteBuffer tableName) throws IOError, TException {
1327       try {
1328         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1329           new TreeMap<ByteBuffer, ColumnDescriptor>();
1330 
1331         HTable table = getTable(tableName);
1332         HTableDescriptor desc = table.getTableDescriptor();
1333 
1334         for (HColumnDescriptor e : desc.getFamilies()) {
1335           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1336           columns.put(col.name, col);
1337         }
1338         return columns;
1339       } catch (IOException e) {
1340         LOG.warn(e.getMessage(), e);
1341         throw new IOError(e.getMessage());
1342       }
1343     }
1344 
1345     @Override
1346     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1347         ByteBuffer family) throws IOError {
1348       try {
1349         HTable table = getTable(getBytes(tableName));
1350         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1351         return ThriftUtilities.cellFromHBase(result.raw());
1352       } catch (IOException e) {
1353         LOG.warn(e.getMessage(), e);
1354         throw new IOError(e.getMessage());
1355       }
1356     }
1357 
1358     @Override
1359     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1360       try {
1361         HTable table = getTable(HConstants.META_TABLE_NAME);
1362         byte[] row = getBytes(searchRow);
1363         Result startRowResult = table.getRowOrBefore(
1364           row, HConstants.CATALOG_FAMILY);
1365 
1366         if (startRowResult == null) {
1367           throw new IOException("Cannot find row in .META., row="
1368                                 + Bytes.toStringBinary(row));
1369         }
1370 
1371         // find region start and end keys
1372         HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1373         if (regionInfo == null) {
1374           throw new IOException("HRegionInfo REGIONINFO was null or " +
1375                                 " empty in Meta for row="
1376                                 + Bytes.toStringBinary(row));
1377         }
1378         TRegionInfo region = new TRegionInfo();
1379         region.setStartKey(regionInfo.getStartKey());
1380         region.setEndKey(regionInfo.getEndKey());
1381         region.id = regionInfo.getRegionId();
1382         region.setName(regionInfo.getRegionName());
1383         region.version = regionInfo.getVersion();
1384 
1385         // find region assignment to server
1386         ServerName serverName = HRegionInfo.getServerName(startRowResult);
1387         if (serverName != null) {
1388           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1389           region.port = serverName.getPort();
1390         }
1391         return region;
1392       } catch (IOException e) {
1393         LOG.warn(e.getMessage(), e);
1394         throw new IOError(e.getMessage());
1395       }
1396     }
1397 
1398     private void initMetrics(ThriftMetrics metrics) {
1399       this.metrics = metrics;
1400     }
1401 
1402     @Override
1403     public void increment(TIncrement tincrement) throws IOError, TException {
1404 
1405       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1406         throw new TException("Must supply a table and a row key; can't increment");
1407       }
1408 
1409       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1410         this.coalescer.queueIncrement(tincrement);
1411         return;
1412       }
1413 
1414       try {
1415         HTable table = getTable(tincrement.getTable());
1416         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1417         table.increment(inc);
1418       } catch (IOException e) {
1419         LOG.warn(e.getMessage(), e);
1420         throw new IOError(e.getMessage());
1421       }
1422     }
1423 
1424     @Override
1425     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1426       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1427         this.coalescer.queueIncrements(tincrements);
1428         return;
1429       }
1430       for (TIncrement tinc : tincrements) {
1431         increment(tinc);
1432       }
1433     }
1434   }
1435 
1436 
1437 
1438   /**
1439    * Adds all the attributes into the Operation object
1440    */
1441   private static void addAttributes(OperationWithAttributes op,
1442     Map<ByteBuffer, ByteBuffer> attributes) {
1443     if (attributes == null || attributes.size() == 0) {
1444       return;
1445     }
1446     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1447       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1448       byte[] value =  getBytes(entry.getValue());
1449       op.setAttribute(name, value);
1450     }
1451   }
1452 
1453   public static void registerFilters(Configuration conf) {
1454     String[] filters = conf.getStrings("hbase.thrift.filters");
1455     if(filters != null) {
1456       for(String filterClass: filters) {
1457         String[] filterPart = filterClass.split(":");
1458         if(filterPart.length != 2) {
1459           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1460         } else {
1461           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1462         }
1463       }
1464     }
1465   }
1466 }