View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.classification.InterfaceAudience;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.hbase.HBaseConfiguration;
56  import org.apache.hadoop.hbase.HColumnDescriptor;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.HRegionInfo;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.client.Append;
65  import org.apache.hadoop.hbase.client.Delete;
66  import org.apache.hadoop.hbase.client.Durability;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.HBaseAdmin;
69  import org.apache.hadoop.hbase.client.HTable;
70  import org.apache.hadoop.hbase.client.Increment;
71  import org.apache.hadoop.hbase.client.OperationWithAttributes;
72  import org.apache.hadoop.hbase.client.Put;
73  import org.apache.hadoop.hbase.client.Result;
74  import org.apache.hadoop.hbase.client.ResultScanner;
75  import org.apache.hadoop.hbase.client.Scan;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.ParseFilter;
78  import org.apache.hadoop.hbase.filter.PrefixFilter;
79  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
80  import org.apache.hadoop.hbase.security.SecurityUtil;
81  import org.apache.hadoop.hbase.security.UserProvider;
82  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
83  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
84  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
85  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
86  import org.apache.hadoop.hbase.thrift.generated.Hbase;
87  import org.apache.hadoop.hbase.thrift.generated.IOError;
88  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
89  import org.apache.hadoop.hbase.thrift.generated.Mutation;
90  import org.apache.hadoop.hbase.thrift.generated.TAppend;
91  import org.apache.hadoop.hbase.thrift.generated.TCell;
92  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
93  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
94  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
95  import org.apache.hadoop.hbase.thrift.generated.TScan;
96  import org.apache.hadoop.hbase.util.Bytes;
97  import org.apache.hadoop.hbase.util.ConnectionCache;
98  import org.apache.hadoop.hbase.util.Strings;
99  import org.apache.hadoop.net.DNS;
100 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
101 import org.apache.hadoop.security.UserGroupInformation;
102 import org.apache.thrift.TException;
103 import org.apache.thrift.TProcessor;
104 import org.apache.thrift.protocol.TBinaryProtocol;
105 import org.apache.thrift.protocol.TCompactProtocol;
106 import org.apache.thrift.protocol.TProtocol;
107 import org.apache.thrift.protocol.TProtocolFactory;
108 import org.apache.thrift.server.THsHaServer;
109 import org.apache.thrift.server.TNonblockingServer;
110 import org.apache.thrift.server.TServer;
111 import org.apache.thrift.server.TThreadedSelectorServer;
112 import org.apache.thrift.transport.TFramedTransport;
113 import org.apache.thrift.transport.TNonblockingServerSocket;
114 import org.apache.thrift.transport.TNonblockingServerTransport;
115 import org.apache.thrift.transport.TSaslServerTransport;
116 import org.apache.thrift.transport.TServerSocket;
117 import org.apache.thrift.transport.TServerTransport;
118 import org.apache.thrift.transport.TTransportFactory;
119 
120 import com.google.common.base.Joiner;
121 import com.google.common.util.concurrent.ThreadFactoryBuilder;
122 
123 /**
124  * ThriftServerRunner - this class starts up a Thrift server which implements
125  * the Hbase API specified in the Hbase.thrift IDL file.
126  */
127 @InterfaceAudience.Private
128 @SuppressWarnings("deprecation")
129 public class ThriftServerRunner implements Runnable {
130 
  // Shared logger for the runner and its nested types.
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  // Selects which TServer implementation to run; see ImplType for the values.
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  // Local IP address to bind the listening socket to.
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  // When true, use the Thrift compact protocol instead of the binary one.
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  // When true, use the framed transport (also implied by some server types).
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  // Maximum frame size, in MB, when the framed transport is in use.
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  // TCP port the Thrift service listens on.
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  // NOTE(review): presumably enables increment coalescing via IncrementCoalescer;
  // the read site is not in this chunk — confirm in the increment path.
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";

  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * auth-conf: authentication, integrity and confidentiality checking
   * auth-int: authentication and integrity checking
   * auth: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

  // Wildcard address used when no explicit bind address is configured.
  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  private final int listenPort;

  private Configuration conf;
  // The running Thrift server; volatile because run() assigns it (via
  // setupServer) and shutdown() may null it from another thread.
  volatile TServer tserver;
  // Metrics-wrapping proxy around hbaseHandler; this is what the Thrift
  // processor actually invokes.
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
  // The concrete handler implementing the Hbase.Iface Thrift API.
  private final HBaseHandler hbaseHandler;
  // Server login user; serving runs inside realUser.doAs() in run().
  private final UserGroupInformation realUser;

  // SASL quality of protection from THRIFT_QOP_KEY, or null when not set.
  private final String qop;
  // Host name used for Kerberos login and the SASL server definition.
  private String host;
167 
168   /** An enum of server implementation selections */
169   enum ImplType {
170     HS_HA("hsha", true, THsHaServer.class, true),
171     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
172     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
173     THREADED_SELECTOR(
174         "threadedselector", true, TThreadedSelectorServer.class, true);
175 
176     public static final ImplType DEFAULT = THREAD_POOL;
177 
178     final String option;
179     final boolean isAlwaysFramed;
180     final Class<? extends TServer> serverClass;
181     final boolean canSpecifyBindIP;
182 
183     ImplType(String option, boolean isAlwaysFramed,
184         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
185       this.option = option;
186       this.isAlwaysFramed = isAlwaysFramed;
187       this.serverClass = serverClass;
188       this.canSpecifyBindIP = canSpecifyBindIP;
189     }
190 
191     /**
192      * @return <code>-option</code> so we can get the list of options from
193      *         {@link #values()}
194      */
195     @Override
196     public String toString() {
197       return "-" + option;
198     }
199 
200     String getDescription() {
201       StringBuilder sb = new StringBuilder("Use the " +
202           serverClass.getSimpleName());
203       if (isAlwaysFramed) {
204         sb.append(" This implies the framed transport.");
205       }
206       if (this == DEFAULT) {
207         sb.append("This is the default.");
208       }
209       return sb.toString();
210     }
211 
212     static OptionGroup createOptionGroup() {
213       OptionGroup group = new OptionGroup();
214       for (ImplType t : values()) {
215         group.addOption(new Option(t.option, t.getDescription()));
216       }
217       return group;
218     }
219 
220     static ImplType getServerImpl(Configuration conf) {
221       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
222       for (ImplType t : values()) {
223         if (confType.equals(t.option)) {
224           return t;
225         }
226       }
227       throw new AssertionError("Unknown server ImplType.option:" + confType);
228     }
229 
230     static void setServerImpl(CommandLine cmd, Configuration conf) {
231       ImplType chosenType = null;
232       int numChosen = 0;
233       for (ImplType t : values()) {
234         if (cmd.hasOption(t.option)) {
235           chosenType = t;
236           ++numChosen;
237         }
238       }
239       if (numChosen < 1) {
240         LOG.info("Using default thrift server type");
241         chosenType = DEFAULT;
242       } else if (numChosen > 1) {
243         throw new AssertionError("Exactly one option out of " +
244           Arrays.toString(values()) + " has to be specified");
245       }
246       LOG.info("Using thrift server type " + chosenType.option);
247       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
248     }
249 
250     public String simpleClassName() {
251       return serverClass.getSimpleName();
252     }
253 
254     public static List<String> serversThatCannotSpecifyBindIP() {
255       List<String> l = new ArrayList<String>();
256       for (ImplType t : values()) {
257         if (!t.canSpecifyBindIP) {
258           l.add(t.simpleClassName());
259         }
260       }
261       return l;
262     }
263 
264   }
265 
  /**
   * Creates the runner: performs the Kerberos login when security is
   * enabled, builds the handler plus its metrics proxy, and validates the
   * configured Thrift quality of protection.
   *
   * @param conf server configuration; a copy is kept in {@code this.conf}
   * @throws IOException on login failure, an invalid QOP value, or QOP
   *         configured while security is disabled
   */
  public ThriftServerRunner(Configuration conf) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    // login the server principal (if using secure Hadoop)
    boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
      && userProvider.isHBaseSecurityEnabled();
    if (securityEnabled) {
      // Resolve our own host name (substituted into the principal) using
      // the configured DNS interface/nameserver.
      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase.thrift.dns.interface", "default"),
        conf.get("hbase.thrift.dns.nameserver", "default")));
      userProvider.login("hbase.thrift.keytab.file",
        "hbase.thrift.kerberos.principal", host);
    }
    // NOTE: a defensive copy is stored, but the reads below intentionally
    // keep using the caller's conf instance.
    this.conf = HBaseConfiguration.create(conf);
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
    this.hbaseHandler = new HBaseHandler(conf, userProvider);
    this.hbaseHandler.initMetrics(metrics);
    // Wrap the handler so every Thrift call is instrumented.
    this.handler = HbaseHandlerMetricsProxy.newInstance(
      hbaseHandler, metrics, conf);
    this.realUser = userProvider.getCurrent().getUGI();
    qop = conf.get(THRIFT_QOP_KEY);
    if (qop != null) {
      if (!qop.equals("auth") && !qop.equals("auth-int")
          && !qop.equals("auth-conf")) {
        throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
          + ", it must be 'auth', 'auth-int', or 'auth-conf'");
      }
      if (!securityEnabled) {
        // QOP requires SASL/Kerberos, which requires secure mode.
        throw new IOException("Thrift server must"
          + " run in secure mode to support authentication");
      }
    }
  }
