View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.hbase.classification.InterfaceAudience;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.hbase.HBaseConfiguration;
56  import org.apache.hadoop.hbase.HColumnDescriptor;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.HRegionInfo;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.client.Append;
65  import org.apache.hadoop.hbase.client.Delete;
66  import org.apache.hadoop.hbase.client.Durability;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.HBaseAdmin;
69  import org.apache.hadoop.hbase.client.HTable;
70  import org.apache.hadoop.hbase.client.Increment;
71  import org.apache.hadoop.hbase.client.OperationWithAttributes;
72  import org.apache.hadoop.hbase.client.Put;
73  import org.apache.hadoop.hbase.client.Result;
74  import org.apache.hadoop.hbase.client.ResultScanner;
75  import org.apache.hadoop.hbase.client.Scan;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.ParseFilter;
78  import org.apache.hadoop.hbase.filter.PrefixFilter;
79  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
80  import org.apache.hadoop.hbase.security.SecurityUtil;
81  import org.apache.hadoop.hbase.security.UserProvider;
82  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
83  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
84  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
85  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
86  import org.apache.hadoop.hbase.thrift.generated.Hbase;
87  import org.apache.hadoop.hbase.thrift.generated.IOError;
88  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
89  import org.apache.hadoop.hbase.thrift.generated.Mutation;
90  import org.apache.hadoop.hbase.thrift.generated.TAppend;
91  import org.apache.hadoop.hbase.thrift.generated.TCell;
92  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
93  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
94  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
95  import org.apache.hadoop.hbase.thrift.generated.TScan;
96  import org.apache.hadoop.hbase.util.Bytes;
97  import org.apache.hadoop.hbase.util.ConnectionCache;
98  import org.apache.hadoop.hbase.util.Strings;
99  import org.apache.hadoop.net.DNS;
100 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
101 import org.apache.hadoop.security.UserGroupInformation;
102 import org.apache.thrift.TException;
103 import org.apache.thrift.TProcessor;
104 import org.apache.thrift.protocol.TBinaryProtocol;
105 import org.apache.thrift.protocol.TCompactProtocol;
106 import org.apache.thrift.protocol.TProtocol;
107 import org.apache.thrift.protocol.TProtocolFactory;
108 import org.apache.thrift.server.THsHaServer;
109 import org.apache.thrift.server.TNonblockingServer;
110 import org.apache.thrift.server.TServer;
111 import org.apache.thrift.server.TThreadedSelectorServer;
112 import org.apache.thrift.transport.TFramedTransport;
113 import org.apache.thrift.transport.TNonblockingServerSocket;
114 import org.apache.thrift.transport.TNonblockingServerTransport;
115 import org.apache.thrift.transport.TSaslServerTransport;
116 import org.apache.thrift.transport.TServerSocket;
117 import org.apache.thrift.transport.TServerTransport;
118 import org.apache.thrift.transport.TTransportFactory;
119 
120 import com.google.common.base.Joiner;
121 import com.google.common.util.concurrent.ThreadFactoryBuilder;
122 
123 /**
124  * ThriftServerRunner - this class starts up a Thrift server which implements
125  * the Hbase API specified in the Hbase.thrift IDL file.
126  */
127 @InterfaceAudience.Private
128 @SuppressWarnings("deprecation")
129 public class ThriftServerRunner implements Runnable {
130 
131   private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
132 
133   static final String SERVER_TYPE_CONF_KEY =
134       "hbase.regionserver.thrift.server.type";
135 
136   static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
137   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
138   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
139   static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
140   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
141   static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
142 
143   /**
144    * Thrift quality of protection configuration key. Valid values can be:
145    * auth-conf: authentication, integrity and confidentiality checking
146    * auth-int: authentication and integrity checking
147    * auth: authentication only
148    *
149    * This is used to authenticate the callers and support impersonation.
150    * The thrift server and the HBase cluster must run in secure mode.
151    */
152   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
153 
154   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
155   public static final int DEFAULT_LISTEN_PORT = 9090;
156   public static final int HREGION_VERSION = 1;
157   private final int listenPort;
158 
159   private Configuration conf;
160   volatile TServer tserver;
161   private final Hbase.Iface handler;
162   private final ThriftMetrics metrics;
163   private final HBaseHandler hbaseHandler;
164   private final UserGroupInformation realUser;
165 
166   private final String qop;
167   private String host;
168 
169   /** An enum of server implementation selections */
170   enum ImplType {
171     HS_HA("hsha", true, THsHaServer.class, true),
172     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
173     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
174     THREADED_SELECTOR(
175         "threadedselector", true, TThreadedSelectorServer.class, true);
176 
177     public static final ImplType DEFAULT = THREAD_POOL;
178 
179     final String option;
180     final boolean isAlwaysFramed;
181     final Class<? extends TServer> serverClass;
182     final boolean canSpecifyBindIP;
183 
184     ImplType(String option, boolean isAlwaysFramed,
185         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
186       this.option = option;
187       this.isAlwaysFramed = isAlwaysFramed;
188       this.serverClass = serverClass;
189       this.canSpecifyBindIP = canSpecifyBindIP;
190     }
191 
192     /**
193      * @return <code>-option</code> so we can get the list of options from
194      *         {@link #values()}
195      */
196     @Override
197     public String toString() {
198       return "-" + option;
199     }
200 
201     String getDescription() {
202       StringBuilder sb = new StringBuilder("Use the " +
203           serverClass.getSimpleName());
204       if (isAlwaysFramed) {
205         sb.append(" This implies the framed transport.");
206       }
207       if (this == DEFAULT) {
208         sb.append("This is the default.");
209       }
210       return sb.toString();
211     }
212 
213     static OptionGroup createOptionGroup() {
214       OptionGroup group = new OptionGroup();
215       for (ImplType t : values()) {
216         group.addOption(new Option(t.option, t.getDescription()));
217       }
218       return group;
219     }
220 
221     static ImplType getServerImpl(Configuration conf) {
222       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
223       for (ImplType t : values()) {
224         if (confType.equals(t.option)) {
225           return t;
226         }
227       }
228       throw new AssertionError("Unknown server ImplType.option:" + confType);
229     }
230 
231     static void setServerImpl(CommandLine cmd, Configuration conf) {
232       ImplType chosenType = null;
233       int numChosen = 0;
234       for (ImplType t : values()) {
235         if (cmd.hasOption(t.option)) {
236           chosenType = t;
237           ++numChosen;
238         }
239       }
240       if (numChosen < 1) {
241         LOG.info("Using default thrift server type");
242         chosenType = DEFAULT;
243       } else if (numChosen > 1) {
244         throw new AssertionError("Exactly one option out of " +
245           Arrays.toString(values()) + " has to be specified");
246       }
247       LOG.info("Using thrift server type " + chosenType.option);
248       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
249     }
250 
251     public String simpleClassName() {
252       return serverClass.getSimpleName();
253     }
254 
255     public static List<String> serversThatCannotSpecifyBindIP() {
256       List<String> l = new ArrayList<String>();
257       for (ImplType t : values()) {
258         if (!t.canSpecifyBindIP) {
259           l.add(t.simpleClassName());
260         }
261       }
262       return l;
263     }
264 
265   }
266 
  /**
   * Creates the runner: performs the Kerberos login when security is enabled,
   * clones the configuration, and wires up the handler, metrics proxy and
   * quality-of-protection settings.
   *
   * @param conf server configuration
   * @throws IOException if login fails or the configured QOP value is invalid
   */
  public ThriftServerRunner(Configuration conf) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    // login the server principal (if using secure Hadoop)
    boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
      && userProvider.isHBaseSecurityEnabled();
    if (securityEnabled) {
      // Resolve this host's name (via the configured DNS interface/nameserver)
      // for substitution into the Kerberos principal before logging in.
      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase.thrift.dns.interface", "default"),
        conf.get("hbase.thrift.dns.nameserver", "default")));
      userProvider.login("hbase.thrift.keytab.file",
        "hbase.thrift.kerberos.principal", host);
    }
    this.conf = HBaseConfiguration.create(conf);
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
    this.hbaseHandler = new HBaseHandler(conf, userProvider);
    this.hbaseHandler.initMetrics(metrics);
    // Wrap the raw handler so every Thrift call is timed/counted.
    this.handler = HbaseHandlerMetricsProxy.newInstance(
      hbaseHandler, metrics, conf);
    this.realUser = userProvider.getCurrent().getUGI();
    qop = conf.get(THRIFT_QOP_KEY);
    if (qop != null) {
      // QOP implies SASL authentication, which only makes sense in secure mode.
      if (!qop.equals("auth") && !qop.equals("auth-int")
          && !qop.equals("auth-conf")) {
        throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
          + ", it must be 'auth', 'auth-int', or 'auth-conf'");
      }
      if (!securityEnabled) {
        throw new IOException("Thrift server must"
          + " run in secure mode to support authentication");
      }
    }
  }
