View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HRegionLocation;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.client.Admin;
66  import org.apache.hadoop.hbase.client.Append;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.Durability;
69  import org.apache.hadoop.hbase.client.Get;
70  import org.apache.hadoop.hbase.client.HBaseAdmin;
71  import org.apache.hadoop.hbase.client.Increment;
72  import org.apache.hadoop.hbase.client.OperationWithAttributes;
73  import org.apache.hadoop.hbase.client.Put;
74  import org.apache.hadoop.hbase.client.RegionLocator;
75  import org.apache.hadoop.hbase.client.Result;
76  import org.apache.hadoop.hbase.client.ResultScanner;
77  import org.apache.hadoop.hbase.client.Scan;
78  import org.apache.hadoop.hbase.client.Table;
79  import org.apache.hadoop.hbase.filter.Filter;
80  import org.apache.hadoop.hbase.filter.ParseFilter;
81  import org.apache.hadoop.hbase.filter.PrefixFilter;
82  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83  import org.apache.hadoop.hbase.jetty.SslSelectChannelConnectorSecure;
84  import org.apache.hadoop.hbase.security.SaslUtil;
85  import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
86  import org.apache.hadoop.hbase.security.SecurityUtil;
87  import org.apache.hadoop.hbase.security.UserProvider;
88  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
89  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
90  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
91  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
92  import org.apache.hadoop.hbase.thrift.generated.Hbase;
93  import org.apache.hadoop.hbase.thrift.generated.IOError;
94  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
95  import org.apache.hadoop.hbase.thrift.generated.Mutation;
96  import org.apache.hadoop.hbase.thrift.generated.TAppend;
97  import org.apache.hadoop.hbase.thrift.generated.TCell;
98  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
99  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
100 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
101 import org.apache.hadoop.hbase.thrift.generated.TScan;
102 import org.apache.hadoop.hbase.util.Bytes;
103 import org.apache.hadoop.hbase.util.ConnectionCache;
104 import org.apache.hadoop.hbase.util.DNS;
105 import org.apache.hadoop.hbase.util.Strings;
106 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
107 import org.apache.hadoop.security.UserGroupInformation;
108 import org.apache.hadoop.security.authorize.ProxyUsers;
109 import org.apache.thrift.TException;
110 import org.apache.thrift.TProcessor;
111 import org.apache.thrift.protocol.TBinaryProtocol;
112 import org.apache.thrift.protocol.TCompactProtocol;
113 import org.apache.thrift.protocol.TProtocol;
114 import org.apache.thrift.protocol.TProtocolFactory;
115 import org.apache.thrift.server.THsHaServer;
116 import org.apache.thrift.server.TNonblockingServer;
117 import org.apache.thrift.server.TServer;
118 import org.apache.thrift.server.TServlet;
119 import org.apache.thrift.server.TThreadedSelectorServer;
120 import org.apache.thrift.transport.TFramedTransport;
121 import org.apache.thrift.transport.TNonblockingServerSocket;
122 import org.apache.thrift.transport.TNonblockingServerTransport;
123 import org.apache.thrift.transport.TSaslServerTransport;
124 import org.apache.thrift.transport.TServerSocket;
125 import org.apache.thrift.transport.TServerTransport;
126 import org.apache.thrift.transport.TTransportFactory;
127 import org.mortbay.jetty.Connector;
128 import org.mortbay.jetty.Server;
129 import org.mortbay.jetty.nio.SelectChannelConnector;
130 import org.mortbay.jetty.servlet.Context;
131 import org.mortbay.jetty.servlet.ServletHolder;
132 import org.mortbay.thread.QueuedThreadPool;
133 
134 import com.google.common.base.Joiner;
135 import com.google.common.base.Throwables;
136 import com.google.common.util.concurrent.ThreadFactoryBuilder;
137 
138 /**
139  * ThriftServerRunner - this class starts up a Thrift server which implements
140  * the Hbase API specified in the Hbase.thrift IDL file.
141  */
142 @InterfaceAudience.Private
143 public class ThriftServerRunner implements Runnable {
144 
145   private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
146 
147   static final String SERVER_TYPE_CONF_KEY =
148       "hbase.regionserver.thrift.server.type";
149 
150   static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
151   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
152   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
153   static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
154   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
155   static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
156   static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
157   static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
158   static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
159 
160   static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
161   static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
162   static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
163   static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
164 
165   /**
166    * Amount of time in milliseconds before a server thread will timeout
167    * waiting for client to send data on a connected socket. Currently,
168    * applies only to TBoundedThreadPoolServer
169    */
170   public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
171     "hbase.thrift.server.socket.read.timeout";
172   public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
173 
174 
175   /**
176    * Thrift quality of protection configuration key. Valid values can be:
177    * auth-conf: authentication, integrity and confidentiality checking
178    * auth-int: authentication and integrity checking
179    * auth: authentication only
180    *
181    * This is used to authenticate the callers and support impersonation.
182    * The thrift server and the HBase cluster must run in secure mode.
183    */
184   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
185   static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
186 
187   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
188   public static final int DEFAULT_LISTEN_PORT = 9090;
189   public static final int HREGION_VERSION = 1;
190   static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
191   private final int listenPort;
192 
193   private Configuration conf;
194   volatile TServer tserver;
195   volatile Server httpServer;
196   private final Hbase.Iface handler;
197   private final ThriftMetrics metrics;
198   private final HBaseHandler hbaseHandler;
199   private final UserGroupInformation realUser;
200 
201   private SaslUtil.QualityOfProtection qop;
202   private String host;
203 
204   private final boolean securityEnabled;
205   private final boolean doAsEnabled;
206 
207   /** An enum of server implementation selections */
208   enum ImplType {
209     HS_HA("hsha", true, THsHaServer.class, true),
210     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
211     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
212     THREADED_SELECTOR(
213         "threadedselector", true, TThreadedSelectorServer.class, true);
214 
215     public static final ImplType DEFAULT = THREAD_POOL;
216 
217     final String option;
218     final boolean isAlwaysFramed;
219     final Class<? extends TServer> serverClass;
220     final boolean canSpecifyBindIP;
221 
222     ImplType(String option, boolean isAlwaysFramed,
223         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
224       this.option = option;
225       this.isAlwaysFramed = isAlwaysFramed;
226       this.serverClass = serverClass;
227       this.canSpecifyBindIP = canSpecifyBindIP;
228     }
229 
230     /**
231      * @return <code>-option</code> so we can get the list of options from
232      *         {@link #values()}
233      */
234     @Override
235     public String toString() {
236       return "-" + option;
237     }
238 
239     String getDescription() {
240       StringBuilder sb = new StringBuilder("Use the " +
241           serverClass.getSimpleName());
242       if (isAlwaysFramed) {
243         sb.append(" This implies the framed transport.");
244       }
245       if (this == DEFAULT) {
246         sb.append("This is the default.");
247       }
248       return sb.toString();
249     }
250 
251     static OptionGroup createOptionGroup() {
252       OptionGroup group = new OptionGroup();
253       for (ImplType t : values()) {
254         group.addOption(new Option(t.option, t.getDescription()));
255       }
256       return group;
257     }
258 
259     static ImplType getServerImpl(Configuration conf) {
260       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
261       for (ImplType t : values()) {
262         if (confType.equals(t.option)) {
263           return t;
264         }
265       }
266       throw new AssertionError("Unknown server ImplType.option:" + confType);
267     }
268 
269     static void setServerImpl(CommandLine cmd, Configuration conf) {
270       ImplType chosenType = null;
271       int numChosen = 0;
272       for (ImplType t : values()) {
273         if (cmd.hasOption(t.option)) {
274           chosenType = t;
275           ++numChosen;
276         }
277       }
278       if (numChosen < 1) {
279         LOG.info("Using default thrift server type");
280         chosenType = DEFAULT;
281       } else if (numChosen > 1) {
282         throw new AssertionError("Exactly one option out of " +
283           Arrays.toString(values()) + " has to be specified");
284       }
285       LOG.info("Using thrift server type " + chosenType.option);
286       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
287     }
288 
289     public String simpleClassName() {
290       return serverClass.getSimpleName();
291     }
292 
293     public static List<String> serversThatCannotSpecifyBindIP() {
294       List<String> l = new ArrayList<String>();
295       for (ImplType t : values()) {
296         if (!t.canSpecifyBindIP) {
297           l.add(t.simpleClassName());
298         }
299       }
300       return l;
301     }
302 
303   }
304 
305   public ThriftServerRunner(Configuration conf) throws IOException {
306     UserProvider userProvider = UserProvider.instantiate(conf);
307     // login the server principal (if using secure Hadoop)
308     securityEnabled = userProvider.isHadoopSecurityEnabled()
309       && userProvider.isHBaseSecurityEnabled();
310     if (securityEnabled) {
311       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
312         conf.get("hbase.thrift.dns.interface", "default"),
313         conf.get("hbase.thrift.dns.nameserver", "default")));
314       userProvider.login("hbase.thrift.keytab.file",
315         "hbase.thrift.kerberos.principal", host);
316     }
317     this.conf = HBaseConfiguration.create(conf);
318     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
319     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
320     this.hbaseHandler = new HBaseHandler(conf, userProvider);
321     this.hbaseHandler.initMetrics(metrics);
322     this.handler = HbaseHandlerMetricsProxy.newInstance(
323       hbaseHandler, metrics, conf);
324     this.realUser = userProvider.getCurrent().getUGI();
325     String strQop = conf.get(THRIFT_QOP_KEY);
326     if (strQop != null) {
327       this.qop = SaslUtil.getQop(strQop);
328     }
329     doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
330     if (doAsEnabled) {
331       if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
332         LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured ");
333       }
334     }
335     if (qop != null) {
336       if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
337           qop != SaslUtil.QualityOfProtection.INTEGRITY &&
338           qop != SaslUtil.QualityOfProtection.PRIVACY) {
339         throw new IOException(String.format("Invalide %s: It must be one of %s, %s, or %s.",
340                               THRIFT_QOP_KEY,
341                               QualityOfProtection.AUTHENTICATION.name(),
342                               QualityOfProtection.INTEGRITY.name(),
343                               QualityOfProtection.PRIVACY.name()));
344       }
345       checkHttpSecurity(qop, conf);
346       if (!securityEnabled) {
347         throw new IOException("Thrift server must"
348           + " run in secure mode to support authentication");
349       }
350     }
351   }
352 
353   private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
354     if (qop == QualityOfProtection.PRIVACY &&
355         conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
356         !conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
357       throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
358           THRIFT_SSL_ENABLED + " is false");
359     }
360   }
361 
362   /*
363    * Runs the Thrift server
364    */
365   @Override
366   public void run() {
367     realUser.doAs(new PrivilegedAction<Object>() {
368       @Override
369       public Object run() {
370         try {
371           if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
372             setupHTTPServer();
373             httpServer.start();
374             httpServer.join();
375           } else {
376             setupServer();
377             tserver.serve();
378           }
379         } catch (Exception e) {
380           LOG.fatal("Cannot run ThriftServer", e);
381           // Crash the process if the ThriftServer is not running
382           System.exit(-1);
383         }
384         return null;
385       }
386     });
387 
388   }
389 
390   public void shutdown() {
391     if (tserver != null) {
392       tserver.stop();
393       tserver = null;
394     }
395     if (httpServer != null) {
396       try {
397         httpServer.stop();
398         httpServer = null;
399       } catch (Exception e) {
400         LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
401       }
402       httpServer = null;
403     }
404   }
405 
406   private void setupHTTPServer() throws IOException {
407     TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
408     TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
409     TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
410         conf, hbaseHandler, securityEnabled, doAsEnabled);
411 
412     httpServer = new Server();
413     // Context handler
414     Context context = new Context(httpServer, "/", Context.SESSIONS);
415     context.setContextPath("/");
416     String httpPath = "/*";
417     httpServer.setHandler(context);
418     context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
419 
420     // set up Jetty and run the embedded server
421     Connector connector = new SelectChannelConnector();
422     if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
423       SslSelectChannelConnectorSecure sslConnector = new SslSelectChannelConnectorSecure();
424       String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
425       String password = HBaseConfiguration.getPassword(conf,
426           THRIFT_SSL_KEYSTORE_PASSWORD, null);
427       String keyPassword = HBaseConfiguration.getPassword(conf,
428           THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
429       sslConnector.setKeystore(keystore);
430       sslConnector.setPassword(password);
431       sslConnector.setKeyPassword(keyPassword);
432       connector = sslConnector;
433     }
434     String host = getBindAddress(conf).getHostAddress();
435     connector.setPort(listenPort);
436     connector.setHost(host);
437     connector.setHeaderBufferSize(1024 * 64);
438     httpServer.addConnector(connector);
439 
440     if (doAsEnabled) {
441       ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
442     }
443 
444     // Set the default max thread number to 100 to limit
445     // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
446     // Jetty set the default max thread number to 250, if we don't set it.
447     //
448     // Our default min thread number 2 is the same as that used by Jetty.
449     int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
450     int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
451     QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
452     threadPool.setMinThreads(minThreads);
453     httpServer.setThreadPool(threadPool);
454 
455     httpServer.setSendServerVersion(false);
456     httpServer.setSendDateHeader(false);
457     httpServer.setStopAtShutdown(true);
458 
459     LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
460   }
461 
462   /**
463    * Setting up the thrift TServer
464    */
465   private void setupServer() throws Exception {
466     // Construct correct ProtocolFactory
467     TProtocolFactory protocolFactory;
468     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
469       LOG.debug("Using compact protocol");
470       protocolFactory = new TCompactProtocol.Factory();
471     } else {
472       LOG.debug("Using binary protocol");
473       protocolFactory = new TBinaryProtocol.Factory();
474     }
475 
476     final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
477     ImplType implType = ImplType.getServerImpl(conf);
478     TProcessor processor = p;
479 
480     // Construct correct TransportFactory
481     TTransportFactory transportFactory;
482     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
483       if (qop != null) {
484         throw new RuntimeException("Thrift server authentication"
485           + " doesn't work with framed transport yet");
486       }
487       transportFactory = new TFramedTransport.Factory(
488           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
489       LOG.debug("Using framed transport");
490     } else if (qop == null) {
491       transportFactory = new TTransportFactory();
492     } else {
493       // Extract the name from the principal
494       String name = SecurityUtil.getUserFromPrincipal(
495         conf.get("hbase.thrift.kerberos.principal"));
496       Map<String, String> saslProperties = new HashMap<String, String>();
497       saslProperties.put(Sasl.QOP, qop.getSaslQop());
498       saslProperties.put(Sasl.SERVER_AUTH, "true");
499       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
500       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
501         new SaslGssCallbackHandler() {
502           @Override
503           public void handle(Callback[] callbacks)
504               throws UnsupportedCallbackException {
505             AuthorizeCallback ac = null;
506             for (Callback callback : callbacks) {
507               if (callback instanceof AuthorizeCallback) {
508                 ac = (AuthorizeCallback) callback;
509               } else {
510                 throw new UnsupportedCallbackException(callback,
511                     "Unrecognized SASL GSSAPI Callback");
512               }
513             }
514             if (ac != null) {
515               String authid = ac.getAuthenticationID();
516               String authzid = ac.getAuthorizationID();
517               if (!authid.equals(authzid)) {
518                 ac.setAuthorized(false);
519               } else {
520                 ac.setAuthorized(true);
521                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
522                 LOG.info("Effective user: " + userName);
523                 ac.setAuthorizedID(userName);
524               }
525             }
526           }
527         });
528       transportFactory = saslFactory;
529 
530       // Create a processor wrapper, to get the caller
531       processor = new TProcessor() {
532         @Override
533         public boolean process(TProtocol inProt,
534             TProtocol outProt) throws TException {
535           TSaslServerTransport saslServerTransport =
536             (TSaslServerTransport)inProt.getTransport();
537           SaslServer saslServer = saslServerTransport.getSaslServer();
538           String principal = saslServer.getAuthorizationID();
539           hbaseHandler.setEffectiveUser(principal);
540           return p.process(inProt, outProt);
541         }
542       };
543     }
544 
545     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
546       LOG.error("Server types " + Joiner.on(", ").join(
547           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
548           "address binding at the moment. See " +
549           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
550       throw new RuntimeException(
551           "-" + BIND_CONF_KEY + " not supported with " + implType);
552     }
553 
554     // Thrift's implementation uses '0' as a placeholder for 'use the default.'
555     int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
556 
557     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
558         implType == ImplType.THREADED_SELECTOR) {
559       InetAddress listenAddress = getBindAddress(conf);
560       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
561           new InetSocketAddress(listenAddress, listenPort));
562 
563       if (implType == ImplType.NONBLOCKING) {
564         TNonblockingServer.Args serverArgs =
565             new TNonblockingServer.Args(serverTransport);
566         serverArgs.processor(processor)
567                   .transportFactory(transportFactory)
568                   .protocolFactory(protocolFactory);
569         tserver = new TNonblockingServer(serverArgs);
570       } else if (implType == ImplType.HS_HA) {
571         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
572         CallQueue callQueue =
573             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
574         ExecutorService executorService = createExecutor(
575             callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads());
576         serverArgs.executorService(executorService)
577                   .processor(processor)
578                   .transportFactory(transportFactory)
579                   .protocolFactory(protocolFactory);
580         tserver = new THsHaServer(serverArgs);
581       } else { // THREADED_SELECTOR
582         TThreadedSelectorServer.Args serverArgs =
583             new HThreadedSelectorServerArgs(serverTransport, conf);
584         CallQueue callQueue =
585             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
586         ExecutorService executorService = createExecutor(
587             callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
588         serverArgs.executorService(executorService)
589                   .processor(processor)
590                   .transportFactory(transportFactory)
591                   .protocolFactory(protocolFactory);
592         tserver = new TThreadedSelectorServer(serverArgs);
593       }
594       LOG.info("starting HBase " + implType.simpleClassName() +
595           " server on " + Integer.toString(listenPort));
596     } else if (implType == ImplType.THREAD_POOL) {
597       // Thread pool server. Get the IP address to bind to.
598       InetAddress listenAddress = getBindAddress(conf);
599       int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
600           THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
601       TServerTransport serverTransport = new TServerSocket(
602           new TServerSocket.ServerSocketTransportArgs().
603               bindAddr(new InetSocketAddress(listenAddress, listenPort)).
604               backlog(backlog).
605               clientTimeout(readTimeout));
606 
607       TBoundedThreadPoolServer.Args serverArgs =
608           new TBoundedThreadPoolServer.Args(serverTransport, conf);
609       serverArgs.processor(processor)
610                 .transportFactory(transportFactory)
611                 .protocolFactory(protocolFactory);
612       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
613           + listenAddress + ":" + Integer.toString(listenPort)
614           + " with readTimeout " + readTimeout + "ms; " + serverArgs);
615       TBoundedThreadPoolServer tserver =
616           new TBoundedThreadPoolServer(serverArgs, metrics);
617       this.tserver = tserver;
618     } else {
619       throw new AssertionError("Unsupported Thrift server implementation: " +
620           implType.simpleClassName());
621     }
622 
623     // A sanity check that we instantiated the right type of server.
624     if (tserver.getClass() != implType.serverClass) {
625       throw new AssertionError("Expected to create Thrift server class " +
626           implType.serverClass.getName() + " but got " +
627           tserver.getClass().getName());
628     }
629 
630 
631 
632     registerFilters(conf);
633   }
634 
635   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
636                                  int minWorkers, int maxWorkers) {
637     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
638     tfb.setDaemon(true);
639     tfb.setNameFormat("thrift-worker-%d");
640     return new ThreadPoolExecutor(minWorkers, maxWorkers,
641             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
642   }
643 
644   private InetAddress getBindAddress(Configuration conf)
645       throws UnknownHostException {
646     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
647     return InetAddress.getByName(bindAddressStr);
648   }
649 
650   protected static class ResultScannerWrapper {
651 
652     private final ResultScanner scanner;
653     private final boolean sortColumns;
654     public ResultScannerWrapper(ResultScanner resultScanner,
655                                 boolean sortResultColumns) {
656       scanner = resultScanner;
657       sortColumns = sortResultColumns;
658    }
659 
660     public ResultScanner getScanner() {
661       return scanner;
662     }
663 
664     public boolean isColumnSorted() {
665       return sortColumns;
666     }
667   }
668 
669   /**
670    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
671    * HBase client API primarily defined in the Admin and Table objects.
672    */
673   public static class HBaseHandler implements Hbase.Iface {
674     protected Configuration conf;
675     protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);
676 
677     // nextScannerId and scannerMap are used to manage scanner state
678     protected int nextScannerId = 0;
679     protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
680     private ThriftMetrics metrics = null;
681 
682     private final ConnectionCache connectionCache;
683     IncrementCoalescer coalescer = null;
684 
685     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
686     static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
687 
688     /**
689      * Returns a list of all the column families for a given Table.
690      *
691      * @param table
692      * @throws IOException
693      */
694     byte[][] getAllColumns(Table table) throws IOException {
695       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
696       byte[][] columns = new byte[cds.length][];
697       for (int i = 0; i < cds.length; i++) {
698         columns[i] = Bytes.add(cds[i].getName(),
699             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
700       }
701       return columns;
702     }
703 
704     /**
705      * Creates and returns a Table instance from a given table name.
706      *
707      * @param tableName
708      *          name of table
709      * @return Table object
710      * @throws IOException
711      */
712     public Table getTable(final byte[] tableName) throws
713         IOException {
714       String table = Bytes.toString(tableName);
715       return connectionCache.getTable(table);
716     }
717 
718     public Table getTable(final ByteBuffer tableName) throws IOException {
719       return getTable(getBytes(tableName));
720     }
721 
722     /**
723      * Assigns a unique ID to the scanner and adds the mapping to an internal
724      * hash-map.
725      *
726      * @param scanner
727      * @return integer scanner id
728      */
729     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
730       int id = nextScannerId++;
731       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
732       scannerMap.put(id, resultScannerWrapper);
733       return id;
734     }
735 
736     /**
737      * Returns the scanner associated with the specified ID.
738      *
739      * @param id
740      * @return a Scanner, or null if ID was invalid.
741      */
742     protected synchronized ResultScannerWrapper getScanner(int id) {
743       return scannerMap.get(id);
744     }
745 
746     /**
747      * Removes the scanner associated with the specified ID from the internal
748      * id-&gt;scanner hash-map.
749      *
750      * @param id
751      * @return a Scanner, or null if ID was invalid.
752      */
753     protected synchronized ResultScannerWrapper removeScanner(int id) {
754       return scannerMap.remove(id);
755     }
756 
757     protected HBaseHandler(final Configuration c,
758         final UserProvider userProvider) throws IOException {
759       this.conf = c;
760       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
761       this.coalescer = new IncrementCoalescer(this);
762 
763       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
764       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
765       connectionCache = new ConnectionCache(
766         conf, userProvider, cleanInterval, maxIdleTime);
767     }
768 
769     /**
770      * Obtain HBaseAdmin. Creates the instance if it is not already created.
771      */
772     private Admin getAdmin() throws IOException {
773       return connectionCache.getAdmin();
774     }
775 
776     void setEffectiveUser(String effectiveUser) {
777       connectionCache.setEffectiveUser(effectiveUser);
778     }
779 
780     @Override
781     public void enableTable(ByteBuffer tableName) throws IOError {
782       try{
783         getAdmin().enableTable(getTableName(tableName));
784       } catch (IOException e) {
785         LOG.warn(e.getMessage(), e);
786         throw new IOError(Throwables.getStackTraceAsString(e));
787       }
788     }
789 
790     @Override
791     public void disableTable(ByteBuffer tableName) throws IOError{
792       try{
793         getAdmin().disableTable(getTableName(tableName));
794       } catch (IOException e) {
795         LOG.warn(e.getMessage(), e);
796         throw new IOError(Throwables.getStackTraceAsString(e));
797       }
798     }
799 
800     @Override
801     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
802       try {
803         return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
804       } catch (IOException e) {
805         LOG.warn(e.getMessage(), e);
806         throw new IOError(Throwables.getStackTraceAsString(e));
807       }
808     }
809 
810     @Override
811     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
812       try {
813         // TODO: HBaseAdmin.compact(byte[]) deprecated and not trivial to replace here.
814         // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
815         // table and region.
816         ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
817       } catch (IOException e) {
818         LOG.warn(e.getMessage(), e);
819         throw new IOError(Throwables.getStackTraceAsString(e));
820       }
821     }
822 
823     @Override
824     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
825       try {
826         // TODO: HBaseAdmin.majorCompact(byte[]) deprecated and not trivial to replace here.
827         // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
828         // to table and region.
829         ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
830       } catch (IOException e) {
831         LOG.warn(e.getMessage(), e);
832         throw new IOError(Throwables.getStackTraceAsString(e));
833       }
834     }
835 
836     @Override
837     public List<ByteBuffer> getTableNames() throws IOError {
838       try {
839         TableName[] tableNames = this.getAdmin().listTableNames();
840         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
841         for (int i = 0; i < tableNames.length; i++) {
842           list.add(ByteBuffer.wrap(tableNames[i].getName()));
843         }
844         return list;
845       } catch (IOException e) {
846         LOG.warn(e.getMessage(), e);
847         throw new IOError(Throwables.getStackTraceAsString(e));
848       }
849     }
850 
851     /**
852      * @return the list of regions in the given table, or an empty list if the table does not exist
853      */
854     @Override
855     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
856     throws IOError {
857       try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
858         List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
859         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
860         for (HRegionLocation regionLocation : regionLocations) {
861           HRegionInfo info = regionLocation.getRegionInfo();
862           ServerName serverName = regionLocation.getServerName();
863           TRegionInfo region = new TRegionInfo();
864           region.serverName = ByteBuffer.wrap(
865               Bytes.toBytes(serverName.getHostname()));
866           region.port = serverName.getPort();
867           region.startKey = ByteBuffer.wrap(info.getStartKey());
868           region.endKey = ByteBuffer.wrap(info.getEndKey());
869           region.id = info.getRegionId();
870           region.name = ByteBuffer.wrap(info.getRegionName());
871           region.version = info.getVersion();
872           results.add(region);
873         }
874         return results;
875       } catch (TableNotFoundException e) {
876         // Return empty list for non-existing table
877         return Collections.emptyList();
878       } catch (IOException e){
879         LOG.warn(e.getMessage(), e);
880         throw new IOError(Throwables.getStackTraceAsString(e));
881       }
882     }
883 
884     @Override
885     public List<TCell> get(
886         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
887         Map<ByteBuffer, ByteBuffer> attributes)
888         throws IOError {
889       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
890       if (famAndQf.length == 1) {
891         return get(tableName, row, famAndQf[0], null, attributes);
892       }
893       if (famAndQf.length == 2) {
894         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
895       }
896       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
897     }
898 
899     /**
900      * Note: this internal interface is slightly different from public APIs in regard to handling
901      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
902      * we respect qual == null as a request for the entire column family. The caller (
903      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
904      * column is parse like normal.
905      */
906     protected List<TCell> get(ByteBuffer tableName,
907                               ByteBuffer row,
908                               byte[] family,
909                               byte[] qualifier,
910                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
911       Table table = null;
912       try {
913         table = getTable(tableName);
914         Get get = new Get(getBytes(row));
915         addAttributes(get, attributes);
916         if (qualifier == null) {
917           get.addFamily(family);
918         } else {
919           get.addColumn(family, qualifier);
920         }
921         Result result = table.get(get);
922         return ThriftUtilities.cellFromHBase(result.rawCells());
923       } catch (IOException e) {
924         LOG.warn(e.getMessage(), e);
925         throw new IOError(Throwables.getStackTraceAsString(e));
926       } finally {
927         closeTable(table);
928       }
929     }
930 
931     @Override
932     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
933         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
934       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
935       if(famAndQf.length == 1) {
936         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
937       }
938       if (famAndQf.length == 2) {
939         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
940       }
941       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
942 
943     }
944 
945     /**
946      * Note: this public interface is slightly different from public Java APIs in regard to
947      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
948      * Rather, we respect qual == null as a request for the entire column family. If you want to
949      * access the entire column family, use
950      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
951      * that lacks a {@code ':'}.
952      */
953     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
954         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
955 
956       Table table = null;
957       try {
958         table = getTable(tableName);
959         Get get = new Get(getBytes(row));
960         addAttributes(get, attributes);
961         if (null == qualifier) {
962           get.addFamily(family);
963         } else {
964           get.addColumn(family, qualifier);
965         }
966         get.setMaxVersions(numVersions);
967         Result result = table.get(get);
968         return ThriftUtilities.cellFromHBase(result.rawCells());
969       } catch (IOException e) {
970         LOG.warn(e.getMessage(), e);
971         throw new IOError(Throwables.getStackTraceAsString(e));
972       } finally{
973         closeTable(table);
974       }
975     }
976 
977     @Override
978     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
979         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
980       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
981       if (famAndQf.length == 1) {
982         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
983       }
984       if (famAndQf.length == 2) {
985         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
986           attributes);
987       }
988       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
989     }
990 
991     /**
992      * Note: this internal interface is slightly different from public APIs in regard to handling
993      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
994      * we respect qual == null as a request for the entire column family. The caller (
995      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
996      * consistent in that the column is parse like normal.
997      */
998     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
999         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
1000         throws IOError {
1001 
1002       Table table = null;
1003       try {
1004         table = getTable(tableName);
1005         Get get = new Get(getBytes(row));
1006         addAttributes(get, attributes);
1007         if (null == qualifier) {
1008           get.addFamily(family);
1009         } else {
1010           get.addColumn(family, qualifier);
1011         }
1012         get.setTimeRange(0, timestamp);
1013         get.setMaxVersions(numVersions);
1014         Result result = table.get(get);
1015         return ThriftUtilities.cellFromHBase(result.rawCells());
1016       } catch (IOException e) {
1017         LOG.warn(e.getMessage(), e);
1018         throw new IOError(Throwables.getStackTraceAsString(e));
1019       } finally{
1020         closeTable(table);
1021       }
1022     }
1023 
1024     @Override
1025     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1026         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1027       return getRowWithColumnsTs(tableName, row, null,
1028                                  HConstants.LATEST_TIMESTAMP,
1029                                  attributes);
1030     }
1031 
1032     @Override
1033     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1034                                               ByteBuffer row,
1035         List<ByteBuffer> columns,
1036         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1037       return getRowWithColumnsTs(tableName, row, columns,
1038                                  HConstants.LATEST_TIMESTAMP,
1039                                  attributes);
1040     }
1041 
1042     @Override
1043     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1044         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1045       return getRowWithColumnsTs(tableName, row, null,
1046                                  timestamp, attributes);
1047     }
1048 
1049     @Override
1050     public List<TRowResult> getRowWithColumnsTs(
1051         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1052         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1053 
1054       Table table = null;
1055       try {
1056         table = getTable(tableName);
1057         if (columns == null) {
1058           Get get = new Get(getBytes(row));
1059           addAttributes(get, attributes);
1060           get.setTimeRange(0, timestamp);
1061           Result result = table.get(get);
1062           return ThriftUtilities.rowResultFromHBase(result);
1063         }
1064         Get get = new Get(getBytes(row));
1065         addAttributes(get, attributes);
1066         for(ByteBuffer column : columns) {
1067           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1068           if (famAndQf.length == 1) {
1069               get.addFamily(famAndQf[0]);
1070           } else {
1071               get.addColumn(famAndQf[0], famAndQf[1]);
1072           }
1073         }
1074         get.setTimeRange(0, timestamp);
1075         Result result = table.get(get);
1076         return ThriftUtilities.rowResultFromHBase(result);
1077       } catch (IOException e) {
1078         LOG.warn(e.getMessage(), e);
1079         throw new IOError(Throwables.getStackTraceAsString(e));
1080       } finally{
1081         closeTable(table);
1082       }
1083     }
1084 
1085     @Override
1086     public List<TRowResult> getRows(ByteBuffer tableName,
1087                                     List<ByteBuffer> rows,
1088         Map<ByteBuffer, ByteBuffer> attributes)
1089         throws IOError {
1090       return getRowsWithColumnsTs(tableName, rows, null,
1091                                   HConstants.LATEST_TIMESTAMP,
1092                                   attributes);
1093     }
1094 
1095     @Override
1096     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1097                                                List<ByteBuffer> rows,
1098         List<ByteBuffer> columns,
1099         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1100       return getRowsWithColumnsTs(tableName, rows, columns,
1101                                   HConstants.LATEST_TIMESTAMP,
1102                                   attributes);
1103     }
1104 
1105     @Override
1106     public List<TRowResult> getRowsTs(ByteBuffer tableName,
1107                                       List<ByteBuffer> rows,
1108         long timestamp,
1109         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1110       return getRowsWithColumnsTs(tableName, rows, null,
1111                                   timestamp, attributes);
1112     }
1113 
1114     @Override
1115     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1116                                                  List<ByteBuffer> rows,
1117         List<ByteBuffer> columns, long timestamp,
1118         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1119 
1120       Table table= null;
1121       try {
1122         List<Get> gets = new ArrayList<Get>(rows.size());
1123         table = getTable(tableName);
1124         if (metrics != null) {
1125           metrics.incNumRowKeysInBatchGet(rows.size());
1126         }
1127         for (ByteBuffer row : rows) {
1128           Get get = new Get(getBytes(row));
1129           addAttributes(get, attributes);
1130           if (columns != null) {
1131 
1132             for(ByteBuffer column : columns) {
1133               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1134               if (famAndQf.length == 1) {
1135                 get.addFamily(famAndQf[0]);
1136               } else {
1137                 get.addColumn(famAndQf[0], famAndQf[1]);
1138               }
1139             }
1140           }
1141           get.setTimeRange(0, timestamp);
1142           gets.add(get);
1143         }
1144         Result[] result = table.get(gets);
1145         return ThriftUtilities.rowResultFromHBase(result);
1146       } catch (IOException e) {
1147         LOG.warn(e.getMessage(), e);
1148         throw new IOError(Throwables.getStackTraceAsString(e));
1149       } finally{
1150         closeTable(table);
1151       }
1152     }
1153 
1154     @Override
1155     public void deleteAll(
1156         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1157         Map<ByteBuffer, ByteBuffer> attributes)
1158         throws IOError {
1159       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1160                   attributes);
1161     }
1162 
1163     @Override
1164     public void deleteAllTs(ByteBuffer tableName,
1165                             ByteBuffer row,
1166                             ByteBuffer column,
1167         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1168       Table table = null;
1169       try {
1170         table = getTable(tableName);
1171         Delete delete  = new Delete(getBytes(row));
1172         addAttributes(delete, attributes);
1173         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1174         if (famAndQf.length == 1) {
1175           delete.deleteFamily(famAndQf[0], timestamp);
1176         } else {
1177           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1178         }
1179         table.delete(delete);
1180 
1181       } catch (IOException e) {
1182         LOG.warn(e.getMessage(), e);
1183         throw new IOError(Throwables.getStackTraceAsString(e));
1184       } finally {
1185         closeTable(table);
1186       }
1187     }
1188 
1189     @Override
1190     public void deleteAllRow(
1191         ByteBuffer tableName, ByteBuffer row,
1192         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1193       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1194     }
1195 
1196     @Override
1197     public void deleteAllRowTs(
1198         ByteBuffer tableName, ByteBuffer row, long timestamp,
1199         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1200       Table table = null;
1201       try {
1202         table = getTable(tableName);
1203         Delete delete  = new Delete(getBytes(row), timestamp);
1204         addAttributes(delete, attributes);
1205         table.delete(delete);
1206       } catch (IOException e) {
1207         LOG.warn(e.getMessage(), e);
1208         throw new IOError(Throwables.getStackTraceAsString(e));
1209       } finally {
1210         closeTable(table);
1211       }
1212     }
1213 
1214     @Override
1215     public void createTable(ByteBuffer in_tableName,
1216         List<ColumnDescriptor> columnFamilies) throws IOError,
1217         IllegalArgument, AlreadyExists {
1218       TableName tableName = getTableName(in_tableName);
1219       try {
1220         if (getAdmin().tableExists(tableName)) {
1221           throw new AlreadyExists("table name already in use");
1222         }
1223         HTableDescriptor desc = new HTableDescriptor(tableName);
1224         for (ColumnDescriptor col : columnFamilies) {
1225           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1226           desc.addFamily(colDesc);
1227         }
1228         getAdmin().createTable(desc);
1229       } catch (IOException e) {
1230         LOG.warn(e.getMessage(), e);
1231         throw new IOError(Throwables.getStackTraceAsString(e));
1232       } catch (IllegalArgumentException e) {
1233         LOG.warn(e.getMessage(), e);
1234         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1235       }
1236     }
1237 
1238     private static TableName getTableName(ByteBuffer buffer) {
1239       return TableName.valueOf(getBytes(buffer));
1240     }
1241 
1242     @Override
1243     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1244       TableName tableName = getTableName(in_tableName);
1245       if (LOG.isDebugEnabled()) {
1246         LOG.debug("deleteTable: table=" + tableName);
1247       }
1248       try {
1249         if (!getAdmin().tableExists(tableName)) {
1250           throw new IOException("table does not exist");
1251         }
1252         getAdmin().deleteTable(tableName);
1253       } catch (IOException e) {
1254         LOG.warn(e.getMessage(), e);
1255         throw new IOError(Throwables.getStackTraceAsString(e));
1256       }
1257     }
1258 
1259     @Override
1260     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1261         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1262         throws IOError, IllegalArgument {
1263       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1264                   attributes);
1265     }
1266 
1267     @Override
1268     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1269         List<Mutation> mutations, long timestamp,
1270         Map<ByteBuffer, ByteBuffer> attributes)
1271         throws IOError, IllegalArgument {
1272       Table table = null;
1273       try {
1274         table = getTable(tableName);
1275         Put put = new Put(getBytes(row), timestamp);
1276         addAttributes(put, attributes);
1277 
1278         Delete delete = new Delete(getBytes(row));
1279         addAttributes(delete, attributes);
1280         if (metrics != null) {
1281           metrics.incNumRowKeysInBatchMutate(mutations.size());
1282         }
1283 
1284         // I apologize for all this mess :)
1285         for (Mutation m : mutations) {
1286           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1287           if (m.isDelete) {
1288             if (famAndQf.length == 1) {
1289               delete.deleteFamily(famAndQf[0], timestamp);
1290             } else {
1291               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1292             }
1293             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1294                 : Durability.SKIP_WAL);
1295           } else {
1296             if(famAndQf.length == 1) {
1297               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1298                   + "over the whole column family.");
1299             } else {
1300               put.addImmutable(famAndQf[0], famAndQf[1],
1301                   m.value != null ? getBytes(m.value)
1302                       : HConstants.EMPTY_BYTE_ARRAY);
1303             }
1304             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1305           }
1306         }
1307         if (!delete.isEmpty())
1308           table.delete(delete);
1309         if (!put.isEmpty())
1310           table.put(put);
1311       } catch (IOException e) {
1312         LOG.warn(e.getMessage(), e);
1313         throw new IOError(Throwables.getStackTraceAsString(e));
1314       } catch (IllegalArgumentException e) {
1315         LOG.warn(e.getMessage(), e);
1316         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1317       } finally{
1318         closeTable(table);
1319       }
1320     }
1321 
1322     @Override
1323     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1324         Map<ByteBuffer, ByteBuffer> attributes)
1325         throws IOError, IllegalArgument, TException {
1326       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1327     }
1328 
1329     @Override
1330     public void mutateRowsTs(
1331         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1332         Map<ByteBuffer, ByteBuffer> attributes)
1333         throws IOError, IllegalArgument, TException {
1334       List<Put> puts = new ArrayList<Put>();
1335       List<Delete> deletes = new ArrayList<Delete>();
1336 
1337       for (BatchMutation batch : rowBatches) {
1338         byte[] row = getBytes(batch.row);
1339         List<Mutation> mutations = batch.mutations;
1340         Delete delete = new Delete(row);
1341         addAttributes(delete, attributes);
1342         Put put = new Put(row, timestamp);
1343         addAttributes(put, attributes);
1344         for (Mutation m : mutations) {
1345           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1346           if (m.isDelete) {
1347             // no qualifier, family only.
1348             if (famAndQf.length == 1) {
1349               delete.deleteFamily(famAndQf[0], timestamp);
1350             } else {
1351               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1352             }
1353             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1354                 : Durability.SKIP_WAL);
1355           } else {
1356             if (famAndQf.length == 1) {
1357               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1358                   + "over the whole column family.");
1359             }
1360             if (famAndQf.length == 2) {
1361               put.addImmutable(famAndQf[0], famAndQf[1],
1362                   m.value != null ? getBytes(m.value)
1363                       : HConstants.EMPTY_BYTE_ARRAY);
1364             } else {
1365               throw new IllegalArgumentException("Invalid famAndQf provided.");
1366             }
1367             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1368           }
1369         }
1370         if (!delete.isEmpty())
1371           deletes.add(delete);
1372         if (!put.isEmpty())
1373           puts.add(put);
1374       }
1375 
1376       Table table = null;
1377       try {
1378         table = getTable(tableName);
1379         if (!puts.isEmpty())
1380           table.put(puts);
1381         if (!deletes.isEmpty())
1382           table.delete(deletes);
1383 
1384       } catch (IOException e) {
1385         LOG.warn(e.getMessage(), e);
1386         throw new IOError(Throwables.getStackTraceAsString(e));
1387       } catch (IllegalArgumentException e) {
1388         LOG.warn(e.getMessage(), e);
1389         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1390       } finally{
1391         closeTable(table);
1392       }
1393     }
1394 
1395     @Override
1396     public long atomicIncrement(
1397         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1398             throws IOError, IllegalArgument, TException {
1399       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1400       if(famAndQf.length == 1) {
1401         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1402       }
1403       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1404     }
1405 
1406     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1407         byte [] family, byte [] qualifier, long amount)
1408         throws IOError, IllegalArgument, TException {
1409       Table table = null;
1410       try {
1411         table = getTable(tableName);
1412         return table.incrementColumnValue(
1413             getBytes(row), family, qualifier, amount);
1414       } catch (IOException e) {
1415         LOG.warn(e.getMessage(), e);
1416         throw new IOError(Throwables.getStackTraceAsString(e));
1417       } finally {
1418         closeTable(table);
1419       }
1420     }
1421 
1422     @Override
1423     public void scannerClose(int id) throws IOError, IllegalArgument {
1424       LOG.debug("scannerClose: id=" + id);
1425       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1426       if (resultScannerWrapper == null) {
1427         String message = "scanner ID is invalid";
1428         LOG.warn(message);
1429         throw new IllegalArgument("scanner ID is invalid");
1430       }
1431       resultScannerWrapper.getScanner().close();
1432       removeScanner(id);
1433     }
1434 
1435     @Override
1436     public List<TRowResult> scannerGetList(int id,int nbRows)
1437         throws IllegalArgument, IOError {
1438       LOG.debug("scannerGetList: id=" + id);
1439       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1440       if (null == resultScannerWrapper) {
1441         String message = "scanner ID is invalid";
1442         LOG.warn(message);
1443         throw new IllegalArgument("scanner ID is invalid");
1444       }
1445 
1446       Result [] results = null;
1447       try {
1448         results = resultScannerWrapper.getScanner().next(nbRows);
1449         if (null == results) {
1450           return new ArrayList<TRowResult>();
1451         }
1452       } catch (IOException e) {
1453         LOG.warn(e.getMessage(), e);
1454         throw new IOError(Throwables.getStackTraceAsString(e));
1455       }
1456       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1457     }
1458 
1459     @Override
1460     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1461       return scannerGetList(id,1);
1462     }
1463 
1464     @Override
1465     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1466         Map<ByteBuffer, ByteBuffer> attributes)
1467         throws IOError {
1468 
1469       Table table = null;
1470       try {
1471         table = getTable(tableName);
1472         Scan scan = new Scan();
1473         addAttributes(scan, attributes);
1474         if (tScan.isSetStartRow()) {
1475           scan.setStartRow(tScan.getStartRow());
1476         }
1477         if (tScan.isSetStopRow()) {
1478           scan.setStopRow(tScan.getStopRow());
1479         }
1480         if (tScan.isSetTimestamp()) {
1481           scan.setTimeRange(0, tScan.getTimestamp());
1482         }
1483         if (tScan.isSetCaching()) {
1484           scan.setCaching(tScan.getCaching());
1485         }
1486         if (tScan.isSetBatchSize()) {
1487           scan.setBatch(tScan.getBatchSize());
1488         }
1489         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1490           for(ByteBuffer column : tScan.getColumns()) {
1491             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1492             if(famQf.length == 1) {
1493               scan.addFamily(famQf[0]);
1494             } else {
1495               scan.addColumn(famQf[0], famQf[1]);
1496             }
1497           }
1498         }
1499         if (tScan.isSetFilterString()) {
1500           ParseFilter parseFilter = new ParseFilter();
1501           scan.setFilter(
1502               parseFilter.parseFilterString(tScan.getFilterString()));
1503         }
1504         if (tScan.isSetReversed()) {
1505           scan.setReversed(tScan.isReversed());
1506         }
1507         return addScanner(table.getScanner(scan), tScan.sortColumns);
1508       } catch (IOException e) {
1509         LOG.warn(e.getMessage(), e);
1510         throw new IOError(Throwables.getStackTraceAsString(e));
1511       } finally{
1512         closeTable(table);
1513       }
1514     }
1515 
1516     @Override
1517     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1518         List<ByteBuffer> columns,
1519         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1520 
1521       Table table = null;
1522       try {
1523         table = getTable(tableName);
1524         Scan scan = new Scan(getBytes(startRow));
1525         addAttributes(scan, attributes);
1526         if(columns != null && columns.size() != 0) {
1527           for(ByteBuffer column : columns) {
1528             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1529             if(famQf.length == 1) {
1530               scan.addFamily(famQf[0]);
1531             } else {
1532               scan.addColumn(famQf[0], famQf[1]);
1533             }
1534           }
1535         }
1536         return addScanner(table.getScanner(scan), false);
1537       } catch (IOException e) {
1538         LOG.warn(e.getMessage(), e);
1539         throw new IOError(Throwables.getStackTraceAsString(e));
1540       } finally{
1541         closeTable(table);
1542       }
1543     }
1544 
1545     @Override
1546     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1547         ByteBuffer stopRow, List<ByteBuffer> columns,
1548         Map<ByteBuffer, ByteBuffer> attributes)
1549         throws IOError, TException {
1550 
1551       Table table = null;
1552       try {
1553         table = getTable(tableName);
1554         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1555         addAttributes(scan, attributes);
1556         if(columns != null && columns.size() != 0) {
1557           for(ByteBuffer column : columns) {
1558             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1559             if(famQf.length == 1) {
1560               scan.addFamily(famQf[0]);
1561             } else {
1562               scan.addColumn(famQf[0], famQf[1]);
1563             }
1564           }
1565         }
1566         return addScanner(table.getScanner(scan), false);
1567       } catch (IOException e) {
1568         LOG.warn(e.getMessage(), e);
1569         throw new IOError(Throwables.getStackTraceAsString(e));
1570       } finally{
1571         closeTable(table);
1572       }
1573     }
1574 
1575     @Override
1576     public int scannerOpenWithPrefix(ByteBuffer tableName,
1577                                      ByteBuffer startAndPrefix,
1578                                      List<ByteBuffer> columns,
1579         Map<ByteBuffer, ByteBuffer> attributes)
1580         throws IOError, TException {
1581 
1582       Table table = null;
1583       try {
1584         table = getTable(tableName);
1585         Scan scan = new Scan(getBytes(startAndPrefix));
1586         addAttributes(scan, attributes);
1587         Filter f = new WhileMatchFilter(
1588             new PrefixFilter(getBytes(startAndPrefix)));
1589         scan.setFilter(f);
1590         if (columns != null && columns.size() != 0) {
1591           for(ByteBuffer column : columns) {
1592             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1593             if(famQf.length == 1) {
1594               scan.addFamily(famQf[0]);
1595             } else {
1596               scan.addColumn(famQf[0], famQf[1]);
1597             }
1598           }
1599         }
1600         return addScanner(table.getScanner(scan), false);
1601       } catch (IOException e) {
1602         LOG.warn(e.getMessage(), e);
1603         throw new IOError(Throwables.getStackTraceAsString(e));
1604       } finally{
1605         closeTable(table);
1606       }
1607     }
1608 
1609     @Override
1610     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1611         List<ByteBuffer> columns, long timestamp,
1612         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1613 
1614       Table table = null;
1615       try {
1616         table = getTable(tableName);
1617         Scan scan = new Scan(getBytes(startRow));
1618         addAttributes(scan, attributes);
1619         scan.setTimeRange(0, timestamp);
1620         if (columns != null && columns.size() != 0) {
1621           for (ByteBuffer column : columns) {
1622             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1623             if(famQf.length == 1) {
1624               scan.addFamily(famQf[0]);
1625             } else {
1626               scan.addColumn(famQf[0], famQf[1]);
1627             }
1628           }
1629         }
1630         return addScanner(table.getScanner(scan), false);
1631       } catch (IOException e) {
1632         LOG.warn(e.getMessage(), e);
1633         throw new IOError(Throwables.getStackTraceAsString(e));
1634       } finally{
1635         closeTable(table);
1636       }
1637     }
1638 
1639     @Override
1640     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1641         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1642         Map<ByteBuffer, ByteBuffer> attributes)
1643         throws IOError, TException {
1644 
1645       Table table = null;
1646       try {
1647         table = getTable(tableName);
1648         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1649         addAttributes(scan, attributes);
1650         scan.setTimeRange(0, timestamp);
1651         if (columns != null && columns.size() != 0) {
1652           for (ByteBuffer column : columns) {
1653             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1654             if(famQf.length == 1) {
1655               scan.addFamily(famQf[0]);
1656             } else {
1657               scan.addColumn(famQf[0], famQf[1]);
1658             }
1659           }
1660         }
1661         scan.setTimeRange(0, timestamp);
1662         return addScanner(table.getScanner(scan), false);
1663       } catch (IOException e) {
1664         LOG.warn(e.getMessage(), e);
1665         throw new IOError(Throwables.getStackTraceAsString(e));
1666       } finally{
1667         closeTable(table);
1668       }
1669     }
1670 
1671     @Override
1672     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1673         ByteBuffer tableName) throws IOError, TException {
1674 
1675       Table table = null;
1676       try {
1677         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1678           new TreeMap<ByteBuffer, ColumnDescriptor>();
1679 
1680         table = getTable(tableName);
1681         HTableDescriptor desc = table.getTableDescriptor();
1682 
1683         for (HColumnDescriptor e : desc.getFamilies()) {
1684           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1685           columns.put(col.name, col);
1686         }
1687         return columns;
1688       } catch (IOException e) {
1689         LOG.warn(e.getMessage(), e);
1690         throw new IOError(Throwables.getStackTraceAsString(e));
1691       } finally {
1692         closeTable(table);
1693       }
1694     }
1695 
1696     @Deprecated
1697     @Override
1698     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1699         ByteBuffer family) throws IOError {
1700       try {
1701         Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1702         return ThriftUtilities.cellFromHBase(result.rawCells());
1703       } catch (IOException e) {
1704         LOG.warn(e.getMessage(), e);
1705         throw new IOError(Throwables.getStackTraceAsString(e));
1706       }
1707     }
1708 
1709     @Override
1710     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1711       try {
1712         byte[] row = getBytes(searchRow);
1713         Result startRowResult =
1714             getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1715 
1716         if (startRowResult == null) {
1717           throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1718                                 + Bytes.toStringBinary(row));
1719         }
1720 
1721         // find region start and end keys
1722         HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1723         if (regionInfo == null) {
1724           throw new IOException("HRegionInfo REGIONINFO was null or " +
1725                                 " empty in Meta for row="
1726                                 + Bytes.toStringBinary(row));
1727         }
1728         TRegionInfo region = new TRegionInfo();
1729         region.setStartKey(regionInfo.getStartKey());
1730         region.setEndKey(regionInfo.getEndKey());
1731         region.id = regionInfo.getRegionId();
1732         region.setName(regionInfo.getRegionName());
1733         region.version = regionInfo.getVersion();
1734 
1735         // find region assignment to server
1736         ServerName serverName = HRegionInfo.getServerName(startRowResult);
1737         if (serverName != null) {
1738           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1739           region.port = serverName.getPort();
1740         }
1741         return region;
1742       } catch (IOException e) {
1743         LOG.warn(e.getMessage(), e);
1744         throw new IOError(Throwables.getStackTraceAsString(e));
1745       }
1746     }
1747 
1748     private void closeTable(Table table) throws IOError
1749     {
1750       try{
1751         if(table != null){
1752           table.close();
1753         }
1754       } catch (IOException e){
1755         LOG.error(e.getMessage(), e);
1756         throw new IOError(Throwables.getStackTraceAsString(e));
1757       }
1758     }
1759 
1760     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1761       Scan scan = new Scan(row);
1762       scan.setReversed(true);
1763       scan.addFamily(family);
1764       scan.setStartRow(row);
1765       Table table = getTable(tableName);
1766       try (ResultScanner scanner = table.getScanner(scan)) {
1767         return scanner.next();
1768       } finally{
1769         if(table != null){
1770           table.close();
1771         }
1772       }
1773     }
1774 
1775     private void initMetrics(ThriftMetrics metrics) {
1776       this.metrics = metrics;
1777     }
1778 
1779     @Override
1780     public void increment(TIncrement tincrement) throws IOError, TException {
1781 
1782       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1783         throw new TException("Must supply a table and a row key; can't increment");
1784       }
1785 
1786       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1787         this.coalescer.queueIncrement(tincrement);
1788         return;
1789       }
1790 
1791       Table table = null;
1792       try {
1793         table = getTable(tincrement.getTable());
1794         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1795         table.increment(inc);
1796       } catch (IOException e) {
1797         LOG.warn(e.getMessage(), e);
1798         throw new IOError(Throwables.getStackTraceAsString(e));
1799       } finally{
1800         closeTable(table);
1801       }
1802     }
1803 
1804     @Override
1805     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1806       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1807         this.coalescer.queueIncrements(tincrements);
1808         return;
1809       }
1810       for (TIncrement tinc : tincrements) {
1811         increment(tinc);
1812       }
1813     }
1814 
1815     @Override
1816     public List<TCell> append(TAppend tappend) throws IOError, TException {
1817       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1818         throw new TException("Must supply a table and a row key; can't append");
1819       }
1820 
1821       Table table = null;
1822       try {
1823         table = getTable(tappend.getTable());
1824         Append append = ThriftUtilities.appendFromThrift(tappend);
1825         Result result = table.append(append);
1826         return ThriftUtilities.cellFromHBase(result.rawCells());
1827       } catch (IOException e) {
1828         LOG.warn(e.getMessage(), e);
1829         throw new IOError(Throwables.getStackTraceAsString(e));
1830       } finally{
1831           closeTable(table);
1832       }
1833     }
1834 
1835     @Override
1836     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1837         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1838         IllegalArgument, TException {
1839       Put put;
1840       try {
1841         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1842         addAttributes(put, attributes);
1843 
1844         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1845 
1846         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1847             : HConstants.EMPTY_BYTE_ARRAY);
1848 
1849         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1850       } catch (IllegalArgumentException e) {
1851         LOG.warn(e.getMessage(), e);
1852         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1853       }
1854 
1855       Table table = null;
1856       try {
1857         table = getTable(tableName);
1858         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1859         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1860           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1861       } catch (IOException e) {
1862         LOG.warn(e.getMessage(), e);
1863         throw new IOError(Throwables.getStackTraceAsString(e));
1864       } catch (IllegalArgumentException e) {
1865         LOG.warn(e.getMessage(), e);
1866         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1867       } finally {
1868           closeTable(table);
1869       }
1870     }
1871   }
1872 
1873 
1874 
1875   /**
1876    * Adds all the attributes into the Operation object
1877    */
1878   private static void addAttributes(OperationWithAttributes op,
1879     Map<ByteBuffer, ByteBuffer> attributes) {
1880     if (attributes == null || attributes.size() == 0) {
1881       return;
1882     }
1883     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1884       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1885       byte[] value =  getBytes(entry.getValue());
1886       op.setAttribute(name, value);
1887     }
1888   }
1889 
1890   public static void registerFilters(Configuration conf) {
1891     String[] filters = conf.getStrings("hbase.thrift.filters");
1892     if(filters != null) {
1893       for(String filterClass: filters) {
1894         String[] filterPart = filterClass.split(":");
1895         if(filterPart.length != 2) {
1896           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1897         } else {
1898           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1899         }
1900       }
1901     }
1902   }
1903 }