299 
300   /*
301    * Runs the Thrift server
302    */
303   @Override
304   public void run() {
305     realUser.doAs(
306       new PrivilegedAction<Object>() {
307         @Override
308         public Object run() {
309           try {
310             setupServer();
311             tserver.serve();
312           } catch (Exception e) {
313             LOG.fatal("Cannot run ThriftServer", e);
314             // Crash the process if the ThriftServer is not running
315             System.exit(-1);
316           }
317           return null;
318         }
319       });
320   }
321 
322   public void shutdown() {
323     if (tserver != null) {
324       tserver.stop();
325       tserver = null;
326     }
327   }
328 
  /**
   * Setting up the thrift TServer
   *
   * <p>Chooses, in order: the protocol factory (compact vs binary), the
   * transport factory (framed, plain, or SASL/GSSAPI when a QOP is set),
   * and finally the server implementation named by the configuration
   * (nonblocking, hsha, threadedselector, or the default thread pool).
   * Assigns the constructed server to {@link #tserver}.
   */
  private void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory;
    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
      LOG.debug("Using compact protocol");
      protocolFactory = new TCompactProtocol.Factory();
    } else {
      LOG.debug("Using binary protocol");
      protocolFactory = new TBinaryProtocol.Factory();
    }

    final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
    ImplType implType = ImplType.getServerImpl(conf);
    // processor may be re-wrapped below for the SASL case.
    TProcessor processor = p;

    // Construct correct TransportFactory
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
      // SASL authentication and the framed transport are mutually exclusive.
      if (qop != null) {
        throw new RuntimeException("Thrift server authentication"
          + " doesn't work with framed transport yet");
      }
      // Max frame size is configured in MB; default 2 MB.
      transportFactory = new TFramedTransport.Factory(
          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
      LOG.debug("Using framed transport");
    } else if (qop == null) {
      transportFactory = new TTransportFactory();
    } else {
      // QOP configured: wrap the transport in SASL/GSSAPI (Kerberos).
      // Extract the name from the principal
      String name = SecurityUtil.getUserFromPrincipal(
        conf.get("hbase.thrift.kerberos.principal"));
      Map<String, String> saslProperties = new HashMap<String, String>();
      saslProperties.put(Sasl.QOP, qop);
      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
        new SaslGssCallbackHandler() {
          @Override
          public void handle(Callback[] callbacks)
              throws UnsupportedCallbackException {
            // Only AuthorizeCallback is expected from the GSSAPI mechanism.
            AuthorizeCallback ac = null;
            for (Callback callback : callbacks) {
              if (callback instanceof AuthorizeCallback) {
                ac = (AuthorizeCallback) callback;
              } else {
                throw new UnsupportedCallbackException(callback,
                    "Unrecognized SASL GSSAPI Callback");
              }
            }
            if (ac != null) {
              String authid = ac.getAuthenticationID();
              String authzid = ac.getAuthorizationID();
              // Authorize only when the caller asks to act as itself
              // (no impersonation at the SASL layer).
              if (!authid.equals(authzid)) {
                ac.setAuthorized(false);
              } else {
                ac.setAuthorized(true);
                String userName = SecurityUtil.getUserFromPrincipal(authzid);
                LOG.info("Effective user: " + userName);
                ac.setAuthorizedID(userName);
              }
            }
          }
        });
      transportFactory = saslFactory;

      // Create a processor wrapper, to get the caller
      processor = new TProcessor() {
        @Override
        public boolean process(TProtocol inProt,
            TProtocol outProt) throws TException {
          // Pull the authenticated principal off the SASL transport and
          // record it as the effective user before delegating the call.
          TSaslServerTransport saslServerTransport =
            (TSaslServerTransport)inProt.getTransport();
          SaslServer saslServer = saslServerTransport.getSaslServer();
          String principal = saslServer.getAuthorizationID();
          hbaseHandler.setEffectiveUser(principal);
          return p.process(inProt, outProt);
        }
      };
    }

    // Fail fast when a bind address is configured for a server type that
    // cannot honor it.
    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types " + Joiner.on(", ").join(
          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
          "address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
      throw new RuntimeException(
          "-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {
      // All three share a nonblocking server socket.
      InetAddress listenAddress = getBindAddress(conf);
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      if (implType == ImplType.NONBLOCKING) {
        TNonblockingServer.Args serverArgs =
            new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TNonblockingServer(serverArgs);
      } else if (implType == ImplType.HS_HA) {
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        // Worker pool backed by a metrics-instrumented call queue.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new THsHaServer(serverArgs);
      } else { // THREADED_SELECTOR
        TThreadedSelectorServer.Args serverArgs =
            new HThreadedSelectorServerArgs(serverTransport, conf);
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TThreadedSelectorServer(serverArgs);
      }
      LOG.info("starting HBase " + implType.simpleClassName() +
          " server on " + Integer.toString(listenPort));
    } else if (implType == ImplType.THREAD_POOL) {
      // Thread pool server. Get the IP address to bind to.
      InetAddress listenAddress = getBindAddress(conf);

      TServerTransport serverTransport = new TServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      TBoundedThreadPoolServer.Args serverArgs =
          new TBoundedThreadPoolServer.Args(serverTransport, conf);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
          + listenAddress + ":" + Integer.toString(listenPort)
          + "; " + serverArgs);
      TBoundedThreadPoolServer tserver =
          new TBoundedThreadPoolServer(serverArgs, metrics);
      this.tserver = tserver;
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }

    registerFilters(conf);
  }
