View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collections;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.TreeMap;
35  import java.util.concurrent.BlockingQueue;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.ThreadPoolExecutor;
39  import java.util.concurrent.TimeUnit;
40  
41  import org.apache.commons.cli.CommandLine;
42  import org.apache.commons.cli.Option;
43  import org.apache.commons.cli.OptionGroup;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.ServerName;
54  import org.apache.hadoop.hbase.TableNotFoundException;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.HBaseAdmin;
58  import org.apache.hadoop.hbase.client.HTable;
59  import org.apache.hadoop.hbase.client.Increment;
60  import org.apache.hadoop.hbase.client.OperationWithAttributes;
61  import org.apache.hadoop.hbase.client.Put;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.ResultScanner;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.filter.Filter;
66  import org.apache.hadoop.hbase.filter.ParseFilter;
67  import org.apache.hadoop.hbase.filter.PrefixFilter;
68  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
69  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
70  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
71  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
72  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
73  import org.apache.hadoop.hbase.thrift.generated.Hbase;
74  import org.apache.hadoop.hbase.thrift.generated.IOError;
75  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
76  import org.apache.hadoop.hbase.thrift.generated.Mutation;
77  import org.apache.hadoop.hbase.thrift.generated.TCell;
78  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
79  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
80  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
81  import org.apache.hadoop.hbase.thrift.generated.TScan;
82  import org.apache.hadoop.hbase.util.Addressing;
83  import org.apache.hadoop.hbase.util.Bytes;
84  import org.apache.hadoop.hbase.util.Writables;
85  import org.apache.thrift.TException;
86  import org.apache.thrift.protocol.TBinaryProtocol;
87  import org.apache.thrift.protocol.TCompactProtocol;
88  import org.apache.thrift.protocol.TProtocolFactory;
89  import org.apache.thrift.server.THsHaServer;
90  import org.apache.thrift.server.TNonblockingServer;
91  import org.apache.thrift.server.TServer;
92  import org.apache.thrift.server.TThreadedSelectorServer;
93  import org.apache.thrift.transport.TFramedTransport;
94  import org.apache.thrift.transport.TNonblockingServerSocket;
95  import org.apache.thrift.transport.TNonblockingServerTransport;
96  import org.apache.thrift.transport.TServerSocket;
97  import org.apache.thrift.transport.TServerTransport;
98  import org.apache.thrift.transport.TTransportFactory;
99  
100 import com.google.common.base.Joiner;
101 import com.google.common.util.concurrent.ThreadFactoryBuilder;
102 
103 /**
104  * ThriftServerRunner - this class starts up a Thrift server which implements
105  * the Hbase API specified in the Hbase.thrift IDL file.
106  */
107 public class ThriftServerRunner implements Runnable {
108 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  /** Conf key selecting the Thrift server implementation; see {@link ImplType}. */
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  /** Conf key for the IP address to bind to (thread-pool server only; see HBASE-2155). */
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  /** Conf key: use the Thrift compact protocol instead of the binary protocol. */
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  /** Conf key: use the framed transport even for servers that do not force it. */
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  /** Conf key for the Thrift listen port. */
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  // The max length of an individual thrift message, and of a frame, in MB.
  static final String MAX_MESSAGE_LENGTH_CONF_KEY = "hbase.regionserver.thrift.binary.max_message_length_in_mb";
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  /** Conf key: coalesce (batch) increment operations; consumed by the handler/coalescer. */
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";

  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  // Resolved once from PORT_CONF_KEY in the constructor.
  private final int listenPort;

  private Configuration conf;
  // volatile: set by the run() thread in setupServer(), read/cleared by shutdown()
  // from another thread.
  volatile TServer tserver;
  // Handler wrapped in a metrics proxy so every IDL call is counted/timed.
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
131 
132   /** An enum of server implementation selections */
133   enum ImplType {
134     HS_HA("hsha", true, THsHaServer.class, false),
135     NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
136     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
137     THREADED_SELECTOR(
138         "threadedselector", true, TThreadedSelectorServer.class, false);
139 
140     public static final ImplType DEFAULT = THREAD_POOL;
141 
142     final String option;
143     final boolean isAlwaysFramed;
144     final Class<? extends TServer> serverClass;
145     final boolean canSpecifyBindIP;
146 
147     ImplType(String option, boolean isAlwaysFramed,
148         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
149       this.option = option;
150       this.isAlwaysFramed = isAlwaysFramed;
151       this.serverClass = serverClass;
152       this.canSpecifyBindIP = canSpecifyBindIP;
153     }
154 
155     /**
156      * @return <code>-option</code> so we can get the list of options from
157      *         {@link #values()}
158      */
159     @Override
160     public String toString() {
161       return "-" + option;
162     }
163 
164     String getDescription() {
165       StringBuilder sb = new StringBuilder("Use the " +
166           serverClass.getSimpleName());
167       if (isAlwaysFramed) {
168         sb.append(" This implies the framed transport.");
169       }
170       if (this == DEFAULT) {
171         sb.append("This is the default.");
172       }
173       return sb.toString();
174     }
175 
176     static OptionGroup createOptionGroup() {
177       OptionGroup group = new OptionGroup();
178       for (ImplType t : values()) {
179         group.addOption(new Option(t.option, t.getDescription()));
180       }
181       return group;
182     }
183 
184     static ImplType getServerImpl(Configuration conf) {
185       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
186       for (ImplType t : values()) {
187         if (confType.equals(t.option)) {
188           return t;
189         }
190       }
191       throw new AssertionError("Unknown server ImplType.option:" + confType);
192     }
193 
194     static void setServerImpl(CommandLine cmd, Configuration conf) {
195       ImplType chosenType = null;
196       int numChosen = 0;
197       for (ImplType t : values()) {
198         if (cmd.hasOption(t.option)) {
199           chosenType = t;
200           ++numChosen;
201         }
202       }
203       if (numChosen < 1) {
204         LOG.info("Using default thrift server type");
205         chosenType = DEFAULT;
206       } else if (numChosen > 1) {
207         throw new AssertionError("Exactly one option out of " +
208           Arrays.toString(values()) + " has to be specified");
209       }
210       LOG.info("Using thrift server type " + chosenType.option);
211       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
212     }
213 
214     public String simpleClassName() {
215       return serverClass.getSimpleName();
216     }
217 
218     public static List<String> serversThatCannotSpecifyBindIP() {
219       List<String> l = new ArrayList<String>();
220       for (ImplType t : values()) {
221         if (!t.canSpecifyBindIP) {
222           l.add(t.simpleClassName());
223         }
224       }
225       return l;
226     }
227 
228   }
229 
  /**
   * Creates a runner with a freshly constructed {@link HBaseHandler} backed by
   * the given configuration.
   *
   * @throws IOException if the handler cannot set up its HBase connection
   */
  public ThriftServerRunner(Configuration conf) throws IOException {
    this(conf, new ThriftServerRunner.HBaseHandler(conf));
  }
233 
  /**
   * Creates a runner serving requests through the supplied handler, wrapped in
   * a metrics proxy so each IDL call is counted and timed.
   */
  public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
    // Work on a private copy of the configuration.
    this.conf = HBaseConfiguration.create(conf);
    // NOTE(review): port and metrics read the caller's conf, not the private
    // copy — values are identical at this point, but confirm before relying on
    // mutations of either object.
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(listenPort, conf, Hbase.Iface.class);
    handler.initMetrics(metrics);
    this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
  }
