View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.classification.InterfaceAudience;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.hbase.HBaseConfiguration;
56  import org.apache.hadoop.hbase.HColumnDescriptor;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.HRegionInfo;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.client.Append;
65  import org.apache.hadoop.hbase.client.Delete;
66  import org.apache.hadoop.hbase.client.Durability;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.HBaseAdmin;
69  import org.apache.hadoop.hbase.client.HTable;
70  import org.apache.hadoop.hbase.client.Increment;
71  import org.apache.hadoop.hbase.client.OperationWithAttributes;
72  import org.apache.hadoop.hbase.client.Put;
73  import org.apache.hadoop.hbase.client.Result;
74  import org.apache.hadoop.hbase.client.ResultScanner;
75  import org.apache.hadoop.hbase.client.Scan;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.ParseFilter;
78  import org.apache.hadoop.hbase.filter.PrefixFilter;
79  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
80  import org.apache.hadoop.hbase.security.SecurityUtil;
81  import org.apache.hadoop.hbase.security.UserProvider;
82  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
83  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
84  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
85  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
86  import org.apache.hadoop.hbase.thrift.generated.Hbase;
87  import org.apache.hadoop.hbase.thrift.generated.IOError;
88  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
89  import org.apache.hadoop.hbase.thrift.generated.Mutation;
90  import org.apache.hadoop.hbase.thrift.generated.TAppend;
91  import org.apache.hadoop.hbase.thrift.generated.TCell;
92  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
93  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
94  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
95  import org.apache.hadoop.hbase.thrift.generated.TScan;
96  import org.apache.hadoop.hbase.util.Bytes;
97  import org.apache.hadoop.hbase.util.ConnectionCache;
98  import org.apache.hadoop.hbase.util.Strings;
99  import org.apache.hadoop.net.DNS;
100 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
101 import org.apache.hadoop.security.UserGroupInformation;
102 import org.apache.thrift.TException;
103 import org.apache.thrift.TProcessor;
104 import org.apache.thrift.protocol.TBinaryProtocol;
105 import org.apache.thrift.protocol.TCompactProtocol;
106 import org.apache.thrift.protocol.TProtocol;
107 import org.apache.thrift.protocol.TProtocolFactory;
108 import org.apache.thrift.server.THsHaServer;
109 import org.apache.thrift.server.TNonblockingServer;
110 import org.apache.thrift.server.TServer;
111 import org.apache.thrift.server.TThreadedSelectorServer;
112 import org.apache.thrift.transport.TFramedTransport;
113 import org.apache.thrift.transport.TNonblockingServerSocket;
114 import org.apache.thrift.transport.TNonblockingServerTransport;
115 import org.apache.thrift.transport.TSaslServerTransport;
116 import org.apache.thrift.transport.TServerSocket;
117 import org.apache.thrift.transport.TServerTransport;
118 import org.apache.thrift.transport.TTransportFactory;
119 
120 import com.google.common.base.Joiner;
121 import com.google.common.util.concurrent.ThreadFactoryBuilder;
122 
123 /**
124  * ThriftServerRunner - this class starts up a Thrift server which implements
125  * the Hbase API specified in the Hbase.thrift IDL file.
126  */
127 @InterfaceAudience.Private
128 @SuppressWarnings("deprecation")
129 public class ThriftServerRunner implements Runnable {
130 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  /** Config key selecting the Thrift server implementation; see {@link ImplType}. */
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  /** Config key for the IP address the server binds to. */
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  /** Config key: use the Thrift compact protocol instead of the binary protocol. */
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  /** Config key: use the framed transport. */
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  /** Config key: maximum frame size, in MB, when the framed transport is used. */
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  /** Config key for the TCP port the server listens on. */
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  /** Config key enabling increment coalescing. */
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";

  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * auth-conf: authentication, integrity and confidentiality checking
   * auth-int: authentication and integrity checking
   * auth: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  // HRegion is no longer versioned (PB encoding); a fixed value is reported to clients.
  public static final int HREGION_VERSION = 1;
  private final int listenPort;

  private Configuration conf;
  volatile TServer tserver;           // the running Thrift server; null after shutdown()
  private final Hbase.Iface handler;  // metrics-recording proxy around hbaseHandler
  private final ThriftMetrics metrics;
  private final HBaseHandler hbaseHandler;
  private final UserGroupInformation realUser; // identity used for doAs() in run()

  private final String qop;  // SASL quality of protection, or null when not configured
  private String host;       // server host name; resolved only when security is enabled
168 
169   /** An enum of server implementation selections */
170   enum ImplType {
171     HS_HA("hsha", true, THsHaServer.class, true),
172     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
173     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
174     THREADED_SELECTOR(
175         "threadedselector", true, TThreadedSelectorServer.class, true);
176 
177     public static final ImplType DEFAULT = THREAD_POOL;
178 
179     final String option;
180     final boolean isAlwaysFramed;
181     final Class<? extends TServer> serverClass;
182     final boolean canSpecifyBindIP;
183 
184     ImplType(String option, boolean isAlwaysFramed,
185         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
186       this.option = option;
187       this.isAlwaysFramed = isAlwaysFramed;
188       this.serverClass = serverClass;
189       this.canSpecifyBindIP = canSpecifyBindIP;
190     }
191 
192     /**
193      * @return <code>-option</code> so we can get the list of options from
194      *         {@link #values()}
195      */
196     @Override
197     public String toString() {
198       return "-" + option;
199     }
200 
201     String getDescription() {
202       StringBuilder sb = new StringBuilder("Use the " +
203           serverClass.getSimpleName());
204       if (isAlwaysFramed) {
205         sb.append(" This implies the framed transport.");
206       }
207       if (this == DEFAULT) {
208         sb.append("This is the default.");
209       }
210       return sb.toString();
211     }
212 
213     static OptionGroup createOptionGroup() {
214       OptionGroup group = new OptionGroup();
215       for (ImplType t : values()) {
216         group.addOption(new Option(t.option, t.getDescription()));
217       }
218       return group;
219     }
220 
221     static ImplType getServerImpl(Configuration conf) {
222       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
223       for (ImplType t : values()) {
224         if (confType.equals(t.option)) {
225           return t;
226         }
227       }
228       throw new AssertionError("Unknown server ImplType.option:" + confType);
229     }
230 
231     static void setServerImpl(CommandLine cmd, Configuration conf) {
232       ImplType chosenType = null;
233       int numChosen = 0;
234       for (ImplType t : values()) {
235         if (cmd.hasOption(t.option)) {
236           chosenType = t;
237           ++numChosen;
238         }
239       }
240       if (numChosen < 1) {
241         LOG.info("Using default thrift server type");
242         chosenType = DEFAULT;
243       } else if (numChosen > 1) {
244         throw new AssertionError("Exactly one option out of " +
245           Arrays.toString(values()) + " has to be specified");
246       }
247       LOG.info("Using thrift server type " + chosenType.option);
248       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
249     }
250 
251     public String simpleClassName() {
252       return serverClass.getSimpleName();
253     }
254 
255     public static List<String> serversThatCannotSpecifyBindIP() {
256       List<String> l = new ArrayList<String>();
257       for (ImplType t : values()) {
258         if (!t.canSpecifyBindIP) {
259           l.add(t.simpleClassName());
260         }
261       }
262       return l;
263     }
264 
265   }
266 
267   public ThriftServerRunner(Configuration conf) throws IOException {
268     UserProvider userProvider = UserProvider.instantiate(conf);
269     // login the server principal (if using secure Hadoop)
270     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
271       && userProvider.isHBaseSecurityEnabled();
272     if (securityEnabled) {
273       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
274         conf.get("hbase.thrift.dns.interface", "default"),
275         conf.get("hbase.thrift.dns.nameserver", "default")));
276       userProvider.login("hbase.thrift.keytab.file",
277         "hbase.thrift.kerberos.principal", host);
278     }
279     this.conf = HBaseConfiguration.create(conf);
280     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
281     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
282     this.hbaseHandler = new HBaseHandler(conf, userProvider);
283     this.hbaseHandler.initMetrics(metrics);
284     this.handler = HbaseHandlerMetricsProxy.newInstance(
285       hbaseHandler, metrics, conf);
286     this.realUser = userProvider.getCurrent().getUGI();
287     qop = conf.get(THRIFT_QOP_KEY);
288     if (qop != null) {
289       if (!qop.equals("auth") && !qop.equals("auth-int")
290           && !qop.equals("auth-conf")) {
291         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
292           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
293       }
294       if (!securityEnabled) {
295         throw new IOException("Thrift server must"
296           + " run in secure mode to support authentication");
297       }
298     }
299   }
300 
301   /*
302    * Runs the Thrift server
303    */
304   @Override
305   public void run() {
306     realUser.doAs(
307       new PrivilegedAction<Object>() {
308         @Override
309         public Object run() {
310           try {
311             setupServer();
312             tserver.serve();
313           } catch (Exception e) {
314             LOG.fatal("Cannot run ThriftServer", e);
315             // Crash the process if the ThriftServer is not running
316             System.exit(-1);
317           }
318           return null;
319         }
320       });
321   }
322 
323   public void shutdown() {
324     if (tserver != null) {
325       tserver.stop();
326       tserver = null;
327     }
328   }
329 
  /**
   * Setting up the thrift TServer.
   *
   * Builds, in order: the protocol factory (compact vs. binary), the transport
   * factory (framed, plain, or SASL/GSSAPI when a QOP is configured), and the
   * server instance selected by {@link ImplType}. Assigns the result to
   * {@link #tserver} and registers the configured filters.
   *
   * @throws Exception on any configuration or socket setup failure; run()
   *         treats this as fatal
   */
  private void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory;
    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
      LOG.debug("Using compact protocol");
      protocolFactory = new TCompactProtocol.Factory();
    } else {
      LOG.debug("Using binary protocol");
      protocolFactory = new TBinaryProtocol.Factory();
    }

    final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
    ImplType implType = ImplType.getServerImpl(conf);
    // May be replaced below by a wrapper that records the SASL caller identity.
    TProcessor processor = p;

    // Construct correct TransportFactory
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
      // SASL and framed transport are mutually exclusive here.
      if (qop != null) {
        throw new RuntimeException("Thrift server authentication"
          + " doesn't work with framed transport yet");
      }
      // Frame-size limit is configured in MB; default 2 MB.
      transportFactory = new TFramedTransport.Factory(
          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024);
      LOG.debug("Using framed transport");
    } else if (qop == null) {
      transportFactory = new TTransportFactory();
    } else {
      // QOP configured: wrap the transport with SASL/GSSAPI.
      // Extract the name from the principal
      String name = SecurityUtil.getUserFromPrincipal(
        conf.get("hbase.thrift.kerberos.principal"));
      Map<String, String> saslProperties = new HashMap<String, String>();
      saslProperties.put(Sasl.QOP, qop);
      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
        new SaslGssCallbackHandler() {
          @Override
          public void handle(Callback[] callbacks)
              throws UnsupportedCallbackException {
            // Only AuthorizeCallback is handled; any other callback type is
            // rejected outright.
            AuthorizeCallback ac = null;
            for (Callback callback : callbacks) {
              if (callback instanceof AuthorizeCallback) {
                ac = (AuthorizeCallback) callback;
              } else {
                throw new UnsupportedCallbackException(callback,
                    "Unrecognized SASL GSSAPI Callback");
              }
            }
            if (ac != null) {
              String authid = ac.getAuthenticationID();
              String authzid = ac.getAuthorizationID();
              // Authorize only when the authenticated principal matches the
              // requested authorization id (no impersonation at this layer).
              if (!authid.equals(authzid)) {
                ac.setAuthorized(false);
              } else {
                ac.setAuthorized(true);
                String userName = SecurityUtil.getUserFromPrincipal(authzid);
                LOG.info("Effective user: " + userName);
                ac.setAuthorizedID(userName);
              }
            }
          }
        });
      transportFactory = saslFactory;

      // Create a processor wrapper, to get the caller
      // (the SASL-authenticated principal becomes the effective user for
      // every call handled through this processor).
      processor = new TProcessor() {
        @Override
        public boolean process(TProtocol inProt,
            TProtocol outProt) throws TException {
          TSaslServerTransport saslServerTransport =
            (TSaslServerTransport)inProt.getTransport();
          SaslServer saslServer = saslServerTransport.getSaslServer();
          String principal = saslServer.getAuthorizationID();
          hbaseHandler.setEffectiveUser(principal);
          return p.process(inProt, outProt);
        }
      };
    }

    // Some server types cannot bind to a caller-specified IP; fail fast.
    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types " + Joiner.on(", ").join(
          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
          "address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
      throw new RuntimeException(
          "-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {
      // Nonblocking server family: all share a TNonblockingServerSocket.
      InetAddress listenAddress = getBindAddress(conf);
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      if (implType == ImplType.NONBLOCKING) {
        TNonblockingServer.Args serverArgs =
            new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TNonblockingServer(serverArgs);
      } else if (implType == ImplType.HS_HA) {
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        // CallQueue wraps the worker queue so metrics can track queued calls.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new THsHaServer(serverArgs);
      } else { // THREADED_SELECTOR
        TThreadedSelectorServer.Args serverArgs =
            new HThreadedSelectorServerArgs(serverTransport, conf);
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TThreadedSelectorServer(serverArgs);
      }
      LOG.info("starting HBase " + implType.simpleClassName() +
          " server on " + Integer.toString(listenPort));
    } else if (implType == ImplType.THREAD_POOL) {
      // Thread pool server. Get the IP address to bind to.
      InetAddress listenAddress = getBindAddress(conf);

      TServerTransport serverTransport = new TServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      TBoundedThreadPoolServer.Args serverArgs =
          new TBoundedThreadPoolServer.Args(serverTransport, conf);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
          + listenAddress + ":" + Integer.toString(listenPort)
          + "; " + serverArgs);
      TBoundedThreadPoolServer tserver =
          new TBoundedThreadPoolServer(serverArgs, metrics);
      this.tserver = tserver;
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }

    registerFilters(conf);
  }