494 
495   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
496                                  int workerThreads) {
497     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
498     tfb.setDaemon(true);
499     tfb.setNameFormat("thrift-worker-%d");
500     return new ThreadPoolExecutor(workerThreads, workerThreads,
501             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
502   }
503 
504   private InetAddress getBindAddress(Configuration conf)
505       throws UnknownHostException {
506     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
507     return InetAddress.getByName(bindAddressStr);
508   }
509 
510   protected static class ResultScannerWrapper {
511 
512     private final ResultScanner scanner;
513     private final boolean sortColumns;
514     public ResultScannerWrapper(ResultScanner resultScanner,
515                                 boolean sortResultColumns) {
516       scanner = resultScanner;
517       sortColumns = sortResultColumns;
518    }
519 
520     public ResultScanner getScanner() {
521       return scanner;
522     }
523 
524     public boolean isColumnSorted() {
525       return sortColumns;
526     }
527   }
528 
529   /**
530    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
531    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
532    */
533   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state
    // Scanner ids are handed out sequentially; both are guarded by
    // synchronized add/get/removeScanner methods.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Per-user connection cache; source of HTable and HBaseAdmin instances
    // and holder of the effective (impersonated) user.
    private final ConnectionCache connectionCache;

    // Per-thread cache of HTable instances keyed by table name, so a
    // handler thread reuses its tables across calls (see getTable()).
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // Increment-coalescing helper; presumably gated by COALESCE_INC_KEY in
    // the increment path — not visible in this chunk, confirm there.
    IncrementCoalescer coalescer = null;

    // Interval (read in the constructor) between connection-cache cleanups.
    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
    // Max idle time (read in the constructor) before a cached connection is dropped.
    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
556 
557     /**
558      * Returns a list of all the column families for a given htable.
559      *
560      * @param table
561      * @throws IOException
562      */
563     byte[][] getAllColumns(HTable table) throws IOException {
564       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
565       byte[][] columns = new byte[cds.length][];
566       for (int i = 0; i < cds.length; i++) {
567         columns[i] = Bytes.add(cds[i].getName(),
568             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
569       }
570       return columns;
571     }
572 
573     /**
574      * Creates and returns an HTable instance from a given table name.
575      *
576      * @param tableName
577      *          name of table
578      * @return HTable object
579      * @throws IOException
580      * @throws IOError
581      */
582     public HTable getTable(final byte[] tableName) throws
583         IOException {
584       String table = Bytes.toString(tableName);
585       Map<String, HTable> tables = threadLocalTables.get();
586       if (!tables.containsKey(table)) {
587         tables.put(table, (HTable)connectionCache.getTable(table));
588       }
589       return tables.get(table);
590     }
591 
592     public HTable getTable(final ByteBuffer tableName) throws IOException {
593       return getTable(getBytes(tableName));
594     }
595 
596     /**
597      * Assigns a unique ID to the scanner and adds the mapping to an internal
598      * hash-map.
599      *
600      * @param scanner
601      * @return integer scanner id
602      */
603     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
604       int id = nextScannerId++;
605       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
606       scannerMap.put(id, resultScannerWrapper);
607       return id;
608     }
609 
610     /**
611      * Returns the scanner associated with the specified ID.
612      *
613      * @param id
614      * @return a Scanner, or null if ID was invalid.
615      */
616     protected synchronized ResultScannerWrapper getScanner(int id) {
617       return scannerMap.get(id);
618     }
619 
620     /**
621      * Removes the scanner associated with the specified ID from the internal
622      * id->scanner hash-map.
623      *
624      * @param id
625      * @return a Scanner, or null if ID was invalid.
626      */
627     protected synchronized ResultScannerWrapper removeScanner(int id) {
628       return scannerMap.remove(id);
629     }
630 
631     protected HBaseHandler(final Configuration c,
632         final UserProvider userProvider) throws IOException {
633       this.conf = c;
634       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
635       this.coalescer = new IncrementCoalescer(this);
636 
637       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
638       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
639       connectionCache = new ConnectionCache(
640         conf, userProvider, cleanInterval, maxIdleTime);
641     }
642 
643     /**
644      * Obtain HBaseAdmin. Creates the instance if it is not already created.
645      */
646     private HBaseAdmin getHBaseAdmin() throws IOException {
647       return connectionCache.getAdmin();
648     }
649 
650     void setEffectiveUser(String effectiveUser) {
651       connectionCache.setEffectiveUser(effectiveUser);
652     }
653 
654     @Override
655     public void enableTable(ByteBuffer tableName) throws IOError {
656       try{
657         getHBaseAdmin().enableTable(getBytes(tableName));
658       } catch (IOException e) {
659         LOG.warn(e.getMessage(), e);
660         throw new IOError(e.getMessage());
661       }
662     }
663 
664     @Override
665     public void disableTable(ByteBuffer tableName) throws IOError{
666       try{
667         getHBaseAdmin().disableTable(getBytes(tableName));
668       } catch (IOException e) {
669         LOG.warn(e.getMessage(), e);
670         throw new IOError(e.getMessage());
671       }
672     }
673 
674     @Override
675     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
676       try {
677         return HTable.isTableEnabled(this.conf, getBytes(tableName));
678       } catch (IOException e) {
679         LOG.warn(e.getMessage(), e);
680         throw new IOError(e.getMessage());
681       }
682     }
683 
684     @Override
685     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
686       try{
687         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
688       } catch (InterruptedException e) {
689         throw new IOError(e.getMessage());
690       } catch (IOException e) {
691         LOG.warn(e.getMessage(), e);
692         throw new IOError(e.getMessage());
693       }
694     }
695 
696     @Override
697     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
698       try{
699         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
700       } catch (InterruptedException e) {
701         LOG.warn(e.getMessage(), e);
702         throw new IOError(e.getMessage());
703       } catch (IOException e) {
704         LOG.warn(e.getMessage(), e);
705         throw new IOError(e.getMessage());
706       }
707     }
708 
709     @Override
710     public List<ByteBuffer> getTableNames() throws IOError {
711       try {
712         TableName[] tableNames = this.getHBaseAdmin().listTableNames();
713         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
714         for (int i = 0; i < tableNames.length; i++) {
715           list.add(ByteBuffer.wrap(tableNames[i].getName()));
716         }
717         return list;
718       } catch (IOException e) {
719         LOG.warn(e.getMessage(), e);
720         throw new IOError(e.getMessage());
721       }
722     }
723 
724     /**
725      * @return the list of regions in the given table, or an empty list if the table does not exist
726      */
727     @Override
728     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
729     throws IOError {
730       try {
731         HTable table;
732         try {
733           table = getTable(tableName);
734         } catch (TableNotFoundException ex) {
735           return new ArrayList<TRegionInfo>();
736         }
737         Map<HRegionInfo, ServerName> regionLocations =
738             table.getRegionLocations();
739         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
740         for (Map.Entry<HRegionInfo, ServerName> entry :
741             regionLocations.entrySet()) {
742           HRegionInfo info = entry.getKey();
743           ServerName serverName = entry.getValue();
744           TRegionInfo region = new TRegionInfo();
745           region.serverName = ByteBuffer.wrap(
746               Bytes.toBytes(serverName.getHostname()));
747           region.port = serverName.getPort();
748           region.startKey = ByteBuffer.wrap(info.getStartKey());
749           region.endKey = ByteBuffer.wrap(info.getEndKey());
750           region.id = info.getRegionId();
751           region.name = ByteBuffer.wrap(info.getRegionName());
752           region.version = info.getVersion();
753           results.add(region);
754         }
755         return results;
756       } catch (TableNotFoundException e) {
757         // Return empty list for non-existing table
758         return Collections.emptyList();
759       } catch (IOException e){
760         LOG.warn(e.getMessage(), e);
761         throw new IOError(e.getMessage());
762       }
763     }
764 
765     @Deprecated
766     @Override
767     public List<TCell> get(
768         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
769         Map<ByteBuffer, ByteBuffer> attributes)
770         throws IOError {
771       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
772       if (famAndQf.length == 1) {
773         return get(tableName, row, famAndQf[0], null, attributes);
774       }
775       if (famAndQf.length == 2) {
776         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
777       }
778       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
779     }
780 
781     /**
782      * Note: this internal interface is slightly different from public APIs in regard to handling
783      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
784      * we respect qual == null as a request for the entire column family. The caller (
785      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
786      * column is parse like normal.
787      */
788     protected List<TCell> get(ByteBuffer tableName,
789                               ByteBuffer row,
790                               byte[] family,
791                               byte[] qualifier,
792                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
793       try {
794         HTable table = getTable(tableName);
795         Get get = new Get(getBytes(row));
796         addAttributes(get, attributes);
797         if (qualifier == null) {
798           get.addFamily(family);
799         } else {
800           get.addColumn(family, qualifier);
801         }
802         Result result = table.get(get);
803         return ThriftUtilities.cellFromHBase(result.rawCells());
804       } catch (IOException e) {
805         LOG.warn(e.getMessage(), e);
806         throw new IOError(e.getMessage());
807       }
808     }
809 
810     @Deprecated
811     @Override
812     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
813         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
814       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
815       if(famAndQf.length == 1) {
816         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
817       }
818       if (famAndQf.length == 2) {
819         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
820       }
821       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
822 
823     }
824 
825     /**
826      * Note: this public interface is slightly different from public Java APIs in regard to
827      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
828      * Rather, we respect qual == null as a request for the entire column family. If you want to
829      * access the entire column family, use
830      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
831      * that lacks a {@code ':'}.
832      */
833     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
834         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
835       try {
836         HTable table = getTable(tableName);
837         Get get = new Get(getBytes(row));
838         addAttributes(get, attributes);
839         if (null == qualifier) {
840           get.addFamily(family);
841         } else {
842           get.addColumn(family, qualifier);
843         }
844         get.setMaxVersions(numVersions);
845         Result result = table.get(get);
846         return ThriftUtilities.cellFromHBase(result.rawCells());
847       } catch (IOException e) {
848         LOG.warn(e.getMessage(), e);
849         throw new IOError(e.getMessage());
850       }
851     }
852 
853     @Deprecated
854     @Override
855     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
856         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
857       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
858       if (famAndQf.length == 1) {
859         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
860       }
861       if (famAndQf.length == 2) {
862         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
863           attributes);
864       }
865       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
866     }
867 
868     /**
869      * Note: this internal interface is slightly different from public APIs in regard to handling
870      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
871      * we respect qual == null as a request for the entire column family. The caller (
872      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
873      * consistent in that the column is parse like normal.
874      */
875     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
876         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
877         throws IOError {
878       try {
879         HTable table = getTable(tableName);
880         Get get = new Get(getBytes(row));
881         addAttributes(get, attributes);
882         if (null == qualifier) {
883           get.addFamily(family);
884         } else {
885           get.addColumn(family, qualifier);
886         }
887         get.setTimeRange(0, timestamp);
888         get.setMaxVersions(numVersions);
889         Result result = table.get(get);
890         return ThriftUtilities.cellFromHBase(result.rawCells());
891       } catch (IOException e) {
892         LOG.warn(e.getMessage(), e);
893         throw new IOError(e.getMessage());
894       }
895     }
896 
897     @Override
898     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
899         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
900       return getRowWithColumnsTs(tableName, row, null,
901                                  HConstants.LATEST_TIMESTAMP,
902                                  attributes);
903     }
904 
905     @Override
906     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
907                                               ByteBuffer row,
908         List<ByteBuffer> columns,
909         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
910       return getRowWithColumnsTs(tableName, row, columns,
911                                  HConstants.LATEST_TIMESTAMP,
912                                  attributes);
913     }
914 
915     @Override
916     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
917         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
918       return getRowWithColumnsTs(tableName, row, null,
919                                  timestamp, attributes);
920     }
921 
922     @Override
923     public List<TRowResult> getRowWithColumnsTs(
924         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
925         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
926       try {
927         HTable table = getTable(tableName);
928         if (columns == null) {
929           Get get = new Get(getBytes(row));
930           addAttributes(get, attributes);
931           get.setTimeRange(0, timestamp);
932           Result result = table.get(get);
933           return ThriftUtilities.rowResultFromHBase(result);
934         }
935         Get get = new Get(getBytes(row));
936         addAttributes(get, attributes);
937         for(ByteBuffer column : columns) {
938           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
939           if (famAndQf.length == 1) {
940               get.addFamily(famAndQf[0]);
941           } else {
942               get.addColumn(famAndQf[0], famAndQf[1]);
943           }
944         }
945         get.setTimeRange(0, timestamp);
946         Result result = table.get(get);
947         return ThriftUtilities.rowResultFromHBase(result);
948       } catch (IOException e) {
949         LOG.warn(e.getMessage(), e);
950         throw new IOError(e.getMessage());
951       }
952     }
953 
954     @Override
955     public List<TRowResult> getRows(ByteBuffer tableName,
956                                     List<ByteBuffer> rows,
957         Map<ByteBuffer, ByteBuffer> attributes)
958         throws IOError {
959       return getRowsWithColumnsTs(tableName, rows, null,
960                                   HConstants.LATEST_TIMESTAMP,
961                                   attributes);
962     }
963 
964     @Override
965     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
966                                                List<ByteBuffer> rows,
967         List<ByteBuffer> columns,
968         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
969       return getRowsWithColumnsTs(tableName, rows, columns,
970                                   HConstants.LATEST_TIMESTAMP,
971                                   attributes);
972     }
973 
974     @Override
975     public List<TRowResult> getRowsTs(ByteBuffer tableName,
976                                       List<ByteBuffer> rows,
977         long timestamp,
978         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
979       return getRowsWithColumnsTs(tableName, rows, null,
980                                   timestamp, attributes);
981     }
982 
983     @Override
984     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
985                                                  List<ByteBuffer> rows,
986         List<ByteBuffer> columns, long timestamp,
987         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
988       try {
989         List<Get> gets = new ArrayList<Get>(rows.size());
990         HTable table = getTable(tableName);
991         if (metrics != null) {
992           metrics.incNumRowKeysInBatchGet(rows.size());
993         }
994         for (ByteBuffer row : rows) {
995           Get get = new Get(getBytes(row));
996           addAttributes(get, attributes);
997           if (columns != null) {
998 
999             for(ByteBuffer column : columns) {
1000               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1001               if (famAndQf.length == 1) {
1002                 get.addFamily(famAndQf[0]);
1003               } else {
1004                 get.addColumn(famAndQf[0], famAndQf[1]);
1005               }
1006             }
1007           }
1008           get.setTimeRange(0, timestamp);
1009           gets.add(get);
1010         }
1011         Result[] result = table.get(gets);
1012         return ThriftUtilities.rowResultFromHBase(result);
1013       } catch (IOException e) {
1014         LOG.warn(e.getMessage(), e);
1015         throw new IOError(e.getMessage());
1016       }
1017     }
1018 
1019     @Override
1020     public void deleteAll(
1021         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1022         Map<ByteBuffer, ByteBuffer> attributes)
1023         throws IOError {
1024       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1025                   attributes);
1026     }
1027 
1028     @Override
1029     public void deleteAllTs(ByteBuffer tableName,
1030                             ByteBuffer row,
1031                             ByteBuffer column,
1032         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1033       try {
1034         HTable table = getTable(tableName);
1035         Delete delete  = new Delete(getBytes(row));
1036         addAttributes(delete, attributes);
1037         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1038         if (famAndQf.length == 1) {
1039           delete.deleteFamily(famAndQf[0], timestamp);
1040         } else {
1041           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1042         }
1043         table.delete(delete);
1044 
1045       } catch (IOException e) {
1046         LOG.warn(e.getMessage(), e);
1047         throw new IOError(e.getMessage());
1048       }
1049     }
1050 
1051     @Override
1052     public void deleteAllRow(
1053         ByteBuffer tableName, ByteBuffer row,
1054         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1055       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1056     }
1057 
1058     @Override
1059     public void deleteAllRowTs(
1060         ByteBuffer tableName, ByteBuffer row, long timestamp,
1061         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1062       try {
1063         HTable table = getTable(tableName);
1064         Delete delete  = new Delete(getBytes(row), timestamp);
1065         addAttributes(delete, attributes);
1066         table.delete(delete);
1067       } catch (IOException e) {
1068         LOG.warn(e.getMessage(), e);
1069         throw new IOError(e.getMessage());
1070       }
1071     }
1072 
1073     @Override
1074     public void createTable(ByteBuffer in_tableName,
1075         List<ColumnDescriptor> columnFamilies) throws IOError,
1076         IllegalArgument, AlreadyExists {
1077       byte [] tableName = getBytes(in_tableName);
1078       try {
1079         if (getHBaseAdmin().tableExists(tableName)) {
1080           throw new AlreadyExists("table name already in use");
1081         }
1082         HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1083         for (ColumnDescriptor col : columnFamilies) {
1084           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1085           desc.addFamily(colDesc);
1086         }
1087         getHBaseAdmin().createTable(desc);
1088       } catch (IOException e) {
1089         LOG.warn(e.getMessage(), e);
1090         throw new IOError(e.getMessage());
1091       } catch (IllegalArgumentException e) {
1092         LOG.warn(e.getMessage(), e);
1093         throw new IllegalArgument(e.getMessage());
1094       }
1095     }
1096 
1097     @Override
1098     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1099       byte [] tableName = getBytes(in_tableName);
1100       if (LOG.isDebugEnabled()) {
1101         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
1102       }
1103       try {
1104         if (!getHBaseAdmin().tableExists(tableName)) {
1105           throw new IOException("table does not exist");
1106         }
1107         getHBaseAdmin().deleteTable(tableName);
1108       } catch (IOException e) {
1109         LOG.warn(e.getMessage(), e);
1110         throw new IOError(e.getMessage());
1111       }
1112     }
1113 
1114     @Override
1115     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1116         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1117         throws IOError, IllegalArgument {
1118       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1119                   attributes);
1120     }
1121 
    /**
     * Applies a batch of mutations (puts and/or deletes) to a single row, all
     * stamped with the given timestamp. The mutations are split into one
     * aggregate Delete and one aggregate Put, each issued as a single call.
     * Note that the delete is applied before the put, so a delete and a put of
     * the same cell in one batch leaves the put's value in place.
     */
    @Override
    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      HTable table = null;
      try {
        table = getTable(tableName);
        Put put = new Put(getBytes(row), timestamp);
        addAttributes(put, attributes);

        Delete delete = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchMutate(mutations.size());
        }

        // I apologize for all this mess :)
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            if (famAndQf.length == 1) {
              // No qualifier: drop the entire column family at this timestamp.
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // Durability is taken from the last delete-mutation in the batch.
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if(famAndQf.length == 1) {
              // Family-only puts are not supported: this mutation is silently
              // skipped apart from the warning. NOTE(review): the batch variant
              // mutateRowsTs throws IllegalArgumentException here instead.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            } else {
              // A null value is stored as an empty byte array.
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Skip the RPC entirely when a side collected no mutations.
        if (!delete.isEmpty())
          table.delete(delete);
        if (!put.isEmpty())
          table.put(put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1174 
1175     @Override
1176     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1177         Map<ByteBuffer, ByteBuffer> attributes)
1178         throws IOError, IllegalArgument, TException {
1179       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1180     }
1181 
    /**
     * Applies mutation batches to multiple rows, all stamped with the given
     * timestamp. Each row's mutations are split into one Delete and one Put;
     * all deletes and all puts are then issued as two batched calls, puts
     * first.
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // Durability is taken from the last delete-mutation of this row.
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              // Family-only puts are rejected: the warning below is followed by
              // the IllegalArgumentException in the else-branch underneath.
              // NOTE(review): mutateRowTs only warns and skips in this case.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              // A null value is stored as an empty byte array.
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Only collect non-empty operations for the batched RPCs below.
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        // Puts are applied before deletes across the whole batch.
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1245 
1246     @Deprecated
1247     @Override
1248     public long atomicIncrement(
1249         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1250             throws IOError, IllegalArgument, TException {
1251       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1252       if(famAndQf.length == 1) {
1253         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1254       }
1255       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1256     }
1257 
1258     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1259         byte [] family, byte [] qualifier, long amount)
1260         throws IOError, IllegalArgument, TException {
1261       HTable table;
1262       try {
1263         table = getTable(tableName);
1264         return table.incrementColumnValue(
1265             getBytes(row), family, qualifier, amount);
1266       } catch (IOException e) {
1267         LOG.warn(e.getMessage(), e);
1268         throw new IOError(e.getMessage());
1269       }
1270     }
1271 
1272     @Override
1273     public void scannerClose(int id) throws IOError, IllegalArgument {
1274       LOG.debug("scannerClose: id=" + id);
1275       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1276       if (resultScannerWrapper == null) {
1277         String message = "scanner ID is invalid";
1278         LOG.warn(message);
1279         throw new IllegalArgument("scanner ID is invalid");
1280       }
1281       resultScannerWrapper.getScanner().close();
1282       removeScanner(id);
1283     }
1284 
1285     @Override
1286     public List<TRowResult> scannerGetList(int id,int nbRows)
1287         throws IllegalArgument, IOError {
1288       LOG.debug("scannerGetList: id=" + id);
1289       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1290       if (null == resultScannerWrapper) {
1291         String message = "scanner ID is invalid";
1292         LOG.warn(message);
1293         throw new IllegalArgument("scanner ID is invalid");
1294       }
1295 
1296       Result [] results = null;
1297       try {
1298         results = resultScannerWrapper.getScanner().next(nbRows);
1299         if (null == results) {
1300           return new ArrayList<TRowResult>();
1301         }
1302       } catch (IOException e) {
1303         LOG.warn(e.getMessage(), e);
1304         throw new IOError(e.getMessage());
1305       }
1306       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1307     }
1308 
1309     @Override
1310     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1311       return scannerGetList(id,1);
1312     }
1313 
1314     @Override
1315     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1316         Map<ByteBuffer, ByteBuffer> attributes)
1317         throws IOError {
1318       try {
1319         HTable table = getTable(tableName);
1320         Scan scan = new Scan();
1321         addAttributes(scan, attributes);
1322         if (tScan.isSetStartRow()) {
1323           scan.setStartRow(tScan.getStartRow());
1324         }
1325         if (tScan.isSetStopRow()) {
1326           scan.setStopRow(tScan.getStopRow());
1327         }
1328         if (tScan.isSetTimestamp()) {
1329           scan.setTimeRange(0, tScan.getTimestamp());
1330         }
1331         if (tScan.isSetCaching()) {
1332           scan.setCaching(tScan.getCaching());
1333         }
1334         if (tScan.isSetBatchSize()) {
1335           scan.setBatch(tScan.getBatchSize());
1336         }
1337         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1338           for(ByteBuffer column : tScan.getColumns()) {
1339             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1340             if(famQf.length == 1) {
1341               scan.addFamily(famQf[0]);
1342             } else {
1343               scan.addColumn(famQf[0], famQf[1]);
1344             }
1345           }
1346         }
1347         if (tScan.isSetFilterString()) {
1348           ParseFilter parseFilter = new ParseFilter();
1349           scan.setFilter(
1350               parseFilter.parseFilterString(tScan.getFilterString()));
1351         }
1352         if (tScan.isSetReversed()) {
1353           scan.setReversed(tScan.isReversed());
1354         }
1355         return addScanner(table.getScanner(scan), tScan.sortColumns);
1356       } catch (IOException e) {
1357         LOG.warn(e.getMessage(), e);
1358         throw new IOError(e.getMessage());
1359       }
1360     }
1361 
1362     @Override
1363     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1364         List<ByteBuffer> columns,
1365         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1366       try {
1367         HTable table = getTable(tableName);
1368         Scan scan = new Scan(getBytes(startRow));
1369         addAttributes(scan, attributes);
1370         if(columns != null && columns.size() != 0) {
1371           for(ByteBuffer column : columns) {
1372             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1373             if(famQf.length == 1) {
1374               scan.addFamily(famQf[0]);
1375             } else {
1376               scan.addColumn(famQf[0], famQf[1]);
1377             }
1378           }
1379         }
1380         return addScanner(table.getScanner(scan), false);
1381       } catch (IOException e) {
1382         LOG.warn(e.getMessage(), e);
1383         throw new IOError(e.getMessage());
1384       }
1385     }
1386 
1387     @Override
1388     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1389         ByteBuffer stopRow, List<ByteBuffer> columns,
1390         Map<ByteBuffer, ByteBuffer> attributes)
1391         throws IOError, TException {
1392       try {
1393         HTable table = getTable(tableName);
1394         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1395         addAttributes(scan, attributes);
1396         if(columns != null && columns.size() != 0) {
1397           for(ByteBuffer column : columns) {
1398             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1399             if(famQf.length == 1) {
1400               scan.addFamily(famQf[0]);
1401             } else {
1402               scan.addColumn(famQf[0], famQf[1]);
1403             }
1404           }
1405         }
1406         return addScanner(table.getScanner(scan), false);
1407       } catch (IOException e) {
1408         LOG.warn(e.getMessage(), e);
1409         throw new IOError(e.getMessage());
1410       }
1411     }
1412 
1413     @Override
1414     public int scannerOpenWithPrefix(ByteBuffer tableName,
1415                                      ByteBuffer startAndPrefix,
1416                                      List<ByteBuffer> columns,
1417         Map<ByteBuffer, ByteBuffer> attributes)
1418         throws IOError, TException {
1419       try {
1420         HTable table = getTable(tableName);
1421         Scan scan = new Scan(getBytes(startAndPrefix));
1422         addAttributes(scan, attributes);
1423         Filter f = new WhileMatchFilter(
1424             new PrefixFilter(getBytes(startAndPrefix)));
1425         scan.setFilter(f);
1426         if (columns != null && columns.size() != 0) {
1427           for(ByteBuffer column : columns) {
1428             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1429             if(famQf.length == 1) {
1430               scan.addFamily(famQf[0]);
1431             } else {
1432               scan.addColumn(famQf[0], famQf[1]);
1433             }
1434           }
1435         }
1436         return addScanner(table.getScanner(scan), false);
1437       } catch (IOException e) {
1438         LOG.warn(e.getMessage(), e);
1439         throw new IOError(e.getMessage());
1440       }
1441     }
1442 
1443     @Override
1444     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1445         List<ByteBuffer> columns, long timestamp,
1446         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1447       try {
1448         HTable table = getTable(tableName);
1449         Scan scan = new Scan(getBytes(startRow));
1450         addAttributes(scan, attributes);
1451         scan.setTimeRange(0, timestamp);
1452         if (columns != null && columns.size() != 0) {
1453           for (ByteBuffer column : columns) {
1454             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1455             if(famQf.length == 1) {
1456               scan.addFamily(famQf[0]);
1457             } else {
1458               scan.addColumn(famQf[0], famQf[1]);
1459             }
1460           }
1461         }
1462         return addScanner(table.getScanner(scan), false);
1463       } catch (IOException e) {
1464         LOG.warn(e.getMessage(), e);
1465         throw new IOError(e.getMessage());
1466       }
1467     }
1468 
1469     @Override
1470     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1471         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1472         Map<ByteBuffer, ByteBuffer> attributes)
1473         throws IOError, TException {
1474       try {
1475         HTable table = getTable(tableName);
1476         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1477         addAttributes(scan, attributes);
1478         scan.setTimeRange(0, timestamp);
1479         if (columns != null && columns.size() != 0) {
1480           for (ByteBuffer column : columns) {
1481             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1482             if(famQf.length == 1) {
1483               scan.addFamily(famQf[0]);
1484             } else {
1485               scan.addColumn(famQf[0], famQf[1]);
1486             }
1487           }
1488         }
1489         scan.setTimeRange(0, timestamp);
1490         return addScanner(table.getScanner(scan), false);
1491       } catch (IOException e) {
1492         LOG.warn(e.getMessage(), e);
1493         throw new IOError(e.getMessage());
1494       }
1495     }
1496 
1497     @Override
1498     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1499         ByteBuffer tableName) throws IOError, TException {
1500       try {
1501         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1502           new TreeMap<ByteBuffer, ColumnDescriptor>();
1503 
1504         HTable table = getTable(tableName);
1505         HTableDescriptor desc = table.getTableDescriptor();
1506 
1507         for (HColumnDescriptor e : desc.getFamilies()) {
1508           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1509           columns.put(col.name, col);
1510         }
1511         return columns;
1512       } catch (IOException e) {
1513         LOG.warn(e.getMessage(), e);
1514         throw new IOError(e.getMessage());
1515       }
1516     }
1517 
1518     @Deprecated
1519     @Override
1520     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1521         ByteBuffer family) throws IOError {
1522       try {
1523         HTable table = getTable(getBytes(tableName));
1524         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1525         return ThriftUtilities.cellFromHBase(result.rawCells());
1526       } catch (IOException e) {
1527         LOG.warn(e.getMessage(), e);
1528         throw new IOError(e.getMessage());
1529       }
1530     }
1531 
    /**
     * Looks up, in the meta table, the region containing (or closest before)
     * the given row and returns its descriptor, including the hosting server
     * when one is recorded.
     */
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      try {
        HTable table = getTable(TableName.META_TABLE_NAME.getName());
        byte[] row = getBytes(searchRow);
        // Closest-before lookup in meta's catalog family for the search row.
        Result startRowResult = table.getRowOrBefore(
          row, HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          // Routed through the catch below so it is reported as an IOError.
          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
                                + Bytes.toStringBinary(row));
        }

        // find region start and end keys
        HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
        if (regionInfo == null) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toStringBinary(row));
        }
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = regionInfo.getVersion();

        // find region assignment to server
        // serverName/port are left unset when meta has no assignment recorded.
        ServerName serverName = HRegionInfo.getServerName(startRowResult);
        if (serverName != null) {
          region.setServerName(Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1571 
    /**
     * Stores the Thrift metrics sink this handler should record into.
     * @param metrics the metrics collector for this Thrift server instance
     */
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1575 
1576     @Override
1577     public void increment(TIncrement tincrement) throws IOError, TException {
1578 
1579       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1580         throw new TException("Must supply a table and a row key; can't increment");
1581       }
1582 
1583       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1584         this.coalescer.queueIncrement(tincrement);
1585         return;
1586       }
1587 
1588       try {
1589         HTable table = getTable(tincrement.getTable());
1590         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1591         table.increment(inc);
1592       } catch (IOException e) {
1593         LOG.warn(e.getMessage(), e);
1594         throw new IOError(e.getMessage());
1595       }
1596     }
1597 
1598     @Override
1599     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1600       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1601         this.coalescer.queueIncrements(tincrements);
1602         return;
1603       }
1604       for (TIncrement tinc : tincrements) {
1605         increment(tinc);
1606       }
1607     }
1608 
1609     @Override
1610     public List<TCell> append(TAppend tappend) throws IOError, TException {
1611       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1612         throw new TException("Must supply a table and a row key; can't append");
1613       }
1614 
1615       try {
1616         HTable table = getTable(tappend.getTable());
1617         Append append = ThriftUtilities.appendFromThrift(tappend);
1618         Result result = table.append(append);
1619         return ThriftUtilities.cellFromHBase(result.rawCells());
1620       } catch (IOException e) {
1621         LOG.warn(e.getMessage(), e);
1622         throw new IOError(e.getMessage());
1623       }
1624     }
1625 
1626     @Override
1627     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1628         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1629         IllegalArgument, TException {
1630       Put put;
1631       try {
1632         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1633         addAttributes(put, attributes);
1634 
1635         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1636 
1637         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1638             : HConstants.EMPTY_BYTE_ARRAY);
1639 
1640         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1641       } catch (IllegalArgumentException e) {
1642         LOG.warn(e.getMessage(), e);
1643         throw new IllegalArgument(e.getMessage());
1644       }
1645 
1646       HTable table = null;
1647       try {
1648         table = getTable(tableName);
1649         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1650         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1651           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1652       } catch (IOException e) {
1653         LOG.warn(e.getMessage(), e);
1654         throw new IOError(e.getMessage());
1655       } catch (IllegalArgumentException e) {
1656         LOG.warn(e.getMessage(), e);
1657         throw new IllegalArgument(e.getMessage());
1658       }
1659     }
1660   }
1661 
1662 
1663 
1664   /**
1665    * Adds all the attributes into the Operation object
1666    */
1667   private static void addAttributes(OperationWithAttributes op,
1668     Map<ByteBuffer, ByteBuffer> attributes) {
1669     if (attributes == null || attributes.size() == 0) {
1670       return;
1671     }
1672     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1673       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1674       byte[] value =  getBytes(entry.getValue());
1675       op.setAttribute(name, value);
1676     }
1677   }
1678 
1679   public static void registerFilters(Configuration conf) {
1680     String[] filters = conf.getStrings("hbase.thrift.filters");
1681     if(filters != null) {
1682       for(String filterClass: filters) {
1683         String[] filterPart = filterClass.split(":");
1684         if(filterPart.length != 2) {
1685           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1686         } else {
1687           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1688         }
1689       }
1690     }
1691   }
1692 }