300 
301   /*
302    * Runs the Thrift server
303    */
304   @Override
305   public void run() {
306     realUser.doAs(
307       new PrivilegedAction<Object>() {
308         @Override
309         public Object run() {
310           try {
311             setupServer();
312             tserver.serve();
313           } catch (Exception e) {
314             LOG.fatal("Cannot run ThriftServer", e);
315             // Crash the process if the ThriftServer is not running
316             System.exit(-1);
317           }
318           return null;
319         }
320       });
321   }
322 
323   public void shutdown() {
324     if (tserver != null) {
325       tserver.stop();
326       tserver = null;
327     }
328   }
329 
  /**
   * Setting up the thrift TServer: picks the wire protocol (binary/compact),
   * the transport (framed, plain, or SASL/GSSAPI when QOP is configured) and
   * the server implementation selected by {@link ImplType}, then assigns the
   * constructed server to {@link #tserver}.
   *
   * @throws Exception if any transport/server construction step fails
   */
  private void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory;
    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
      LOG.debug("Using compact protocol");
      protocolFactory = new TCompactProtocol.Factory();
    } else {
      LOG.debug("Using binary protocol");
      protocolFactory = new TBinaryProtocol.Factory();
    }

    // Base processor dispatching to the metrics-wrapped handler; may be
    // re-wrapped below to capture the SASL caller identity.
    final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
    ImplType implType = ImplType.getServerImpl(conf);
    TProcessor processor = p;

    // Construct correct TransportFactory
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
      if (qop != null) {
        throw new RuntimeException("Thrift server authentication"
          + " doesn't work with framed transport yet");
      }
      // Frame size config is in MB; default 2 MB.
      transportFactory = new TFramedTransport.Factory(
          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
      LOG.debug("Using framed transport");
    } else if (qop == null) {
      transportFactory = new TTransportFactory();
    } else {
      // QOP configured: authenticate callers with SASL/GSSAPI.
      // Extract the name from the principal
      String name = SecurityUtil.getUserFromPrincipal(
        conf.get("hbase.thrift.kerberos.principal"));
      Map<String, String> saslProperties = new HashMap<String, String>();
      saslProperties.put(Sasl.QOP, qop);
      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
        new SaslGssCallbackHandler() {
          @Override
          public void handle(Callback[] callbacks)
              throws UnsupportedCallbackException {
            AuthorizeCallback ac = null;
            for (Callback callback : callbacks) {
              if (callback instanceof AuthorizeCallback) {
                ac = (AuthorizeCallback) callback;
              } else {
                throw new UnsupportedCallbackException(callback,
                    "Unrecognized SASL GSSAPI Callback");
              }
            }
            if (ac != null) {
              // Authorize only when the authenticated identity matches the
              // requested authorization identity.
              String authid = ac.getAuthenticationID();
              String authzid = ac.getAuthorizationID();
              if (!authid.equals(authzid)) {
                ac.setAuthorized(false);
              } else {
                ac.setAuthorized(true);
                String userName = SecurityUtil.getUserFromPrincipal(authzid);
                LOG.info("Effective user: " + userName);
                ac.setAuthorizedID(userName);
              }
            }
          }
        });
      transportFactory = saslFactory;

      // Create a processor wrapper, to get the caller
      processor = new TProcessor() {
        @Override
        public boolean process(TProtocol inProt,
            TProtocol outProt) throws TException {
          TSaslServerTransport saslServerTransport =
            (TSaslServerTransport)inProt.getTransport();
          SaslServer saslServer = saslServerTransport.getSaslServer();
          String principal = saslServer.getAuthorizationID();
          // Record the caller so table/admin access runs as that user.
          hbaseHandler.setEffectiveUser(principal);
          return p.process(inProt, outProt);
        }
      };
    }

    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types " + Joiner.on(", ").join(
          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
          "address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
      throw new RuntimeException(
          "-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {

      // All three nonblocking variants share a nonblocking server socket.
      InetAddress listenAddress = getBindAddress(conf);
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      if (implType == ImplType.NONBLOCKING) {
        TNonblockingServer.Args serverArgs =
            new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TNonblockingServer(serverArgs);
      } else if (implType == ImplType.HS_HA) {
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        // CallQueue wraps the worker queue so queue metrics are reported.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new THsHaServer(serverArgs);
      } else { // THREADED_SELECTOR
        TThreadedSelectorServer.Args serverArgs =
            new HThreadedSelectorServerArgs(serverTransport, conf);
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TThreadedSelectorServer(serverArgs);
      }
      LOG.info("starting HBase " + implType.simpleClassName() +
          " server on " + Integer.toString(listenPort));
    } else if (implType == ImplType.THREAD_POOL) {
      // Thread pool server. Get the IP address to bind to.
      InetAddress listenAddress = getBindAddress(conf);

      TServerTransport serverTransport = new TServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      TBoundedThreadPoolServer.Args serverArgs =
          new TBoundedThreadPoolServer.Args(serverTransport, conf);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
          + listenAddress + ":" + Integer.toString(listenPort)
          + "; " + serverArgs);
      // Local deliberately shadows the field; assigned once fully built.
      TBoundedThreadPoolServer tserver =
          new TBoundedThreadPoolServer(serverArgs, metrics);
      this.tserver = tserver;
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }



    registerFilters(conf);
  }