241 
  /*
   * Runs the Thrift server. Blocks in serve() until the server is stopped.
   */
  @Override
  public void run() {
    try {
      setupServer();
      tserver.serve();  // blocks until shutdown() stops the server
    } catch (Exception e) {
      LOG.fatal("Cannot run ThriftServer", e);
      // Crash the process if the ThriftServer is not running
      System.exit(-1);
    }
  }
256 
257   public void shutdown() {
258     if (tserver != null) {
259       tserver.stop();
260       tserver = null;
261     }
262   }
263 
  /**
   * Setting up the thrift TServer. Chooses protocol (binary/compact),
   * transport (framed/plain) and server implementation from configuration,
   * instantiates the server into {@link #tserver}, and registers the
   * configured filters. Order matters: the protocol/transport factories must
   * exist before any server Args object is populated.
   */
  private void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory;
    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
      LOG.debug("Using compact protocol");
      protocolFactory = new TCompactProtocol.Factory();
    } else {
      LOG.debug("Using binary protocol");
      // Limit is configured in MB (default 2) and converted to bytes; guards
      // against oversized/hostile messages.
      int maxMessageLength = conf.getInt(MAX_MESSAGE_LENGTH_CONF_KEY, 2) * 1024 * 1024;
      protocolFactory = new TBinaryProtocol.Factory(false, true, maxMessageLength);
    }

    Hbase.Processor<Hbase.Iface> processor =
        new Hbase.Processor<Hbase.Iface>(handler);
    ImplType implType = ImplType.getServerImpl(conf);

    // Construct correct TransportFactory. Nonblocking-style servers always
    // need the framed transport regardless of FRAMED_CONF_KEY.
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
      // Frame cap configured in MB (default 2), converted to bytes.
      int maxFrameSize = conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024;
      transportFactory = new TFramedTransport.Factory(maxFrameSize);
      LOG.debug("Using framed transport");
    } else {
      transportFactory = new TTransportFactory();
    }

    // Only the thread-pool server supports binding to a chosen address;
    // fail fast if a bind address was configured for any other type.
    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types " + Joiner.on(", ").join(
          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
          "address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
      throw new RuntimeException(
          "-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {

      // All three selector-based variants share a nonblocking server socket.
      TNonblockingServerTransport serverTransport =
          new TNonblockingServerSocket(listenPort);

      if (implType == ImplType.NONBLOCKING) {
        TNonblockingServer.Args serverArgs =
            new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TNonblockingServer(serverArgs);
      } else if (implType == ImplType.HS_HA) {
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        // CallQueue wraps the worker queue so queue metrics can be reported.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new THsHaServer(serverArgs);
      } else { // THREADED_SELECTOR
        TThreadedSelectorServer.Args serverArgs =
            new HThreadedSelectorServerArgs(serverTransport, conf);
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TThreadedSelectorServer(serverArgs);
      }
      LOG.info("starting HBase " + implType.simpleClassName() +
          " server on " + Integer.toString(listenPort));
    } else if (implType == ImplType.THREAD_POOL) {
      // Thread pool server. Get the IP address to bind to.
      InetAddress listenAddress = getBindAddress(conf);

      TServerTransport serverTransport = new TServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      TBoundedThreadPoolServer.Args serverArgs =
          new TBoundedThreadPoolServer.Args(serverTransport, conf);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
          + listenAddress + ":" + Integer.toString(listenPort)
          + "; " + serverArgs);
      TBoundedThreadPoolServer tserver =
          new TBoundedThreadPoolServer(serverArgs, metrics);
      this.tserver = tserver;
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }

    registerFilters(conf);
  }