495 
496   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
497                                  int workerThreads) {
498     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
499     tfb.setDaemon(true);
500     tfb.setNameFormat("thrift-worker-%d");
501     return new ThreadPoolExecutor(workerThreads, workerThreads,
502             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
503   }
504 
505   private InetAddress getBindAddress(Configuration conf)
506       throws UnknownHostException {
507     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
508     return InetAddress.getByName(bindAddressStr);
509   }
510 
511   protected static class ResultScannerWrapper {
512 
513     private final ResultScanner scanner;
514     private final boolean sortColumns;
515     public ResultScannerWrapper(ResultScanner resultScanner,
516                                 boolean sortResultColumns) {
517       scanner = resultScanner;
518       sortColumns = sortResultColumns;
519    }
520 
521     public ResultScanner getScanner() {
522       return scanner;
523     }
524 
525     public boolean isColumnSorted() {
526       return sortColumns;
527     }
528   }
529 
530   /**
531    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
532    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
533    */
534   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state;
    // both are guarded by "synchronized" on the add/get/remove methods below.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Shares connections/admin handles across calls, keyed by effective user.
    private final ConnectionCache connectionCache;

    // Per-thread cache of HTable instances keyed by table name; populated
    // lazily by getTable().
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // Coalesces increments (see IncrementCoalescer); created in the constructor.
    IncrementCoalescer coalescer = null;

    // Config keys for the ConnectionCache's idle-connection cleanup chore.
    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
557 
558     /**
559      * Returns a list of all the column families for a given htable.
560      *
561      * @param table
562      * @throws IOException
563      */
564     byte[][] getAllColumns(HTable table) throws IOException {
565       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
566       byte[][] columns = new byte[cds.length][];
567       for (int i = 0; i < cds.length; i++) {
568         columns[i] = Bytes.add(cds[i].getName(),
569             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
570       }
571       return columns;
572     }
573 
574     /**
575      * Creates and returns an HTable instance from a given table name.
576      *
577      * @param tableName
578      *          name of table
579      * @return HTable object
580      * @throws IOException
581      * @throws IOError
582      */
583     public HTable getTable(final byte[] tableName) throws
584         IOException {
585       String table = Bytes.toString(tableName);
586       Map<String, HTable> tables = threadLocalTables.get();
587       if (!tables.containsKey(table)) {
588         tables.put(table, (HTable)connectionCache.getTable(table));
589       }
590       return tables.get(table);
591     }
592 
593     public HTable getTable(final ByteBuffer tableName) throws IOException {
594       return getTable(getBytes(tableName));
595     }
596 
597     /**
598      * Assigns a unique ID to the scanner and adds the mapping to an internal
599      * hash-map.
600      *
601      * @param scanner
602      * @return integer scanner id
603      */
604     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
605       int id = nextScannerId++;
606       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
607       scannerMap.put(id, resultScannerWrapper);
608       return id;
609     }
610 
611     /**
612      * Returns the scanner associated with the specified ID.
613      *
614      * @param id
615      * @return a Scanner, or null if ID was invalid.
616      */
617     protected synchronized ResultScannerWrapper getScanner(int id) {
618       return scannerMap.get(id);
619     }
620 
621     /**
622      * Removes the scanner associated with the specified ID from the internal
623      * id->scanner hash-map.
624      *
625      * @param id
626      * @return a Scanner, or null if ID was invalid.
627      */
628     protected synchronized ResultScannerWrapper removeScanner(int id) {
629       return scannerMap.remove(id);
630     }
631 
632     protected HBaseHandler(final Configuration c,
633         final UserProvider userProvider) throws IOException {
634       this.conf = c;
635       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
636       this.coalescer = new IncrementCoalescer(this);
637 
638       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
639       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
640       connectionCache = new ConnectionCache(
641         conf, userProvider, cleanInterval, maxIdleTime);
642     }
643 
644     /**
645      * Obtain HBaseAdmin. Creates the instance if it is not already created.
646      */
647     private HBaseAdmin getHBaseAdmin() throws IOException {
648       return connectionCache.getAdmin();
649     }
650 
651     void setEffectiveUser(String effectiveUser) {
652       connectionCache.setEffectiveUser(effectiveUser);
653     }
654 
655     @Override
656     public void enableTable(ByteBuffer tableName) throws IOError {
657       try{
658         getHBaseAdmin().enableTable(getBytes(tableName));
659       } catch (IOException e) {
660         LOG.warn(e.getMessage(), e);
661         throw new IOError(e.getMessage());
662       }
663     }
664 
665     @Override
666     public void disableTable(ByteBuffer tableName) throws IOError{
667       try{
668         getHBaseAdmin().disableTable(getBytes(tableName));
669       } catch (IOException e) {
670         LOG.warn(e.getMessage(), e);
671         throw new IOError(e.getMessage());
672       }
673     }
674 
675     @Override
676     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
677       try {
678         return HTable.isTableEnabled(this.conf, getBytes(tableName));
679       } catch (IOException e) {
680         LOG.warn(e.getMessage(), e);
681         throw new IOError(e.getMessage());
682       }
683     }
684 
685     @Override
686     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
687       try{
688         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
689       } catch (InterruptedException e) {
690         throw new IOError(e.getMessage());
691       } catch (IOException e) {
692         LOG.warn(e.getMessage(), e);
693         throw new IOError(e.getMessage());
694       }
695     }
696 
697     @Override
698     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
699       try{
700         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
701       } catch (InterruptedException e) {
702         LOG.warn(e.getMessage(), e);
703         throw new IOError(e.getMessage());
704       } catch (IOException e) {
705         LOG.warn(e.getMessage(), e);
706         throw new IOError(e.getMessage());
707       }
708     }
709 
710     @Override
711     public List<ByteBuffer> getTableNames() throws IOError {
712       try {
713         TableName[] tableNames = this.getHBaseAdmin().listTableNames();
714         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
715         for (int i = 0; i < tableNames.length; i++) {
716           list.add(ByteBuffer.wrap(tableNames[i].getName()));
717         }
718         return list;
719       } catch (IOException e) {
720         LOG.warn(e.getMessage(), e);
721         throw new IOError(e.getMessage());
722       }
723     }
724 
725     /**
726      * @return the list of regions in the given table, or an empty list if the table does not exist
727      */
728     @Override
729     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
730     throws IOError {
731       try {
732         HTable table;
733         try {
734           table = getTable(tableName);
735         } catch (TableNotFoundException ex) {
736           return new ArrayList<TRegionInfo>();
737         }
738         Map<HRegionInfo, ServerName> regionLocations =
739             table.getRegionLocations();
740         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
741         for (Map.Entry<HRegionInfo, ServerName> entry :
742             regionLocations.entrySet()) {
743           HRegionInfo info = entry.getKey();
744           ServerName serverName = entry.getValue();
745           TRegionInfo region = new TRegionInfo();
746           region.serverName = ByteBuffer.wrap(
747               Bytes.toBytes(serverName.getHostname()));
748           region.port = serverName.getPort();
749           region.startKey = ByteBuffer.wrap(info.getStartKey());
750           region.endKey = ByteBuffer.wrap(info.getEndKey());
751           region.id = info.getRegionId();
752           region.name = ByteBuffer.wrap(info.getRegionName());
753           region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
754           results.add(region);
755         }
756         return results;
757       } catch (TableNotFoundException e) {
758         // Return empty list for non-existing table
759         return Collections.emptyList();
760       } catch (IOException e){
761         LOG.warn(e.getMessage(), e);
762         throw new IOError(e.getMessage());
763       }
764     }
765 
766     @Deprecated
767     @Override
768     public List<TCell> get(
769         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
770         Map<ByteBuffer, ByteBuffer> attributes)
771         throws IOError {
772       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
773       if (famAndQf.length == 1) {
774         return get(tableName, row, famAndQf[0], null, attributes);
775       }
776       if (famAndQf.length == 2) {
777         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
778       }
779       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
780     }
781 
782     /**
783      * Note: this internal interface is slightly different from public APIs in regard to handling
784      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
785      * we respect qual == null as a request for the entire column family. The caller (
786      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
787      * column is parse like normal.
788      */
789     protected List<TCell> get(ByteBuffer tableName,
790                               ByteBuffer row,
791                               byte[] family,
792                               byte[] qualifier,
793                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
794       try {
795         HTable table = getTable(tableName);
796         Get get = new Get(getBytes(row));
797         addAttributes(get, attributes);
798         if (qualifier == null) {
799           get.addFamily(family);
800         } else {
801           get.addColumn(family, qualifier);
802         }
803         Result result = table.get(get);
804         return ThriftUtilities.cellFromHBase(result.rawCells());
805       } catch (IOException e) {
806         LOG.warn(e.getMessage(), e);
807         throw new IOError(e.getMessage());
808       }
809     }
810 
811     @Deprecated
812     @Override
813     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
814         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
815       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
816       if(famAndQf.length == 1) {
817         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
818       }
819       if (famAndQf.length == 2) {
820         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
821       }
822       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
823 
824     }
825 
826     /**
827      * Note: this public interface is slightly different from public Java APIs in regard to
828      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
829      * Rather, we respect qual == null as a request for the entire column family. If you want to
830      * access the entire column family, use
831      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
832      * that lacks a {@code ':'}.
833      */
834     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
835         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
836       try {
837         HTable table = getTable(tableName);
838         Get get = new Get(getBytes(row));
839         addAttributes(get, attributes);
840         if (null == qualifier) {
841           get.addFamily(family);
842         } else {
843           get.addColumn(family, qualifier);
844         }
845         get.setMaxVersions(numVersions);
846         Result result = table.get(get);
847         return ThriftUtilities.cellFromHBase(result.rawCells());
848       } catch (IOException e) {
849         LOG.warn(e.getMessage(), e);
850         throw new IOError(e.getMessage());
851       }
852     }
853 
854     @Deprecated
855     @Override
856     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
857         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
858       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
859       if (famAndQf.length == 1) {
860         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
861       }
862       if (famAndQf.length == 2) {
863         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
864           attributes);
865       }
866       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
867     }
868 
869     /**
870      * Note: this internal interface is slightly different from public APIs in regard to handling
871      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
872      * we respect qual == null as a request for the entire column family. The caller (
873      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
874      * consistent in that the column is parse like normal.
875      */
876     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
877         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
878         throws IOError {
879       try {
880         HTable table = getTable(tableName);
881         Get get = new Get(getBytes(row));
882         addAttributes(get, attributes);
883         if (null == qualifier) {
884           get.addFamily(family);
885         } else {
886           get.addColumn(family, qualifier);
887         }
888         get.setTimeRange(0, timestamp);
889         get.setMaxVersions(numVersions);
890         Result result = table.get(get);
891         return ThriftUtilities.cellFromHBase(result.rawCells());
892       } catch (IOException e) {
893         LOG.warn(e.getMessage(), e);
894         throw new IOError(e.getMessage());
895       }
896     }
897 
898     @Override
899     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
900         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
901       return getRowWithColumnsTs(tableName, row, null,
902                                  HConstants.LATEST_TIMESTAMP,
903                                  attributes);
904     }
905 
906     @Override
907     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
908                                               ByteBuffer row,
909         List<ByteBuffer> columns,
910         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
911       return getRowWithColumnsTs(tableName, row, columns,
912                                  HConstants.LATEST_TIMESTAMP,
913                                  attributes);
914     }
915 
916     @Override
917     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
918         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
919       return getRowWithColumnsTs(tableName, row, null,
920                                  timestamp, attributes);
921     }
922 
923     @Override
924     public List<TRowResult> getRowWithColumnsTs(
925         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
926         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
927       try {
928         HTable table = getTable(tableName);
929         if (columns == null) {
930           Get get = new Get(getBytes(row));
931           addAttributes(get, attributes);
932           get.setTimeRange(0, timestamp);
933           Result result = table.get(get);
934           return ThriftUtilities.rowResultFromHBase(result);
935         }
936         Get get = new Get(getBytes(row));
937         addAttributes(get, attributes);
938         for(ByteBuffer column : columns) {
939           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
940           if (famAndQf.length == 1) {
941               get.addFamily(famAndQf[0]);
942           } else {
943               get.addColumn(famAndQf[0], famAndQf[1]);
944           }
945         }
946         get.setTimeRange(0, timestamp);
947         Result result = table.get(get);
948         return ThriftUtilities.rowResultFromHBase(result);
949       } catch (IOException e) {
950         LOG.warn(e.getMessage(), e);
951         throw new IOError(e.getMessage());
952       }
953     }
954 
955     @Override
956     public List<TRowResult> getRows(ByteBuffer tableName,
957                                     List<ByteBuffer> rows,
958         Map<ByteBuffer, ByteBuffer> attributes)
959         throws IOError {
960       return getRowsWithColumnsTs(tableName, rows, null,
961                                   HConstants.LATEST_TIMESTAMP,
962                                   attributes);
963     }
964 
965     @Override
966     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
967                                                List<ByteBuffer> rows,
968         List<ByteBuffer> columns,
969         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
970       return getRowsWithColumnsTs(tableName, rows, columns,
971                                   HConstants.LATEST_TIMESTAMP,
972                                   attributes);
973     }
974 
975     @Override
976     public List<TRowResult> getRowsTs(ByteBuffer tableName,
977                                       List<ByteBuffer> rows,
978         long timestamp,
979         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
980       return getRowsWithColumnsTs(tableName, rows, null,
981                                   timestamp, attributes);
982     }
983 
    /**
     * Batched multi-row fetch: builds one Get per requested row and issues them in a
     * single {@code table.get(gets)} call. {@code columns == null} means all columns;
     * each column spec is "family" (whole family) or "family:qualifier".
     *
     * @param timestamp only cells with timestamp strictly below this are returned
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
                                                 List<ByteBuffer> rows,
        List<ByteBuffer> columns, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      try {
        List<Get> gets = new ArrayList<Get>(rows.size());
        HTable table = getTable(tableName);
        // Count the batch size for server-side metrics, if metrics are enabled.
        if (metrics != null) {
          metrics.incNumRowKeysInBatchGet(rows.size());
        }
        for (ByteBuffer row : rows) {
          Get get = new Get(getBytes(row));
          addAttributes(get, attributes);
          if (columns != null) {

            for(ByteBuffer column : columns) {
              byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
              if (famAndQf.length == 1) {
                get.addFamily(famAndQf[0]);
              } else {
                get.addColumn(famAndQf[0], famAndQf[1]);
              }
            }
          }
          get.setTimeRange(0, timestamp);
          gets.add(get);
        }
        // One RPC batch for all rows.
        Result[] result = table.get(gets);
        return ThriftUtilities.rowResultFromHBase(result);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1019 
1020     @Override
1021     public void deleteAll(
1022         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1023         Map<ByteBuffer, ByteBuffer> attributes)
1024         throws IOError {
1025       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1026                   attributes);
1027     }
1028 
1029     @Override
1030     public void deleteAllTs(ByteBuffer tableName,
1031                             ByteBuffer row,
1032                             ByteBuffer column,
1033         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1034       try {
1035         HTable table = getTable(tableName);
1036         Delete delete  = new Delete(getBytes(row));
1037         addAttributes(delete, attributes);
1038         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1039         if (famAndQf.length == 1) {
1040           delete.deleteFamily(famAndQf[0], timestamp);
1041         } else {
1042           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1043         }
1044         table.delete(delete);
1045 
1046       } catch (IOException e) {
1047         LOG.warn(e.getMessage(), e);
1048         throw new IOError(e.getMessage());
1049       }
1050     }
1051 
1052     @Override
1053     public void deleteAllRow(
1054         ByteBuffer tableName, ByteBuffer row,
1055         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1056       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1057     }
1058 
1059     @Override
1060     public void deleteAllRowTs(
1061         ByteBuffer tableName, ByteBuffer row, long timestamp,
1062         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1063       try {
1064         HTable table = getTable(tableName);
1065         Delete delete  = new Delete(getBytes(row), timestamp);
1066         addAttributes(delete, attributes);
1067         table.delete(delete);
1068       } catch (IOException e) {
1069         LOG.warn(e.getMessage(), e);
1070         throw new IOError(e.getMessage());
1071       }
1072     }
1073 
1074     @Override
1075     public void createTable(ByteBuffer in_tableName,
1076         List<ColumnDescriptor> columnFamilies) throws IOError,
1077         IllegalArgument, AlreadyExists {
1078       byte [] tableName = getBytes(in_tableName);
1079       try {
1080         if (getHBaseAdmin().tableExists(tableName)) {
1081           throw new AlreadyExists("table name already in use");
1082         }
1083         HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1084         for (ColumnDescriptor col : columnFamilies) {
1085           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1086           desc.addFamily(colDesc);
1087         }
1088         getHBaseAdmin().createTable(desc);
1089       } catch (IOException e) {
1090         LOG.warn(e.getMessage(), e);
1091         throw new IOError(e.getMessage());
1092       } catch (IllegalArgumentException e) {
1093         LOG.warn(e.getMessage(), e);
1094         throw new IllegalArgument(e.getMessage());
1095       }
1096     }
1097 
1098     @Override
1099     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1100       byte [] tableName = getBytes(in_tableName);
1101       if (LOG.isDebugEnabled()) {
1102         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
1103       }
1104       try {
1105         if (!getHBaseAdmin().tableExists(tableName)) {
1106           throw new IOException("table does not exist");
1107         }
1108         getHBaseAdmin().deleteTable(tableName);
1109       } catch (IOException e) {
1110         LOG.warn(e.getMessage(), e);
1111         throw new IOError(e.getMessage());
1112       }
1113     }
1114 
1115     @Override
1116     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1117         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1118         throws IOError, IllegalArgument {
1119       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1120                   attributes);
1121     }
1122 
1123     @Override
1124     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1125         List<Mutation> mutations, long timestamp,
1126         Map<ByteBuffer, ByteBuffer> attributes)
1127         throws IOError, IllegalArgument {
1128       HTable table = null;
1129       try {
1130         table = getTable(tableName);
1131         Put put = new Put(getBytes(row), timestamp);
1132         addAttributes(put, attributes);
1133 
1134         Delete delete = new Delete(getBytes(row));
1135         addAttributes(delete, attributes);
1136         if (metrics != null) {
1137           metrics.incNumRowKeysInBatchMutate(mutations.size());
1138         }
1139 
1140         // I apologize for all this mess :)
1141         for (Mutation m : mutations) {
1142           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1143           if (m.isDelete) {
1144             if (famAndQf.length == 1) {
1145               delete.deleteFamily(famAndQf[0], timestamp);
1146             } else {
1147               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1148             }
1149             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1150                 : Durability.SKIP_WAL);
1151           } else {
1152             if(famAndQf.length == 1) {
1153               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1154                   + "over the whole column family.");
1155             } else {
1156               put.addImmutable(famAndQf[0], famAndQf[1],
1157                   m.value != null ? getBytes(m.value)
1158                       : HConstants.EMPTY_BYTE_ARRAY);
1159             }
1160             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1161           }
1162         }
1163         if (!delete.isEmpty())
1164           table.delete(delete);
1165         if (!put.isEmpty())
1166           table.put(put);
1167       } catch (IOException e) {
1168         LOG.warn(e.getMessage(), e);
1169         throw new IOError(e.getMessage());
1170       } catch (IllegalArgumentException e) {
1171         LOG.warn(e.getMessage(), e);
1172         throw new IllegalArgument(e.getMessage());
1173       }
1174     }
1175 
1176     @Override
1177     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1178         Map<ByteBuffer, ByteBuffer> attributes)
1179         throws IOError, IllegalArgument, TException {
1180       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1181     }
1182 
    /**
     * Applies batches of per-row mutations at the given timestamp. For every
     * BatchMutation one Put and one Delete are accumulated; all non-empty Puts are
     * sent in one call, then all non-empty Deletes in another.
     *
     * <p>Note: puts are flushed before deletes, so within one call a delete can
     * remove a cell written by an earlier batch entry at the same timestamp.
     *
     * @throws IOError wrapping any underlying IOException
     * @throws IllegalArgument on invalid mutation input
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              // Family-only put is not supported; warn, then fall through to the
              // length check below, which rejects it.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              // Null values are stored as empty byte arrays.
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Collect only non-empty operations for the batched RPCs below.
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1246 
    /**
     * Atomically increments a "family[:qualifier]" counter column by {@code amount}.
     *
     * @deprecated use the byte[]-based overload directly
     */
    @Deprecated
    @Override
    public long atomicIncrement(
        ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
            throws IOError, IllegalArgument, TException {
      byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
      if(famAndQf.length == 1) {
        // NOTE(review): a bare family is mapped to an EMPTY qualifier here, whereas
        // the get/getVer/getVerTs overloads map it to null ("whole family") —
        // confirm this asymmetry is intentional.
        return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
      }
      return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
    }
1258 
1259     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1260         byte [] family, byte [] qualifier, long amount)
1261         throws IOError, IllegalArgument, TException {
1262       HTable table;
1263       try {
1264         table = getTable(tableName);
1265         return table.incrementColumnValue(
1266             getBytes(row), family, qualifier, amount);
1267       } catch (IOException e) {
1268         LOG.warn(e.getMessage(), e);
1269         throw new IOError(e.getMessage());
1270       }
1271     }
1272 
1273     @Override
1274     public void scannerClose(int id) throws IOError, IllegalArgument {
1275       LOG.debug("scannerClose: id=" + id);
1276       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1277       if (resultScannerWrapper == null) {
1278         String message = "scanner ID is invalid";
1279         LOG.warn(message);
1280         throw new IllegalArgument("scanner ID is invalid");
1281       }
1282       resultScannerWrapper.getScanner().close();
1283       removeScanner(id);
1284     }
1285 
1286     @Override
1287     public List<TRowResult> scannerGetList(int id,int nbRows)
1288         throws IllegalArgument, IOError {
1289       LOG.debug("scannerGetList: id=" + id);
1290       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1291       if (null == resultScannerWrapper) {
1292         String message = "scanner ID is invalid";
1293         LOG.warn(message);
1294         throw new IllegalArgument("scanner ID is invalid");
1295       }
1296 
1297       Result [] results = null;
1298       try {
1299         results = resultScannerWrapper.getScanner().next(nbRows);
1300         if (null == results) {
1301           return new ArrayList<TRowResult>();
1302         }
1303       } catch (IOException e) {
1304         LOG.warn(e.getMessage(), e);
1305         throw new IOError(e.getMessage());
1306       }
1307       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1308     }
1309 
1310     @Override
1311     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1312       return scannerGetList(id,1);
1313     }
1314 
    /**
     * Opens a scanner configured from a Thrift TScan. Only the fields the client
     * actually set are applied to the underlying Scan.
     *
     * @return a scanner id for use with scannerGet/scannerGetList/scannerClose
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError {
      try {
        HTable table = getTable(tableName);
        Scan scan = new Scan();
        addAttributes(scan, attributes);
        if (tScan.isSetStartRow()) {
          scan.setStartRow(tScan.getStartRow());
        }
        if (tScan.isSetStopRow()) {
          scan.setStopRow(tScan.getStopRow());
        }
        if (tScan.isSetTimestamp()) {
          // Upper-bounded time range: only cells strictly below the timestamp.
          scan.setTimeRange(0, tScan.getTimestamp());
        }
        if (tScan.isSetCaching()) {
          scan.setCaching(tScan.getCaching());
        }
        if (tScan.isSetBatchSize()) {
          scan.setBatch(tScan.getBatchSize());
        }
        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
          // Each column spec is "family" (whole family) or "family:qualifier".
          for(ByteBuffer column : tScan.getColumns()) {
            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
            if(famQf.length == 1) {
              scan.addFamily(famQf[0]);
            } else {
              scan.addColumn(famQf[0], famQf[1]);
            }
          }
        }
        if (tScan.isSetFilterString()) {
          // Filter strings use the HBase filter language (ParseFilter grammar).
          ParseFilter parseFilter = new ParseFilter();
          scan.setFilter(
              parseFilter.parseFilterString(tScan.getFilterString()));
        }
        if (tScan.isSetReversed()) {
          scan.setReversed(tScan.isReversed());
        }
        // Register the scanner and hand its id back to the client.
        return addScanner(table.getScanner(scan), tScan.sortColumns);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1362 
1363     @Override
1364     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1365         List<ByteBuffer> columns,
1366         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1367       try {
1368         HTable table = getTable(tableName);
1369         Scan scan = new Scan(getBytes(startRow));
1370         addAttributes(scan, attributes);
1371         if(columns != null && columns.size() != 0) {
1372           for(ByteBuffer column : columns) {
1373             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1374             if(famQf.length == 1) {
1375               scan.addFamily(famQf[0]);
1376             } else {
1377               scan.addColumn(famQf[0], famQf[1]);
1378             }
1379           }
1380         }
1381         return addScanner(table.getScanner(scan), false);
1382       } catch (IOException e) {
1383         LOG.warn(e.getMessage(), e);
1384         throw new IOError(e.getMessage());
1385       }
1386     }
1387 
1388     @Override
1389     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1390         ByteBuffer stopRow, List<ByteBuffer> columns,
1391         Map<ByteBuffer, ByteBuffer> attributes)
1392         throws IOError, TException {
1393       try {
1394         HTable table = getTable(tableName);
1395         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1396         addAttributes(scan, attributes);
1397         if(columns != null && columns.size() != 0) {
1398           for(ByteBuffer column : columns) {
1399             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1400             if(famQf.length == 1) {
1401               scan.addFamily(famQf[0]);
1402             } else {
1403               scan.addColumn(famQf[0], famQf[1]);
1404             }
1405           }
1406         }
1407         return addScanner(table.getScanner(scan), false);
1408       } catch (IOException e) {
1409         LOG.warn(e.getMessage(), e);
1410         throw new IOError(e.getMessage());
1411       }
1412     }
1413 
1414     @Override
1415     public int scannerOpenWithPrefix(ByteBuffer tableName,
1416                                      ByteBuffer startAndPrefix,
1417                                      List<ByteBuffer> columns,
1418         Map<ByteBuffer, ByteBuffer> attributes)
1419         throws IOError, TException {
1420       try {
1421         HTable table = getTable(tableName);
1422         Scan scan = new Scan(getBytes(startAndPrefix));
1423         addAttributes(scan, attributes);
1424         Filter f = new WhileMatchFilter(
1425             new PrefixFilter(getBytes(startAndPrefix)));
1426         scan.setFilter(f);
1427         if (columns != null && columns.size() != 0) {
1428           for(ByteBuffer column : columns) {
1429             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1430             if(famQf.length == 1) {
1431               scan.addFamily(famQf[0]);
1432             } else {
1433               scan.addColumn(famQf[0], famQf[1]);
1434             }
1435           }
1436         }
1437         return addScanner(table.getScanner(scan), false);
1438       } catch (IOException e) {
1439         LOG.warn(e.getMessage(), e);
1440         throw new IOError(e.getMessage());
1441       }
1442     }
1443 
1444     @Override
1445     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1446         List<ByteBuffer> columns, long timestamp,
1447         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1448       try {
1449         HTable table = getTable(tableName);
1450         Scan scan = new Scan(getBytes(startRow));
1451         addAttributes(scan, attributes);
1452         scan.setTimeRange(0, timestamp);
1453         if (columns != null && columns.size() != 0) {
1454           for (ByteBuffer column : columns) {
1455             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1456             if(famQf.length == 1) {
1457               scan.addFamily(famQf[0]);
1458             } else {
1459               scan.addColumn(famQf[0], famQf[1]);
1460             }
1461           }
1462         }
1463         return addScanner(table.getScanner(scan), false);
1464       } catch (IOException e) {
1465         LOG.warn(e.getMessage(), e);
1466         throw new IOError(e.getMessage());
1467       }
1468     }
1469 
1470     @Override
1471     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1472         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1473         Map<ByteBuffer, ByteBuffer> attributes)
1474         throws IOError, TException {
1475       try {
1476         HTable table = getTable(tableName);
1477         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1478         addAttributes(scan, attributes);
1479         scan.setTimeRange(0, timestamp);
1480         if (columns != null && columns.size() != 0) {
1481           for (ByteBuffer column : columns) {
1482             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1483             if(famQf.length == 1) {
1484               scan.addFamily(famQf[0]);
1485             } else {
1486               scan.addColumn(famQf[0], famQf[1]);
1487             }
1488           }
1489         }
1490         scan.setTimeRange(0, timestamp);
1491         return addScanner(table.getScanner(scan), false);
1492       } catch (IOException e) {
1493         LOG.warn(e.getMessage(), e);
1494         throw new IOError(e.getMessage());
1495       }
1496     }
1497 
1498     @Override
1499     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1500         ByteBuffer tableName) throws IOError, TException {
1501       try {
1502         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1503           new TreeMap<ByteBuffer, ColumnDescriptor>();
1504 
1505         HTable table = getTable(tableName);
1506         HTableDescriptor desc = table.getTableDescriptor();
1507 
1508         for (HColumnDescriptor e : desc.getFamilies()) {
1509           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1510           columns.put(col.name, col);
1511         }
1512         return columns;
1513       } catch (IOException e) {
1514         LOG.warn(e.getMessage(), e);
1515         throw new IOError(e.getMessage());
1516       }
1517     }
1518 
1519     @Deprecated
1520     @Override
1521     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1522         ByteBuffer family) throws IOError {
1523       try {
1524         HTable table = getTable(getBytes(tableName));
1525         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1526         return ThriftUtilities.cellFromHBase(result.rawCells());
1527       } catch (IOException e) {
1528         LOG.warn(e.getMessage(), e);
1529         throw new IOError(e.getMessage());
1530       }
1531     }
1532 
    /**
     * Looks up the region containing (or closest before) {@code searchRow} by
     * probing the meta table directly, and returns its keys, id, name, and — when
     * an assignment is recorded — the hosting server and port.
     *
     * @throws IOError if the row or its region info cannot be found in meta, or on
     *         any underlying IOException
     */
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      try {
        HTable table = getTable(TableName.META_TABLE_NAME.getName());
        byte[] row = getBytes(searchRow);
        // Closest-row-or-before probe against the meta catalog family.
        Result startRowResult = table.getRowOrBefore(
          row, HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
                                + Bytes.toStringBinary(row));
        }

        // find region start and end keys
        HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
        if (regionInfo == null) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toStringBinary(row));
        }
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = HREGION_VERSION; // version not used anymore, PB encoding used.

        // find region assignment to server
        ServerName serverName = HRegionInfo.getServerName(startRowResult);
        if (serverName != null) {
          // Server fields are left unset when meta has no assignment recorded.
          region.setServerName(Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1572 
    // Wires the Thrift metrics sink into this handler; invoked once during
    // server setup so request handling can record per-call metrics.
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1576 
1577     @Override
1578     public void increment(TIncrement tincrement) throws IOError, TException {
1579 
1580       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1581         throw new TException("Must supply a table and a row key; can't increment");
1582       }
1583 
1584       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1585         this.coalescer.queueIncrement(tincrement);
1586         return;
1587       }
1588 
1589       try {
1590         HTable table = getTable(tincrement.getTable());
1591         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1592         table.increment(inc);
1593       } catch (IOException e) {
1594         LOG.warn(e.getMessage(), e);
1595         throw new IOError(e.getMessage());
1596       }
1597     }
1598 
1599     @Override
1600     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1601       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1602         this.coalescer.queueIncrements(tincrements);
1603         return;
1604       }
1605       for (TIncrement tinc : tincrements) {
1606         increment(tinc);
1607       }
1608     }
1609 
1610     @Override
1611     public List<TCell> append(TAppend tappend) throws IOError, TException {
1612       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1613         throw new TException("Must supply a table and a row key; can't append");
1614       }
1615 
1616       try {
1617         HTable table = getTable(tappend.getTable());
1618         Append append = ThriftUtilities.appendFromThrift(tappend);
1619         Result result = table.append(append);
1620         return ThriftUtilities.cellFromHBase(result.rawCells());
1621       } catch (IOException e) {
1622         LOG.warn(e.getMessage(), e);
1623         throw new IOError(e.getMessage());
1624       }
1625     }
1626 
1627     @Override
1628     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1629         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1630         IllegalArgument, TException {
1631       Put put;
1632       try {
1633         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1634         addAttributes(put, attributes);
1635 
1636         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1637 
1638         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1639             : HConstants.EMPTY_BYTE_ARRAY);
1640 
1641         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1642       } catch (IllegalArgumentException e) {
1643         LOG.warn(e.getMessage(), e);
1644         throw new IllegalArgument(e.getMessage());
1645       }
1646 
1647       HTable table = null;
1648       try {
1649         table = getTable(tableName);
1650         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1651         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1652           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1653       } catch (IOException e) {
1654         LOG.warn(e.getMessage(), e);
1655         throw new IOError(e.getMessage());
1656       } catch (IllegalArgumentException e) {
1657         LOG.warn(e.getMessage(), e);
1658         throw new IllegalArgument(e.getMessage());
1659       }
1660     }
1661   }
1662 
1663 
1664 
1665   /**
1666    * Adds all the attributes into the Operation object
1667    */
1668   private static void addAttributes(OperationWithAttributes op,
1669     Map<ByteBuffer, ByteBuffer> attributes) {
1670     if (attributes == null || attributes.size() == 0) {
1671       return;
1672     }
1673     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1674       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1675       byte[] value =  getBytes(entry.getValue());
1676       op.setAttribute(name, value);
1677     }
1678   }
1679 
1680   public static void registerFilters(Configuration conf) {
1681     String[] filters = conf.getStrings("hbase.thrift.filters");
1682     if(filters != null) {
1683       for(String filterClass: filters) {
1684         String[] filterPart = filterClass.split(":");
1685         if(filterPart.length != 2) {
1686           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1687         } else {
1688           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1689         }
1690       }
1691     }
1692   }
1693 }