495 
496   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
497                                  int workerThreads) {
498     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
499     tfb.setDaemon(true);
500     tfb.setNameFormat("thrift-worker-%d");
501     return new ThreadPoolExecutor(workerThreads, workerThreads,
502             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
503   }
504 
505   private InetAddress getBindAddress(Configuration conf)
506       throws UnknownHostException {
507     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
508     return InetAddress.getByName(bindAddressStr);
509   }
510 
511   protected static class ResultScannerWrapper {
512 
513     private final ResultScanner scanner;
514     private final boolean sortColumns;
515     public ResultScannerWrapper(ResultScanner resultScanner,
516                                 boolean sortResultColumns) {
517       scanner = resultScanner;
518       sortColumns = sortResultColumns;
519    }
520 
521     public ResultScanner getScanner() {
522       return scanner;
523     }
524 
525     public boolean isColumnSorted() {
526       return sortColumns;
527     }
528   }
529 
530   /**
531    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
532    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
533    */
534   public static class HBaseHandler implements Hbase.Iface {
    // Configuration backing all table/admin access in this handler.
    protected Configuration conf;
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state;
    // both are guarded by synchronized add/get/removeScanner methods.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Shared cache of per-user connections/admins with idle eviction.
    private final ConnectionCache connectionCache;

    // Per-thread HTable cache keyed by table name, so handler threads never
    // share HTable instances.
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // Batches increments when COALESCE_INC_KEY is enabled.
    IncrementCoalescer coalescer = null;

    // Config keys for connection-cache eviction timing (milliseconds).
    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
557 
558     /**
559      * Returns a list of all the column families for a given htable.
560      *
561      * @param table
562      * @throws IOException
563      */
564     byte[][] getAllColumns(HTable table) throws IOException {
565       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
566       byte[][] columns = new byte[cds.length][];
567       for (int i = 0; i < cds.length; i++) {
568         columns[i] = Bytes.add(cds[i].getName(),
569             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
570       }
571       return columns;
572     }
573 
574     /**
575      * Creates and returns an HTable instance from a given table name.
576      *
577      * @param tableName
578      *          name of table
579      * @return HTable object
580      * @throws IOException
581      * @throws IOError
582      */
583     public HTable getTable(final byte[] tableName) throws
584         IOException {
585       String table = Bytes.toString(tableName);
586       Map<String, HTable> tables = threadLocalTables.get();
587       if (!tables.containsKey(table)) {
588         tables.put(table, (HTable)connectionCache.getTable(table));
589       }
590       return tables.get(table);
591     }
592 
593     public HTable getTable(final ByteBuffer tableName) throws IOException {
594       return getTable(getBytes(tableName));
595     }
596 
597     /**
598      * Assigns a unique ID to the scanner and adds the mapping to an internal
599      * hash-map.
600      *
601      * @param scanner
602      * @return integer scanner id
603      */
604     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
605       int id = nextScannerId++;
606       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
607       scannerMap.put(id, resultScannerWrapper);
608       return id;
609     }
610 
611     /**
612      * Returns the scanner associated with the specified ID.
613      *
614      * @param id
615      * @return a Scanner, or null if ID was invalid.
616      */
617     protected synchronized ResultScannerWrapper getScanner(int id) {
618       return scannerMap.get(id);
619     }
620 
621     /**
622      * Removes the scanner associated with the specified ID from the internal
623      * id->scanner hash-map.
624      *
625      * @param id
626      * @return a Scanner, or null if ID was invalid.
627      */
628     protected synchronized ResultScannerWrapper removeScanner(int id) {
629       return scannerMap.remove(id);
630     }
631 
    /**
     * Builds the handler with its scanner table, increment coalescer, and a
     * connection cache whose eviction timing comes from configuration.
     *
     * @param c configuration shared with all table/admin access
     * @param userProvider supplies the current user for the connection cache
     * @throws IOException if the connection cache cannot be created
     */
    protected HBaseHandler(final Configuration c,
        final UserProvider userProvider) throws IOException {
      this.conf = c;
      scannerMap = new HashMap<Integer, ResultScannerWrapper>();
      this.coalescer = new IncrementCoalescer(this);

      // Eviction tunables in milliseconds: sweep every 10s, drop connections
      // idle for 10 minutes, unless overridden.
      int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
      int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
      connectionCache = new ConnectionCache(
        conf, userProvider, cleanInterval, maxIdleTime);
    }
643 
    /**
     * Obtain HBaseAdmin. Creates the instance if it is not already created.
     * NOTE(review): instance lifetime is managed by the shared
     * ConnectionCache — do not close the returned admin here.
     */
    private HBaseAdmin getHBaseAdmin() throws IOException {
      return connectionCache.getAdmin();
    }
650 
    /**
     * Passes the authenticated caller's identity through to the connection
     * cache; called by the SASL processor wrapper before each request.
     */
    void setEffectiveUser(String effectiveUser) {
      connectionCache.setEffectiveUser(effectiveUser);
    }
654 
655     @Override
656     public void enableTable(ByteBuffer tableName) throws IOError {
657       try{
658         getHBaseAdmin().enableTable(getBytes(tableName));
659       } catch (IOException e) {
660         LOG.warn(e.getMessage(), e);
661         throw new IOError(e.getMessage());
662       }
663     }
664 
665     @Override
666     public void disableTable(ByteBuffer tableName) throws IOError{
667       try{
668         getHBaseAdmin().disableTable(getBytes(tableName));
669       } catch (IOException e) {
670         LOG.warn(e.getMessage(), e);
671         throw new IOError(e.getMessage());
672       }
673     }
674 
675     @Override
676     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
677       try {
678         return HTable.isTableEnabled(this.conf, getBytes(tableName));
679       } catch (IOException e) {
680         LOG.warn(e.getMessage(), e);
681         throw new IOError(e.getMessage());
682       }
683     }
684 
685     @Override
686     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
687       try{
688         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
689       } catch (IOException e) {
690         LOG.warn(e.getMessage(), e);
691         throw new IOError(e.getMessage());
692       }
693     }
694 
695     @Override
696     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
697       try{
698         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
699       } catch (IOException e) {
700         LOG.warn(e.getMessage(), e);
701         throw new IOError(e.getMessage());
702       }
703     }
704 
705     @Override
706     public List<ByteBuffer> getTableNames() throws IOError {
707       try {
708         TableName[] tableNames = this.getHBaseAdmin().listTableNames();
709         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
710         for (int i = 0; i < tableNames.length; i++) {
711           list.add(ByteBuffer.wrap(tableNames[i].getName()));
712         }
713         return list;
714       } catch (IOException e) {
715         LOG.warn(e.getMessage(), e);
716         throw new IOError(e.getMessage());
717       }
718     }
719 
    /**
     * @return the list of regions in the given table, or an empty list if the table does not exist
     */
    @Override
    public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
    throws IOError {
      try {
        HTable table;
        try {
          // Unknown table: treat as "no regions" rather than an error.
          table = getTable(tableName);
        } catch (TableNotFoundException ex) {
          return new ArrayList<TRegionInfo>();
        }
        Map<HRegionInfo, ServerName> regionLocations =
            table.getRegionLocations();
        List<TRegionInfo> results = new ArrayList<TRegionInfo>();
        // Convert each region/location pair into the Thrift TRegionInfo shape.
        for (Map.Entry<HRegionInfo, ServerName> entry :
            regionLocations.entrySet()) {
          HRegionInfo info = entry.getKey();
          ServerName serverName = entry.getValue();
          TRegionInfo region = new TRegionInfo();
          region.serverName = ByteBuffer.wrap(
              Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
          region.startKey = ByteBuffer.wrap(info.getStartKey());
          region.endKey = ByteBuffer.wrap(info.getEndKey());
          region.id = info.getRegionId();
          region.name = ByteBuffer.wrap(info.getRegionName());
          region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
          results.add(region);
        }
        return results;
      } catch (TableNotFoundException e) {
        // Return empty list for non-existing table
        // NOTE(review): this outer catch covers getRegionLocations(); it
        // returns an immutable list while the inner catch returns a mutable
        // one — confirm no caller mutates the result before unifying.
        return Collections.emptyList();
      } catch (IOException e){
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
760 
761     @Deprecated
762     @Override
763     public List<TCell> get(
764         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
765         Map<ByteBuffer, ByteBuffer> attributes)
766         throws IOError {
767       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
768       if (famAndQf.length == 1) {
769         return get(tableName, row, famAndQf[0], null, attributes);
770       }
771       if (famAndQf.length == 2) {
772         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
773       }
774       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
775     }
776 
777     /**
778      * Note: this internal interface is slightly different from public APIs in regard to handling
779      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
780      * we respect qual == null as a request for the entire column family. The caller (
781      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
782      * column is parse like normal.
783      */
784     protected List<TCell> get(ByteBuffer tableName,
785                               ByteBuffer row,
786                               byte[] family,
787                               byte[] qualifier,
788                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
789       try {
790         HTable table = getTable(tableName);
791         Get get = new Get(getBytes(row));
792         addAttributes(get, attributes);
793         if (qualifier == null) {
794           get.addFamily(family);
795         } else {
796           get.addColumn(family, qualifier);
797         }
798         Result result = table.get(get);
799         return ThriftUtilities.cellFromHBase(result.rawCells());
800       } catch (IOException e) {
801         LOG.warn(e.getMessage(), e);
802         throw new IOError(e.getMessage());
803       }
804     }
805 
806     @Deprecated
807     @Override
808     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
809         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
810       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
811       if(famAndQf.length == 1) {
812         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
813       }
814       if (famAndQf.length == 2) {
815         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
816       }
817       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
818 
819     }
820 
821     /**
822      * Note: this public interface is slightly different from public Java APIs in regard to
823      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
824      * Rather, we respect qual == null as a request for the entire column family. If you want to
825      * access the entire column family, use
826      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
827      * that lacks a {@code ':'}.
828      */
829     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
830         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
831       try {
832         HTable table = getTable(tableName);
833         Get get = new Get(getBytes(row));
834         addAttributes(get, attributes);
835         if (null == qualifier) {
836           get.addFamily(family);
837         } else {
838           get.addColumn(family, qualifier);
839         }
840         get.setMaxVersions(numVersions);
841         Result result = table.get(get);
842         return ThriftUtilities.cellFromHBase(result.rawCells());
843       } catch (IOException e) {
844         LOG.warn(e.getMessage(), e);
845         throw new IOError(e.getMessage());
846       }
847     }
848 
849     @Deprecated
850     @Override
851     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
852         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
853       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
854       if (famAndQf.length == 1) {
855         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
856       }
857       if (famAndQf.length == 2) {
858         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
859           attributes);
860       }
861       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
862     }
863 
864     /**
865      * Note: this internal interface is slightly different from public APIs in regard to handling
866      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
867      * we respect qual == null as a request for the entire column family. The caller (
868      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
869      * consistent in that the column is parse like normal.
870      */
871     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
872         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
873         throws IOError {
874       try {
875         HTable table = getTable(tableName);
876         Get get = new Get(getBytes(row));
877         addAttributes(get, attributes);
878         if (null == qualifier) {
879           get.addFamily(family);
880         } else {
881           get.addColumn(family, qualifier);
882         }
883         get.setTimeRange(0, timestamp);
884         get.setMaxVersions(numVersions);
885         Result result = table.get(get);
886         return ThriftUtilities.cellFromHBase(result.rawCells());
887       } catch (IOException e) {
888         LOG.warn(e.getMessage(), e);
889         throw new IOError(e.getMessage());
890       }
891     }
892 
893     @Override
894     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
895         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
896       return getRowWithColumnsTs(tableName, row, null,
897                                  HConstants.LATEST_TIMESTAMP,
898                                  attributes);
899     }
900 
901     @Override
902     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
903                                               ByteBuffer row,
904         List<ByteBuffer> columns,
905         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
906       return getRowWithColumnsTs(tableName, row, columns,
907                                  HConstants.LATEST_TIMESTAMP,
908                                  attributes);
909     }
910 
911     @Override
912     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
913         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
914       return getRowWithColumnsTs(tableName, row, null,
915                                  timestamp, attributes);
916     }
917 
918     @Override
919     public List<TRowResult> getRowWithColumnsTs(
920         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
921         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
922       try {
923         HTable table = getTable(tableName);
924         if (columns == null) {
925           Get get = new Get(getBytes(row));
926           addAttributes(get, attributes);
927           get.setTimeRange(0, timestamp);
928           Result result = table.get(get);
929           return ThriftUtilities.rowResultFromHBase(result);
930         }
931         Get get = new Get(getBytes(row));
932         addAttributes(get, attributes);
933         for(ByteBuffer column : columns) {
934           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
935           if (famAndQf.length == 1) {
936               get.addFamily(famAndQf[0]);
937           } else {
938               get.addColumn(famAndQf[0], famAndQf[1]);
939           }
940         }
941         get.setTimeRange(0, timestamp);
942         Result result = table.get(get);
943         return ThriftUtilities.rowResultFromHBase(result);
944       } catch (IOException e) {
945         LOG.warn(e.getMessage(), e);
946         throw new IOError(e.getMessage());
947       }
948     }
949 
950     @Override
951     public List<TRowResult> getRows(ByteBuffer tableName,
952                                     List<ByteBuffer> rows,
953         Map<ByteBuffer, ByteBuffer> attributes)
954         throws IOError {
955       return getRowsWithColumnsTs(tableName, rows, null,
956                                   HConstants.LATEST_TIMESTAMP,
957                                   attributes);
958     }
959 
960     @Override
961     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
962                                                List<ByteBuffer> rows,
963         List<ByteBuffer> columns,
964         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
965       return getRowsWithColumnsTs(tableName, rows, columns,
966                                   HConstants.LATEST_TIMESTAMP,
967                                   attributes);
968     }
969 
970     @Override
971     public List<TRowResult> getRowsTs(ByteBuffer tableName,
972                                       List<ByteBuffer> rows,
973         long timestamp,
974         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
975       return getRowsWithColumnsTs(tableName, rows, null,
976                                   timestamp, attributes);
977     }
978 
979     @Override
980     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
981                                                  List<ByteBuffer> rows,
982         List<ByteBuffer> columns, long timestamp,
983         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
984       try {
985         List<Get> gets = new ArrayList<Get>(rows.size());
986         HTable table = getTable(tableName);
987         if (metrics != null) {
988           metrics.incNumRowKeysInBatchGet(rows.size());
989         }
990         for (ByteBuffer row : rows) {
991           Get get = new Get(getBytes(row));
992           addAttributes(get, attributes);
993           if (columns != null) {
994 
995             for(ByteBuffer column : columns) {
996               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
997               if (famAndQf.length == 1) {
998                 get.addFamily(famAndQf[0]);
999               } else {
1000                 get.addColumn(famAndQf[0], famAndQf[1]);
1001               }
1002             }
1003           }
1004           get.setTimeRange(0, timestamp);
1005           gets.add(get);
1006         }
1007         Result[] result = table.get(gets);
1008         return ThriftUtilities.rowResultFromHBase(result);
1009       } catch (IOException e) {
1010         LOG.warn(e.getMessage(), e);
1011         throw new IOError(e.getMessage());
1012       }
1013     }
1014 
1015     @Override
1016     public void deleteAll(
1017         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1018         Map<ByteBuffer, ByteBuffer> attributes)
1019         throws IOError {
1020       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1021                   attributes);
1022     }
1023 
1024     @Override
1025     public void deleteAllTs(ByteBuffer tableName,
1026                             ByteBuffer row,
1027                             ByteBuffer column,
1028         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1029       try {
1030         HTable table = getTable(tableName);
1031         Delete delete  = new Delete(getBytes(row));
1032         addAttributes(delete, attributes);
1033         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1034         if (famAndQf.length == 1) {
1035           delete.deleteFamily(famAndQf[0], timestamp);
1036         } else {
1037           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1038         }
1039         table.delete(delete);
1040 
1041       } catch (IOException e) {
1042         LOG.warn(e.getMessage(), e);
1043         throw new IOError(e.getMessage());
1044       }
1045     }
1046 
1047     @Override
1048     public void deleteAllRow(
1049         ByteBuffer tableName, ByteBuffer row,
1050         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1051       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1052     }
1053 
1054     @Override
1055     public void deleteAllRowTs(
1056         ByteBuffer tableName, ByteBuffer row, long timestamp,
1057         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1058       try {
1059         HTable table = getTable(tableName);
1060         Delete delete  = new Delete(getBytes(row), timestamp);
1061         addAttributes(delete, attributes);
1062         table.delete(delete);
1063       } catch (IOException e) {
1064         LOG.warn(e.getMessage(), e);
1065         throw new IOError(e.getMessage());
1066       }
1067     }
1068 
1069     @Override
1070     public void createTable(ByteBuffer in_tableName,
1071         List<ColumnDescriptor> columnFamilies) throws IOError,
1072         IllegalArgument, AlreadyExists {
1073       byte [] tableName = getBytes(in_tableName);
1074       try {
1075         if (getHBaseAdmin().tableExists(tableName)) {
1076           throw new AlreadyExists("table name already in use");
1077         }
1078         HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1079         for (ColumnDescriptor col : columnFamilies) {
1080           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1081           desc.addFamily(colDesc);
1082         }
1083         getHBaseAdmin().createTable(desc);
1084       } catch (IOException e) {
1085         LOG.warn(e.getMessage(), e);
1086         throw new IOError(e.getMessage());
1087       } catch (IllegalArgumentException e) {
1088         LOG.warn(e.getMessage(), e);
1089         throw new IllegalArgument(e.getMessage());
1090       }
1091     }
1092 
1093     @Override
1094     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1095       byte [] tableName = getBytes(in_tableName);
1096       if (LOG.isDebugEnabled()) {
1097         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
1098       }
1099       try {
1100         if (!getHBaseAdmin().tableExists(tableName)) {
1101           throw new IOException("table does not exist");
1102         }
1103         getHBaseAdmin().deleteTable(tableName);
1104       } catch (IOException e) {
1105         LOG.warn(e.getMessage(), e);
1106         throw new IOError(e.getMessage());
1107       }
1108     }
1109 
1110     @Override
1111     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1112         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1113         throws IOError, IllegalArgument {
1114       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1115                   attributes);
1116     }
1117 
    /**
     * Applies a batch of mutations (puts and deletes) to a single row at the
     * given timestamp. All deletes in the batch are accumulated into one
     * {@link Delete} and all puts into one {@link Put}; the delete is issued
     * before the put.
     *
     * NOTE(review): a non-delete mutation that names only a family (no ':'
     * qualifier) is logged and silently dropped here, whereas the batch variant
     * throws IllegalArgumentException for the same input — confirm whether the
     * two should agree.
     */
    @Override
    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      HTable table = null;
      try {
        table = getTable(tableName);
        Put put = new Put(getBytes(row), timestamp);
        addAttributes(put, attributes);

        Delete delete = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchMutate(mutations.size());
        }

        // Sort each mutation into the accumulated Delete or Put.
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // One component => delete the whole family; two => one column.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // The last mutation's writeToWAL flag wins for the whole Delete.
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if(famAndQf.length == 1) {
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            } else {
              // A null value is stored as an empty byte array.
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            // The last mutation's writeToWAL flag wins for the whole Put.
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Deletes are applied before puts.
        if (!delete.isEmpty())
          table.delete(delete);
        if (!put.isEmpty())
          table.put(put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1170 
1171     @Override
1172     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1173         Map<ByteBuffer, ByteBuffer> attributes)
1174         throws IOError, IllegalArgument, TException {
1175       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1176     }
1177 
    /**
     * Applies batches of mutations to multiple rows at the given timestamp.
     * For each row, deletes are accumulated into one {@link Delete} and puts
     * into one {@link Put}; all puts across rows are submitted first, then all
     * deletes.
     *
     * NOTE(review): a non-delete mutation that names only a family (no ':'
     * qualifier) is logged and then throws, whereas the single-row variant only
     * warns and drops it — confirm whether the two should agree. Also note the
     * put/delete submission order is reversed relative to mutateRowTs.
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            // The last mutation's writeToWAL flag wins for the whole Delete.
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              // A null value is stored as an empty byte array.
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              // Anything other than family:qualifier is rejected for puts.
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            // The last mutation's writeToWAL flag wins for the whole Put.
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        // Puts are submitted before deletes (opposite of mutateRowTs).
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1241 
1242     @Deprecated
1243     @Override
1244     public long atomicIncrement(
1245         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1246             throws IOError, IllegalArgument, TException {
1247       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1248       if(famAndQf.length == 1) {
1249         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1250       }
1251       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1252     }
1253 
1254     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1255         byte [] family, byte [] qualifier, long amount)
1256         throws IOError, IllegalArgument, TException {
1257       HTable table;
1258       try {
1259         table = getTable(tableName);
1260         return table.incrementColumnValue(
1261             getBytes(row), family, qualifier, amount);
1262       } catch (IOException e) {
1263         LOG.warn(e.getMessage(), e);
1264         throw new IOError(e.getMessage());
1265       }
1266     }
1267 
1268     @Override
1269     public void scannerClose(int id) throws IOError, IllegalArgument {
1270       LOG.debug("scannerClose: id=" + id);
1271       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1272       if (resultScannerWrapper == null) {
1273         String message = "scanner ID is invalid";
1274         LOG.warn(message);
1275         throw new IllegalArgument("scanner ID is invalid");
1276       }
1277       resultScannerWrapper.getScanner().close();
1278       removeScanner(id);
1279     }
1280 
1281     @Override
1282     public List<TRowResult> scannerGetList(int id,int nbRows)
1283         throws IllegalArgument, IOError {
1284       LOG.debug("scannerGetList: id=" + id);
1285       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1286       if (null == resultScannerWrapper) {
1287         String message = "scanner ID is invalid";
1288         LOG.warn(message);
1289         throw new IllegalArgument("scanner ID is invalid");
1290       }
1291 
1292       Result [] results = null;
1293       try {
1294         results = resultScannerWrapper.getScanner().next(nbRows);
1295         if (null == results) {
1296           return new ArrayList<TRowResult>();
1297         }
1298       } catch (IOException e) {
1299         LOG.warn(e.getMessage(), e);
1300         throw new IOError(e.getMessage());
1301       }
1302       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1303     }
1304 
1305     @Override
1306     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1307       return scannerGetList(id,1);
1308     }
1309 
1310     @Override
1311     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1312         Map<ByteBuffer, ByteBuffer> attributes)
1313         throws IOError {
1314       try {
1315         HTable table = getTable(tableName);
1316         Scan scan = new Scan();
1317         addAttributes(scan, attributes);
1318         if (tScan.isSetStartRow()) {
1319           scan.setStartRow(tScan.getStartRow());
1320         }
1321         if (tScan.isSetStopRow()) {
1322           scan.setStopRow(tScan.getStopRow());
1323         }
1324         if (tScan.isSetTimestamp()) {
1325           scan.setTimeRange(0, tScan.getTimestamp());
1326         }
1327         if (tScan.isSetCaching()) {
1328           scan.setCaching(tScan.getCaching());
1329         }
1330         if (tScan.isSetBatchSize()) {
1331           scan.setBatch(tScan.getBatchSize());
1332         }
1333         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1334           for(ByteBuffer column : tScan.getColumns()) {
1335             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1336             if(famQf.length == 1) {
1337               scan.addFamily(famQf[0]);
1338             } else {
1339               scan.addColumn(famQf[0], famQf[1]);
1340             }
1341           }
1342         }
1343         if (tScan.isSetFilterString()) {
1344           ParseFilter parseFilter = new ParseFilter();
1345           scan.setFilter(
1346               parseFilter.parseFilterString(tScan.getFilterString()));
1347         }
1348         if (tScan.isSetReversed()) {
1349           scan.setReversed(tScan.isReversed());
1350         }
1351         return addScanner(table.getScanner(scan), tScan.sortColumns);
1352       } catch (IOException e) {
1353         LOG.warn(e.getMessage(), e);
1354         throw new IOError(e.getMessage());
1355       }
1356     }
1357 
1358     @Override
1359     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1360         List<ByteBuffer> columns,
1361         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1362       try {
1363         HTable table = getTable(tableName);
1364         Scan scan = new Scan(getBytes(startRow));
1365         addAttributes(scan, attributes);
1366         if(columns != null && columns.size() != 0) {
1367           for(ByteBuffer column : columns) {
1368             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1369             if(famQf.length == 1) {
1370               scan.addFamily(famQf[0]);
1371             } else {
1372               scan.addColumn(famQf[0], famQf[1]);
1373             }
1374           }
1375         }
1376         return addScanner(table.getScanner(scan), false);
1377       } catch (IOException e) {
1378         LOG.warn(e.getMessage(), e);
1379         throw new IOError(e.getMessage());
1380       }
1381     }
1382 
1383     @Override
1384     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1385         ByteBuffer stopRow, List<ByteBuffer> columns,
1386         Map<ByteBuffer, ByteBuffer> attributes)
1387         throws IOError, TException {
1388       try {
1389         HTable table = getTable(tableName);
1390         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1391         addAttributes(scan, attributes);
1392         if(columns != null && columns.size() != 0) {
1393           for(ByteBuffer column : columns) {
1394             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1395             if(famQf.length == 1) {
1396               scan.addFamily(famQf[0]);
1397             } else {
1398               scan.addColumn(famQf[0], famQf[1]);
1399             }
1400           }
1401         }
1402         return addScanner(table.getScanner(scan), false);
1403       } catch (IOException e) {
1404         LOG.warn(e.getMessage(), e);
1405         throw new IOError(e.getMessage());
1406       }
1407     }
1408 
1409     @Override
1410     public int scannerOpenWithPrefix(ByteBuffer tableName,
1411                                      ByteBuffer startAndPrefix,
1412                                      List<ByteBuffer> columns,
1413         Map<ByteBuffer, ByteBuffer> attributes)
1414         throws IOError, TException {
1415       try {
1416         HTable table = getTable(tableName);
1417         Scan scan = new Scan(getBytes(startAndPrefix));
1418         addAttributes(scan, attributes);
1419         Filter f = new WhileMatchFilter(
1420             new PrefixFilter(getBytes(startAndPrefix)));
1421         scan.setFilter(f);
1422         if (columns != null && columns.size() != 0) {
1423           for(ByteBuffer column : columns) {
1424             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1425             if(famQf.length == 1) {
1426               scan.addFamily(famQf[0]);
1427             } else {
1428               scan.addColumn(famQf[0], famQf[1]);
1429             }
1430           }
1431         }
1432         return addScanner(table.getScanner(scan), false);
1433       } catch (IOException e) {
1434         LOG.warn(e.getMessage(), e);
1435         throw new IOError(e.getMessage());
1436       }
1437     }
1438 
1439     @Override
1440     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1441         List<ByteBuffer> columns, long timestamp,
1442         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1443       try {
1444         HTable table = getTable(tableName);
1445         Scan scan = new Scan(getBytes(startRow));
1446         addAttributes(scan, attributes);
1447         scan.setTimeRange(0, timestamp);
1448         if (columns != null && columns.size() != 0) {
1449           for (ByteBuffer column : columns) {
1450             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1451             if(famQf.length == 1) {
1452               scan.addFamily(famQf[0]);
1453             } else {
1454               scan.addColumn(famQf[0], famQf[1]);
1455             }
1456           }
1457         }
1458         return addScanner(table.getScanner(scan), false);
1459       } catch (IOException e) {
1460         LOG.warn(e.getMessage(), e);
1461         throw new IOError(e.getMessage());
1462       }
1463     }
1464 
1465     @Override
1466     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1467         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1468         Map<ByteBuffer, ByteBuffer> attributes)
1469         throws IOError, TException {
1470       try {
1471         HTable table = getTable(tableName);
1472         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1473         addAttributes(scan, attributes);
1474         scan.setTimeRange(0, timestamp);
1475         if (columns != null && columns.size() != 0) {
1476           for (ByteBuffer column : columns) {
1477             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1478             if(famQf.length == 1) {
1479               scan.addFamily(famQf[0]);
1480             } else {
1481               scan.addColumn(famQf[0], famQf[1]);
1482             }
1483           }
1484         }
1485         scan.setTimeRange(0, timestamp);
1486         return addScanner(table.getScanner(scan), false);
1487       } catch (IOException e) {
1488         LOG.warn(e.getMessage(), e);
1489         throw new IOError(e.getMessage());
1490       }
1491     }
1492 
1493     @Override
1494     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1495         ByteBuffer tableName) throws IOError, TException {
1496       try {
1497         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1498           new TreeMap<ByteBuffer, ColumnDescriptor>();
1499 
1500         HTable table = getTable(tableName);
1501         HTableDescriptor desc = table.getTableDescriptor();
1502 
1503         for (HColumnDescriptor e : desc.getFamilies()) {
1504           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1505           columns.put(col.name, col);
1506         }
1507         return columns;
1508       } catch (IOException e) {
1509         LOG.warn(e.getMessage(), e);
1510         throw new IOError(e.getMessage());
1511       }
1512     }
1513 
1514     @Deprecated
1515     @Override
1516     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1517         ByteBuffer family) throws IOError {
1518       try {
1519         HTable table = getTable(getBytes(tableName));
1520         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1521         return ThriftUtilities.cellFromHBase(result.rawCells());
1522       } catch (IOException e) {
1523         LOG.warn(e.getMessage(), e);
1524         throw new IOError(e.getMessage());
1525       }
1526     }
1527 
    /**
     * Looks up, in the meta table, the region that contains (or precedes)
     * {@code searchRow} and returns its Thrift description, including the
     * hosting server when one is recorded.
     *
     * @throws IOError if the row cannot be located in meta or the lookup fails
     */
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      try {
        HTable table = getTable(TableName.META_TABLE_NAME.getName());
        byte[] row = getBytes(searchRow);
        // Probe meta for the row at-or-before the search key in the catalog family.
        Result startRowResult = table.getRowOrBefore(
          row, HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
                                + Bytes.toStringBinary(row));
        }

        // find region start and end keys
        HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
        if (regionInfo == null) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toStringBinary(row));
        }
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = HREGION_VERSION; // version not used anymore, PB encoding used.

        // find region assignment to server; absent assignment leaves
        // serverName/port unset on the Thrift struct.
        ServerName serverName = HRegionInfo.getServerName(startRowResult);
        if (serverName != null) {
          region.setServerName(Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1567 
    // Stores the shared ThriftMetrics instance on this handler so later
    // calls can record request metrics; presumably invoked once during
    // handler setup — confirm against the caller.
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1571 
1572     @Override
1573     public void increment(TIncrement tincrement) throws IOError, TException {
1574 
1575       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1576         throw new TException("Must supply a table and a row key; can't increment");
1577       }
1578 
1579       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1580         this.coalescer.queueIncrement(tincrement);
1581         return;
1582       }
1583 
1584       try {
1585         HTable table = getTable(tincrement.getTable());
1586         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1587         table.increment(inc);
1588       } catch (IOException e) {
1589         LOG.warn(e.getMessage(), e);
1590         throw new IOError(e.getMessage());
1591       }
1592     }
1593 
1594     @Override
1595     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1596       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1597         this.coalescer.queueIncrements(tincrements);
1598         return;
1599       }
1600       for (TIncrement tinc : tincrements) {
1601         increment(tinc);
1602       }
1603     }
1604 
1605     @Override
1606     public List<TCell> append(TAppend tappend) throws IOError, TException {
1607       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1608         throw new TException("Must supply a table and a row key; can't append");
1609       }
1610 
1611       try {
1612         HTable table = getTable(tappend.getTable());
1613         Append append = ThriftUtilities.appendFromThrift(tappend);
1614         Result result = table.append(append);
1615         return ThriftUtilities.cellFromHBase(result.rawCells());
1616       } catch (IOException e) {
1617         LOG.warn(e.getMessage(), e);
1618         throw new IOError(e.getMessage());
1619       }
1620     }
1621 
1622     @Override
1623     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1624         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1625         IllegalArgument, TException {
1626       Put put;
1627       try {
1628         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1629         addAttributes(put, attributes);
1630 
1631         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1632 
1633         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1634             : HConstants.EMPTY_BYTE_ARRAY);
1635 
1636         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1637       } catch (IllegalArgumentException e) {
1638         LOG.warn(e.getMessage(), e);
1639         throw new IllegalArgument(e.getMessage());
1640       }
1641 
1642       HTable table = null;
1643       try {
1644         table = getTable(tableName);
1645         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1646         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1647           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1648       } catch (IOException e) {
1649         LOG.warn(e.getMessage(), e);
1650         throw new IOError(e.getMessage());
1651       } catch (IllegalArgumentException e) {
1652         LOG.warn(e.getMessage(), e);
1653         throw new IllegalArgument(e.getMessage());
1654       }
1655     }
1656   }
1657 
1658 
1659 
1660   /**
1661    * Adds all the attributes into the Operation object
1662    */
1663   private static void addAttributes(OperationWithAttributes op,
1664     Map<ByteBuffer, ByteBuffer> attributes) {
1665     if (attributes == null || attributes.size() == 0) {
1666       return;
1667     }
1668     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1669       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1670       byte[] value =  getBytes(entry.getValue());
1671       op.setAttribute(name, value);
1672     }
1673   }
1674 
1675   public static void registerFilters(Configuration conf) {
1676     String[] filters = conf.getStrings("hbase.thrift.filters");
1677     if(filters != null) {
1678       for(String filterClass: filters) {
1679         String[] filterPart = filterClass.split(":");
1680         if(filterPart.length != 2) {
1681           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1682         } else {
1683           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1684         }
1685       }
1686     }
1687   }
1688 }