373 
374   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
375                                  int workerThreads) {
376     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
377     tfb.setDaemon(true);
378     tfb.setNameFormat("thrift-worker-%d");
379     return new ThreadPoolExecutor(workerThreads, workerThreads,
380             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
381   }
382 
383   private InetAddress getBindAddress(Configuration conf)
384       throws UnknownHostException {
385     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
386     return InetAddress.getByName(bindAddressStr);
387   }
388 
389   protected static class ResultScannerWrapper {
390 
391     private final ResultScanner scanner; 
392     private final boolean sortColumns; 
393     public ResultScannerWrapper(ResultScanner resultScanner,
394                                 boolean sortResultColumns) {
395       scanner = resultScanner; 
396       sortColumns = sortResultColumns;  
397    } 
398 
399     public ResultScanner getScanner() {
400       return scanner; 
401     }
402 
403     public boolean isColumnSorted() {
404       return sortColumns; 
405     }
406   }
407 
408   /**
409    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
410    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
411    */
412   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    protected HBaseAdmin admin = null;
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state.
    // Both are guarded by "synchronized" on addScanner/getScanner/removeScanner.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Per-thread cache of HTable instances keyed by table name — presumably
    // because HTable instances are not meant to be shared across Thrift worker
    // threads (TODO confirm); each thread lazily builds its own map.
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // Batches increment operations; created unconditionally in the constructor.
    IncrementCoalescer coalescer = null;
431 
432     /**
433      * Returns a list of all the column families for a given htable.
434      *
435      * @param table
436      * @return
437      * @throws IOException
438      */
439     byte[][] getAllColumns(HTable table) throws IOException {
440       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
441       byte[][] columns = new byte[cds.length][];
442       for (int i = 0; i < cds.length; i++) {
443         columns[i] = Bytes.add(cds[i].getName(),
444             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
445       }
446       return columns;
447     }
448 
449     /**
450      * Creates and returns an HTable instance from a given table name.
451      *
452      * @param tableName
453      *          name of table
454      * @return HTable object
455      * @throws IOException
456      * @throws IOError
457      */
458     public HTable getTable(final byte[] tableName) throws
459         IOException {
460       String table = new String(tableName);
461       Map<String, HTable> tables = threadLocalTables.get();
462       if (!tables.containsKey(table)) {
463         tables.put(table, new HTable(conf, tableName));
464       }
465       return tables.get(table);
466     }
467 
    /**
     * Overload accepting a ByteBuffer table name; delegates to
     * {@link #getTable(byte[])}.
     */
    public HTable getTable(final ByteBuffer tableName) throws IOException {
      return getTable(getBytes(tableName));
    }
471 
472     /**
473      * Assigns a unique ID to the scanner and adds the mapping to an internal
474      * hash-map.
475      *
476      * @param scanner
477      * @return integer scanner id
478      */
479     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
480       int id = nextScannerId++;
481       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
482       scannerMap.put(id, resultScannerWrapper);
483       return id;
484     }
485 
    /**
     * Returns the scanner associated with the specified ID.
     *
     * @param id scanner id handed out by {@link #addScanner}
     * @return the wrapped Scanner, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper getScanner(int id) {
      return scannerMap.get(id);
    }
495 
    /**
     * Removes the scanner associated with the specified ID from the internal
     * id->scanner hash-map.
     *
     * @param id scanner id handed out by {@link #addScanner}
     * @return the removed wrapper, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper removeScanner(int id) {
      return scannerMap.remove(id);
    }
506 
    /**
     * Constructs an HBaseHandler object with a freshly created default
     * HBase configuration.
     * @throws IOException
     */
    protected HBaseHandler()
    throws IOException {
      this(HBaseConfiguration.create());
    }
515 
    /**
     * Constructs an HBaseHandler backed by the given configuration.
     *
     * @param c configuration used for the admin connection and all tables
     * @throws IOException if the HBaseAdmin connection cannot be created
     */
    protected HBaseHandler(final Configuration c) throws IOException {
      this.conf = c;
      admin = new HBaseAdmin(conf);
      scannerMap = new HashMap<Integer, ResultScannerWrapper>();
      this.coalescer = new IncrementCoalescer(this);
    }
522 
523     @Override
524     public void enableTable(ByteBuffer tableName) throws IOError {
525       try{
526         admin.enableTable(getBytes(tableName));
527       } catch (IOException e) {
528         LOG.warn(e.getMessage(), e);
529         throw new IOError(e.getMessage());
530       }
531     }
532 
533     @Override
534     public void disableTable(ByteBuffer tableName) throws IOError{
535       try{
536         admin.disableTable(getBytes(tableName));
537       } catch (IOException e) {
538         LOG.warn(e.getMessage(), e);
539         throw new IOError(e.getMessage());
540       }
541     }
542 
543     @Override
544     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
545       try {
546         return HTable.isTableEnabled(this.conf, getBytes(tableName));
547       } catch (IOException e) {
548         LOG.warn(e.getMessage(), e);
549         throw new IOError(e.getMessage());
550       }
551     }
552 
553     @Override
554     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
555       try{
556         admin.compact(getBytes(tableNameOrRegionName));
557       } catch (InterruptedException e) {
558         throw new IOError(e.getMessage());
559       } catch (IOException e) {
560         LOG.warn(e.getMessage(), e);
561         throw new IOError(e.getMessage());
562       }
563     }
564 
565     @Override
566     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
567       try{
568         admin.majorCompact(getBytes(tableNameOrRegionName));
569       } catch (InterruptedException e) {
570         LOG.warn(e.getMessage(), e);
571         throw new IOError(e.getMessage());
572       } catch (IOException e) {
573         LOG.warn(e.getMessage(), e);
574         throw new IOError(e.getMessage());
575       }
576     }
577 
578     @Override
579     public List<ByteBuffer> getTableNames() throws IOError {
580       try {
581         String[] tableNames = this.admin.getTableNames();
582         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
583         for (int i = 0; i < tableNames.length; i++) {
584           list.add(ByteBuffer.wrap(Bytes.toBytes(tableNames[i])));
585         }
586         return list;
587       } catch (IOException e) {
588         LOG.warn(e.getMessage(), e);
589         throw new IOError(e.getMessage());
590       }
591     }
592 
593     @Override
594     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
595     throws IOError {
596       try {
597         HTable table = getTable(tableName);
598         Map<HRegionInfo, ServerName> regionLocations =
599             table.getRegionLocations();
600         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
601         for (Map.Entry<HRegionInfo, ServerName> entry :
602             regionLocations.entrySet()) {
603           HRegionInfo info = entry.getKey();
604           ServerName serverName = entry.getValue();
605           TRegionInfo region = new TRegionInfo();
606           region.serverName = ByteBuffer.wrap(
607               Bytes.toBytes(serverName.getHostname()));
608           region.port = serverName.getPort();
609           region.startKey = ByteBuffer.wrap(info.getStartKey());
610           region.endKey = ByteBuffer.wrap(info.getEndKey());
611           region.id = info.getRegionId();
612           region.name = ByteBuffer.wrap(info.getRegionName());
613           region.version = info.getVersion();
614           results.add(region);
615         }
616         return results;
617       } catch (TableNotFoundException e) {
618         // Return empty list for non-existing table
619         return Collections.emptyList();
620       } catch (IOException e){
621         LOG.warn(e.getMessage(), e);
622         throw new IOError(e.getMessage());
623       }
624     }
625 
626     /**
627      * Convert ByteBuffer to byte array. Note that this cannot be replaced by
628      * Bytes.toBytes().
629      */
630     public static byte[] toBytes(ByteBuffer bb) {
631       byte[] result = new byte[bb.remaining()];
632       // Make a duplicate so the position doesn't change
633       ByteBuffer dup = bb.duplicate();
634       dup.get(result, 0, result.length);
635       return result;
636     }
637 
638     @Deprecated
639     @Override
640     public List<TCell> get(
641         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
642         Map<ByteBuffer, ByteBuffer> attributes)
643         throws IOError {
644       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
645       if(famAndQf.length == 1) {
646         return get(tableName, row, famAndQf[0], new byte[0], attributes);
647       }
648       return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
649     }
650 
651     protected List<TCell> get(ByteBuffer tableName,
652                               ByteBuffer row,
653                               byte[] family,
654                               byte[] qualifier,
655                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
656       try {
657         HTable table = getTable(tableName);
658         Get get = new Get(getBytes(row));
659         addAttributes(get, attributes);
660         if (qualifier == null || qualifier.length == 0) {
661           get.addFamily(family);
662         } else {
663           get.addColumn(family, qualifier);
664         }
665         Result result = table.get(get);
666         return ThriftUtilities.cellFromHBase(result.raw());
667       } catch (IOException e) {
668         LOG.warn(e.getMessage(), e);
669         throw new IOError(e.getMessage());
670       }
671     }
672 
673     @Deprecated
674     @Override
675     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
676         ByteBuffer column, int numVersions,
677         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
678       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
679       if(famAndQf.length == 1) {
680         return getVer(tableName, row, famAndQf[0],
681             new byte[0], numVersions, attributes);
682       }
683       return getVer(tableName, row,
684           famAndQf[0], famAndQf[1], numVersions, attributes);
685     }
686 
687     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
688                               byte[] family,
689         byte[] qualifier, int numVersions,
690         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
691       try {
692         HTable table = getTable(tableName);
693         Get get = new Get(getBytes(row));
694         addAttributes(get, attributes);
695         get.addColumn(family, qualifier);
696         get.setMaxVersions(numVersions);
697         Result result = table.get(get);
698         return ThriftUtilities.cellFromHBase(result.raw());
699       } catch (IOException e) {
700         LOG.warn(e.getMessage(), e);
701         throw new IOError(e.getMessage());
702       }
703     }
704 
705     @Deprecated
706     @Override
707     public List<TCell> getVerTs(ByteBuffer tableName,
708                                    ByteBuffer row,
709         ByteBuffer column,
710         long timestamp,
711         int numVersions,
712         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
713       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
714       if(famAndQf.length == 1) {
715         return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
716             numVersions, attributes);
717       }
718       return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
719           numVersions, attributes);
720     }
721 
722     protected List<TCell> getVerTs(ByteBuffer tableName,
723                                    ByteBuffer row, byte [] family,
724         byte [] qualifier, long timestamp, int numVersions,
725         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
726       try {
727         HTable table = getTable(tableName);
728         Get get = new Get(getBytes(row));
729         addAttributes(get, attributes);
730         get.addColumn(family, qualifier);
731         get.setTimeRange(Long.MIN_VALUE, timestamp);
732         get.setMaxVersions(numVersions);
733         Result result = table.get(get);
734         return ThriftUtilities.cellFromHBase(result.raw());
735       } catch (IOException e) {
736         LOG.warn(e.getMessage(), e);
737         throw new IOError(e.getMessage());
738       }
739     }
740 
741     @Override
742     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
743         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
744       return getRowWithColumnsTs(tableName, row, null,
745                                  HConstants.LATEST_TIMESTAMP,
746                                  attributes);
747     }
748 
749     @Override
750     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
751                                               ByteBuffer row,
752         List<ByteBuffer> columns,
753         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
754       return getRowWithColumnsTs(tableName, row, columns,
755                                  HConstants.LATEST_TIMESTAMP,
756                                  attributes);
757     }
758 
759     @Override
760     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
761         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
762       return getRowWithColumnsTs(tableName, row, null,
763                                  timestamp, attributes);
764     }
765 
766     @Override
767     public List<TRowResult> getRowWithColumnsTs(
768         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
769         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
770       try {
771         HTable table = getTable(tableName);
772         if (columns == null) {
773           Get get = new Get(getBytes(row));
774           addAttributes(get, attributes);
775           get.setTimeRange(Long.MIN_VALUE, timestamp);
776           Result result = table.get(get);
777           return ThriftUtilities.rowResultFromHBase(result);
778         }
779         Get get = new Get(getBytes(row));
780         addAttributes(get, attributes);
781         for(ByteBuffer column : columns) {
782           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
783           if (famAndQf.length == 1) {
784               get.addFamily(famAndQf[0]);
785           } else {
786               get.addColumn(famAndQf[0], famAndQf[1]);
787           }
788         }
789         get.setTimeRange(Long.MIN_VALUE, timestamp);
790         Result result = table.get(get);
791         return ThriftUtilities.rowResultFromHBase(result);
792       } catch (IOException e) {
793         LOG.warn(e.getMessage(), e);
794         throw new IOError(e.getMessage());
795       }
796     }
797 
798     @Override
799     public List<TRowResult> getRows(ByteBuffer tableName,
800                                     List<ByteBuffer> rows,
801         Map<ByteBuffer, ByteBuffer> attributes)
802         throws IOError {
803       return getRowsWithColumnsTs(tableName, rows, null,
804                                   HConstants.LATEST_TIMESTAMP,
805                                   attributes);
806     }
807 
808     @Override
809     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
810                                                List<ByteBuffer> rows,
811         List<ByteBuffer> columns,
812         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
813       return getRowsWithColumnsTs(tableName, rows, columns,
814                                   HConstants.LATEST_TIMESTAMP,
815                                   attributes);
816     }
817 
818     @Override
819     public List<TRowResult> getRowsTs(ByteBuffer tableName,
820                                       List<ByteBuffer> rows,
821         long timestamp,
822         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
823       return getRowsWithColumnsTs(tableName, rows, null,
824                                   timestamp, attributes);
825     }
826 
827     @Override
828     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
829                                                  List<ByteBuffer> rows,
830         List<ByteBuffer> columns, long timestamp,
831         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
832       try {
833         List<Get> gets = new ArrayList<Get>(rows.size());
834         HTable table = getTable(tableName);
835         if (metrics != null) {
836           metrics.incNumRowKeysInBatchGet(rows.size());
837         }
838         for (ByteBuffer row : rows) {
839           Get get = new Get(getBytes(row));
840           addAttributes(get, attributes);
841           if (columns != null) {
842 
843             for(ByteBuffer column : columns) {
844               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
845               if (famAndQf.length == 1) {
846                 get.addFamily(famAndQf[0]);
847               } else {
848                 get.addColumn(famAndQf[0], famAndQf[1]);
849               }
850             }
851           }
852           get.setTimeRange(Long.MIN_VALUE, timestamp);
853           gets.add(get);
854         }
855         Result[] result = table.get(gets);
856         return ThriftUtilities.rowResultFromHBase(result);
857       } catch (IOException e) {
858         LOG.warn(e.getMessage(), e);
859         throw new IOError(e.getMessage());
860       }
861     }
862 
863     @Override
864     public void deleteAll(
865         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
866         Map<ByteBuffer, ByteBuffer> attributes)
867         throws IOError {
868       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
869                   attributes);
870     }
871 
872     @Override
873     public void deleteAllTs(ByteBuffer tableName,
874                             ByteBuffer row,
875                             ByteBuffer column,
876         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
877       try {
878         HTable table = getTable(tableName);
879         Delete delete  = new Delete(getBytes(row));
880         addAttributes(delete, attributes);
881         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
882         if (famAndQf.length == 1) {
883           delete.deleteFamily(famAndQf[0], timestamp);
884         } else {
885           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
886         }
887         table.delete(delete);
888 
889       } catch (IOException e) {
890         LOG.warn(e.getMessage(), e);
891         throw new IOError(e.getMessage());
892       }
893     }
894 
895     @Override
896     public void deleteAllRow(
897         ByteBuffer tableName, ByteBuffer row,
898         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
899       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
900     }
901 
902     @Override
903     public void deleteAllRowTs(
904         ByteBuffer tableName, ByteBuffer row, long timestamp,
905         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
906       try {
907         HTable table = getTable(tableName);
908         Delete delete  = new Delete(getBytes(row), timestamp, null);
909         addAttributes(delete, attributes);
910         table.delete(delete);
911       } catch (IOException e) {
912         LOG.warn(e.getMessage(), e);
913         throw new IOError(e.getMessage());
914       }
915     }
916 
917     @Override
918     public void createTable(ByteBuffer in_tableName,
919         List<ColumnDescriptor> columnFamilies) throws IOError,
920         IllegalArgument, AlreadyExists {
921       byte [] tableName = getBytes(in_tableName);
922       try {
923         if (admin.tableExists(tableName)) {
924           throw new AlreadyExists("table name already in use");
925         }
926         HTableDescriptor desc = new HTableDescriptor(tableName);
927         for (ColumnDescriptor col : columnFamilies) {
928           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
929           desc.addFamily(colDesc);
930         }
931         admin.createTable(desc);
932       } catch (IOException e) {
933         LOG.warn(e.getMessage(), e);
934         throw new IOError(e.getMessage());
935       } catch (IllegalArgumentException e) {
936         LOG.warn(e.getMessage(), e);
937         throw new IllegalArgument(e.getMessage());
938       }
939     }
940 
941     @Override
942     public void deleteTable(ByteBuffer in_tableName) throws IOError {
943       byte [] tableName = getBytes(in_tableName);
944       if (LOG.isDebugEnabled()) {
945         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
946       }
947       try {
948         if (!admin.tableExists(tableName)) {
949           throw new IOException("table does not exist");
950         }
951         admin.deleteTable(tableName);
952       } catch (IOException e) {
953         LOG.warn(e.getMessage(), e);
954         throw new IOError(e.getMessage());
955       }
956     }
957 
958     @Override
959     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
960         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
961         throws IOError, IllegalArgument {
962       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
963                   attributes);
964     }
965 
966     @Override
967     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
968         List<Mutation> mutations, long timestamp,
969         Map<ByteBuffer, ByteBuffer> attributes)
970         throws IOError, IllegalArgument {
971       HTable table = null;
972       try {
973         table = getTable(tableName);
974         Put put = new Put(getBytes(row), timestamp, null);
975         addAttributes(put, attributes);
976 
977         Delete delete = new Delete(getBytes(row));
978         addAttributes(delete, attributes);
979         if (metrics != null) {
980           metrics.incNumRowKeysInBatchMutate(mutations.size());
981         }
982 
983         // I apologize for all this mess :)
984         for (Mutation m : mutations) {
985           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
986           if (m.isDelete) {
987             if (famAndQf.length == 1) {
988               delete.deleteFamily(famAndQf[0], timestamp);
989             } else {
990               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
991             }
992             delete.setWriteToWAL(m.writeToWAL);
993           } else {
994             if(famAndQf.length == 1) {
995               put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
996                   m.value != null ? getBytes(m.value)
997                       : HConstants.EMPTY_BYTE_ARRAY);
998             } else {
999               put.add(famAndQf[0], famAndQf[1],
1000                   m.value != null ? getBytes(m.value)
1001                       : HConstants.EMPTY_BYTE_ARRAY);
1002             }
1003             put.setWriteToWAL(m.writeToWAL);
1004           }
1005         }
1006         if (!delete.isEmpty())
1007           table.delete(delete);
1008         if (!put.isEmpty())
1009           table.put(put);
1010       } catch (IOException e) {
1011         LOG.warn(e.getMessage(), e);
1012         throw new IOError(e.getMessage());
1013       } catch (IllegalArgumentException e) {
1014         LOG.warn(e.getMessage(), e);
1015         throw new IllegalArgument(e.getMessage());
1016       }
1017     }
1018 
1019     @Override
1020     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1021         Map<ByteBuffer, ByteBuffer> attributes)
1022         throws IOError, IllegalArgument, TException {
1023       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1024     }
1025 
    /**
     * Applies a batch of per-row mutations at the given timestamp. Each
     * BatchMutation is expanded into at most one Delete and one Put per row;
     * the collected Puts are sent before the collected Deletes.
     *
     * NOTE(review): the ordering differs from mutateRowTs, which issues the
     * delete before the put for a single row -- confirm this asymmetry is
     * intended.
     *
     * @param tableName  table to mutate
     * @param rowBatches one entry per row, each with its own mutation list
     * @param timestamp  timestamp applied to every put and delete
     * @param attributes operation attributes copied onto every put and delete
     * @throws IOError         wrapping any underlying IOException
     * @throws IllegalArgument on malformed columns or values
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      // First pass: translate every batch entry into HBase operations.
      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp, null);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setWriteToWAL(m.writeToWAL);
          } else {
            // Null values are stored as empty byte arrays; a family-only
            // column is stored under the empty qualifier.
            if(famAndQf.length == 1) {
              put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              put.add(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            put.setWriteToWAL(m.writeToWAL);
          }
        }
        // Skip empty operations so we don't send no-ops to the server.
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      // Second pass: ship the accumulated operations.
      HTable table = null;
      try {
        table = getTable(tableName);
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty()) table.delete(deletes);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1084 
1085     @Deprecated
1086     @Override
1087     public long atomicIncrement(
1088         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1089             throws IOError, IllegalArgument, TException {
1090       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1091       if(famAndQf.length == 1) {
1092         return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
1093             amount);
1094       }
1095       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1096     }
1097 
1098     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1099         byte [] family, byte [] qualifier, long amount)
1100         throws IOError, IllegalArgument, TException {
1101       HTable table;
1102       try {
1103         table = getTable(tableName);
1104         return table.incrementColumnValue(
1105             getBytes(row), family, qualifier, amount);
1106       } catch (IOException e) {
1107         LOG.warn(e.getMessage(), e);
1108         throw new IOError(e.getMessage());
1109       }
1110     }
1111 
1112     public void scannerClose(int id) throws IOError, IllegalArgument {
1113       LOG.debug("scannerClose: id=" + id);
1114       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1115       if (resultScannerWrapper == null) {
1116         String message = "scanner ID is invalid";
1117         LOG.warn(message);
1118         throw new IllegalArgument("scanner ID is invalid");
1119       }
1120       resultScannerWrapper.getScanner().close();
1121       removeScanner(id);
1122     }
1123 
1124     @Override
1125     public List<TRowResult> scannerGetList(int id,int nbRows)
1126         throws IllegalArgument, IOError {
1127       LOG.debug("scannerGetList: id=" + id);
1128       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1129       if (null == resultScannerWrapper) {
1130         String message = "scanner ID is invalid";
1131         LOG.warn(message);
1132         throw new IllegalArgument("scanner ID is invalid");
1133       }
1134 
1135       Result [] results = null;
1136       try {
1137         results = resultScannerWrapper.getScanner().next(nbRows);
1138         if (null == results) {
1139           return new ArrayList<TRowResult>();
1140         }
1141       } catch (IOException e) {
1142         LOG.warn(e.getMessage(), e);
1143         throw new IOError(e.getMessage());
1144       }
1145       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1146     }
1147 
1148     @Override
1149     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1150       return scannerGetList(id,1);
1151     }
1152 
1153     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1154         Map<ByteBuffer, ByteBuffer> attributes)
1155         throws IOError {
1156       try {
1157         HTable table = getTable(tableName);
1158         Scan scan = new Scan();
1159         addAttributes(scan, attributes);
1160         if (tScan.isSetStartRow()) {
1161           scan.setStartRow(tScan.getStartRow());
1162         }
1163         if (tScan.isSetStopRow()) {
1164           scan.setStopRow(tScan.getStopRow());
1165         }
1166         if (tScan.isSetTimestamp()) {
1167           scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
1168         }
1169         if (tScan.isSetCaching()) {
1170           scan.setCaching(tScan.getCaching());
1171         }
1172         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1173           for(ByteBuffer column : tScan.getColumns()) {
1174             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1175             if(famQf.length == 1) {
1176               scan.addFamily(famQf[0]);
1177             } else {
1178               scan.addColumn(famQf[0], famQf[1]);
1179             }
1180           }
1181         }
1182         if (tScan.isSetFilterString()) {
1183           ParseFilter parseFilter = new ParseFilter();
1184           scan.setFilter(
1185               parseFilter.parseFilterString(tScan.getFilterString()));
1186         }
1187         return addScanner(table.getScanner(scan), tScan.sortColumns);
1188       } catch (IOException e) {
1189         LOG.warn(e.getMessage(), e);
1190         throw new IOError(e.getMessage());
1191       }
1192     }
1193 
1194     @Override
1195     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1196         List<ByteBuffer> columns,
1197         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1198       try {
1199         HTable table = getTable(tableName);
1200         Scan scan = new Scan(getBytes(startRow));
1201         addAttributes(scan, attributes);
1202         if(columns != null && columns.size() != 0) {
1203           for(ByteBuffer column : columns) {
1204             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1205             if(famQf.length == 1) {
1206               scan.addFamily(famQf[0]);
1207             } else {
1208               scan.addColumn(famQf[0], famQf[1]);
1209             }
1210           }
1211         }
1212         return addScanner(table.getScanner(scan), false);
1213       } catch (IOException e) {
1214         LOG.warn(e.getMessage(), e);
1215         throw new IOError(e.getMessage());
1216       }
1217     }
1218 
1219     @Override
1220     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1221         ByteBuffer stopRow, List<ByteBuffer> columns,
1222         Map<ByteBuffer, ByteBuffer> attributes)
1223         throws IOError, TException {
1224       try {
1225         HTable table = getTable(tableName);
1226         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1227         addAttributes(scan, attributes);
1228         if(columns != null && columns.size() != 0) {
1229           for(ByteBuffer column : columns) {
1230             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1231             if(famQf.length == 1) {
1232               scan.addFamily(famQf[0]);
1233             } else {
1234               scan.addColumn(famQf[0], famQf[1]);
1235             }
1236           }
1237         }
1238         return addScanner(table.getScanner(scan), false);
1239       } catch (IOException e) {
1240         LOG.warn(e.getMessage(), e);
1241         throw new IOError(e.getMessage());
1242       }
1243     }
1244 
1245     @Override
1246     public int scannerOpenWithPrefix(ByteBuffer tableName,
1247                                      ByteBuffer startAndPrefix,
1248                                      List<ByteBuffer> columns,
1249         Map<ByteBuffer, ByteBuffer> attributes)
1250         throws IOError, TException {
1251       try {
1252         HTable table = getTable(tableName);
1253         Scan scan = new Scan(getBytes(startAndPrefix));
1254         addAttributes(scan, attributes);
1255         Filter f = new WhileMatchFilter(
1256             new PrefixFilter(getBytes(startAndPrefix)));
1257         scan.setFilter(f);
1258         if (columns != null && columns.size() != 0) {
1259           for(ByteBuffer column : columns) {
1260             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1261             if(famQf.length == 1) {
1262               scan.addFamily(famQf[0]);
1263             } else {
1264               scan.addColumn(famQf[0], famQf[1]);
1265             }
1266           }
1267         }
1268         return addScanner(table.getScanner(scan), false);
1269       } catch (IOException e) {
1270         LOG.warn(e.getMessage(), e);
1271         throw new IOError(e.getMessage());
1272       }
1273     }
1274 
1275     @Override
1276     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1277         List<ByteBuffer> columns, long timestamp,
1278         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1279       try {
1280         HTable table = getTable(tableName);
1281         Scan scan = new Scan(getBytes(startRow));
1282         addAttributes(scan, attributes);
1283         scan.setTimeRange(Long.MIN_VALUE, timestamp);
1284         if (columns != null && columns.size() != 0) {
1285           for (ByteBuffer column : columns) {
1286             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1287             if(famQf.length == 1) {
1288               scan.addFamily(famQf[0]);
1289             } else {
1290               scan.addColumn(famQf[0], famQf[1]);
1291             }
1292           }
1293         }
1294         return addScanner(table.getScanner(scan), false);
1295       } catch (IOException e) {
1296         LOG.warn(e.getMessage(), e);
1297         throw new IOError(e.getMessage());
1298       }
1299     }
1300 
1301     @Override
1302     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1303         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1304         Map<ByteBuffer, ByteBuffer> attributes)
1305         throws IOError, TException {
1306       try {
1307         HTable table = getTable(tableName);
1308         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1309         addAttributes(scan, attributes);
1310         scan.setTimeRange(Long.MIN_VALUE, timestamp);
1311         if (columns != null && columns.size() != 0) {
1312           for (ByteBuffer column : columns) {
1313             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1314             if(famQf.length == 1) {
1315               scan.addFamily(famQf[0]);
1316             } else {
1317               scan.addColumn(famQf[0], famQf[1]);
1318             }
1319           }
1320         }
1321         scan.setTimeRange(Long.MIN_VALUE, timestamp);
1322         return addScanner(table.getScanner(scan), false);
1323       } catch (IOException e) {
1324         LOG.warn(e.getMessage(), e);
1325         throw new IOError(e.getMessage());
1326       }
1327     }
1328 
1329     @Override
1330     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1331         ByteBuffer tableName) throws IOError, TException {
1332       try {
1333         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1334           new TreeMap<ByteBuffer, ColumnDescriptor>();
1335 
1336         HTable table = getTable(tableName);
1337         HTableDescriptor desc = table.getTableDescriptor();
1338 
1339         for (HColumnDescriptor e : desc.getFamilies()) {
1340           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1341           columns.put(col.name, col);
1342         }
1343         return columns;
1344       } catch (IOException e) {
1345         LOG.warn(e.getMessage(), e);
1346         throw new IOError(e.getMessage());
1347       }
1348     }
1349 
1350     @Override
1351     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1352         ByteBuffer family) throws IOError {
1353       try {
1354         HTable table = getTable(getBytes(tableName));
1355         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1356         return ThriftUtilities.cellFromHBase(result.raw());
1357       } catch (IOException e) {
1358         LOG.warn(e.getMessage(), e);
1359         throw new IOError(e.getMessage());
1360       }
1361     }
1362 
    /**
     * Resolves, via the .META. catalog table, the region that contains
     * {@code searchRow} and returns its boundaries, id, name, version and
     * (when present in the catalog) the hosting server and port.
     *
     * @param searchRow row key to locate
     * @return Thrift description of the containing region
     * @throws IOError if the row cannot be resolved to a region, or on any
     *         underlying IOException
     */
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      try {
        HTable table = getTable(HConstants.META_TABLE_NAME);
        // NOTE(review): toBytes is presumably Bytes.toBytes via an import not
        // visible in this chunk (only getBytes is statically imported) --
        // confirm against the file header.
        byte[] row = toBytes(searchRow);
        // Catalog rows are keyed by region start key, so the closest row at
        // or before searchRow describes the containing region.
        Result startRowResult = table.getRowOrBefore(
          row, HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          throw new IOException("Cannot find row in .META., row="
                                + Bytes.toString(searchRow.array()));
        }

        // find region start and end keys
        byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
                                               HConstants.REGIONINFO_QUALIFIER);
        if (value == null || value.length == 0) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toString(row));
        }
        // Deserialize the catalog cell into an HRegionInfo and copy its
        // fields onto the Thrift struct.
        HRegionInfo regionInfo = Writables.getHRegionInfo(value);
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = regionInfo.getVersion();

        // find region assignment to server; the server cell may be absent,
        // in which case serverName/port are left unset.
        value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
                                        HConstants.SERVER_QUALIFIER);
        if (value != null && value.length > 0) {
          String hostAndPort = Bytes.toString(value);
          region.setServerName(Bytes.toBytes(
              Addressing.parseHostname(hostAndPort)));
          region.port = Addressing.parsePort(hostAndPort);
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1407 
    // Wires the Thrift metrics sink into this handler; the field is read by
    // the batch get/mutate paths before recording counters.
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1411 
1412     @Override
1413     public void increment(TIncrement tincrement) throws IOError, TException {
1414 
1415       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1416         throw new TException("Must supply a table and a row key; can't increment");
1417       }
1418 
1419       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1420         this.coalescer.queueIncrement(tincrement);
1421         return;
1422       }
1423 
1424       try {
1425         HTable table = getTable(tincrement.getTable());
1426         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1427         table.increment(inc);
1428       } catch (IOException e) {
1429         LOG.warn(e.getMessage(), e);
1430         throw new IOError(e.getMessage());
1431       }
1432     }
1433 
1434     @Override
1435     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1436       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1437         this.coalescer.queueIncrements(tincrements);
1438         return;
1439       }
1440       for (TIncrement tinc : tincrements) {
1441         increment(tinc);
1442       }
1443     }
1444   }
1445 
1446 
1447 
1448   /**
1449    * Adds all the attributes into the Operation object
1450    */
1451   private static void addAttributes(OperationWithAttributes op,
1452     Map<ByteBuffer, ByteBuffer> attributes) {
1453     if (attributes == null || attributes.size() == 0) {
1454       return;
1455     }
1456     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1457       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1458       byte[] value =  getBytes(entry.getValue());
1459       op.setAttribute(name, value);
1460     }
1461   }
1462 
1463   public static void registerFilters(Configuration conf) {
1464     String[] filters = conf.getStrings("hbase.thrift.filters");
1465     if(filters != null) {
1466       for(String filterClass: filters) {
1467         String[] filterPart = filterClass.split(":");
1468         if(filterPart.length != 2) {
1469           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1470         } else {
1471           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1472         }
1473       }
1474     }
1475   }
1476 }