View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HRegionLocation;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.client.Admin;
66  import org.apache.hadoop.hbase.client.Append;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.Durability;
69  import org.apache.hadoop.hbase.client.Get;
70  import org.apache.hadoop.hbase.client.HBaseAdmin;
71  import org.apache.hadoop.hbase.client.Increment;
72  import org.apache.hadoop.hbase.client.OperationWithAttributes;
73  import org.apache.hadoop.hbase.client.Put;
74  import org.apache.hadoop.hbase.client.RegionLocator;
75  import org.apache.hadoop.hbase.client.Result;
76  import org.apache.hadoop.hbase.client.ResultScanner;
77  import org.apache.hadoop.hbase.client.Scan;
78  import org.apache.hadoop.hbase.client.Table;
79  import org.apache.hadoop.hbase.filter.Filter;
80  import org.apache.hadoop.hbase.filter.ParseFilter;
81  import org.apache.hadoop.hbase.filter.PrefixFilter;
82  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83  import org.apache.hadoop.hbase.jetty.SslSelectChannelConnectorSecure;
84  import org.apache.hadoop.hbase.security.SecurityUtil;
85  import org.apache.hadoop.hbase.security.UserProvider;
86  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
87  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
88  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
89  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
90  import org.apache.hadoop.hbase.thrift.generated.Hbase;
91  import org.apache.hadoop.hbase.thrift.generated.IOError;
92  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
93  import org.apache.hadoop.hbase.thrift.generated.Mutation;
94  import org.apache.hadoop.hbase.thrift.generated.TAppend;
95  import org.apache.hadoop.hbase.thrift.generated.TCell;
96  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
97  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
98  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
99  import org.apache.hadoop.hbase.thrift.generated.TScan;
100 import org.apache.hadoop.hbase.util.Bytes;
101 import org.apache.hadoop.hbase.util.ConnectionCache;
102 import org.apache.hadoop.hbase.util.DNS;
103 import org.apache.hadoop.hbase.util.Strings;
104 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.security.UserGroupInformation;
106 import org.apache.hadoop.security.authorize.ProxyUsers;
107 import org.apache.thrift.TException;
108 import org.apache.thrift.TProcessor;
109 import org.apache.thrift.protocol.TBinaryProtocol;
110 import org.apache.thrift.protocol.TCompactProtocol;
111 import org.apache.thrift.protocol.TProtocol;
112 import org.apache.thrift.protocol.TProtocolFactory;
113 import org.apache.thrift.server.THsHaServer;
114 import org.apache.thrift.server.TNonblockingServer;
115 import org.apache.thrift.server.TServer;
116 import org.apache.thrift.server.TServlet;
117 import org.apache.thrift.server.TThreadedSelectorServer;
118 import org.apache.thrift.transport.TFramedTransport;
119 import org.apache.thrift.transport.TNonblockingServerSocket;
120 import org.apache.thrift.transport.TNonblockingServerTransport;
121 import org.apache.thrift.transport.TSaslServerTransport;
122 import org.apache.thrift.transport.TServerSocket;
123 import org.apache.thrift.transport.TServerTransport;
124 import org.apache.thrift.transport.TTransportFactory;
125 import org.mortbay.jetty.Connector;
126 import org.mortbay.jetty.Server;
127 import org.mortbay.jetty.nio.SelectChannelConnector;
128 import org.mortbay.jetty.servlet.Context;
129 import org.mortbay.jetty.servlet.ServletHolder;
130 import org.mortbay.thread.QueuedThreadPool;
131 
132 import com.google.common.base.Joiner;
133 import com.google.common.base.Throwables;
134 import com.google.common.util.concurrent.ThreadFactoryBuilder;
135 
136 /**
137  * ThriftServerRunner - this class starts up a Thrift server which implements
138  * the Hbase API specified in the Hbase.thrift IDL file.
139  */
140 @InterfaceAudience.Private
141 public class ThriftServerRunner implements Runnable {
142 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  // Configuration key selecting which TServer implementation to run;
  // valid values are the ImplType options below.
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  // Network / transport configuration keys for the Thrift server.
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
  static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
  static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
  static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";

  // SSL settings for the HTTP transport (consumed by setupHTTPServer()).
  static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
  static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
  static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
  static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";

  /**
   * Amount of time in milliseconds before a server thread will timeout
   * waiting for client to send data on a connected socket. Currently,
   * applies only to TBoundedThreadPoolServer
   */
  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
    "hbase.thrift.server.socket.read.timeout";
  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;


  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * auth-conf: authentication, integrity and confidentiality checking
   * auth-int: authentication and integrity checking
   * auth: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
  static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";

  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  public static final int HREGION_VERSION = 1;
  static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
  // Listen port, resolved once in the constructor from PORT_CONF_KEY.
  private final int listenPort;

  private Configuration conf;
  // Exactly one of these two servers is started by run(), depending on
  // whether the HTTP transport is enabled; volatile so shutdown() observes
  // the instance created on the runner thread.
  volatile TServer tserver;
  volatile Server httpServer;
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
  private final HBaseHandler hbaseHandler;
  private final UserGroupInformation realUser;

  // SASL quality of protection from THRIFT_QOP_KEY; null when not configured.
  private final String qop;
  private String host;

  private final boolean securityEnabled;
  private final boolean doAsEnabled;
204 
205   /** An enum of server implementation selections */
206   enum ImplType {
207     HS_HA("hsha", true, THsHaServer.class, true),
208     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
209     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
210     THREADED_SELECTOR(
211         "threadedselector", true, TThreadedSelectorServer.class, true);
212 
213     public static final ImplType DEFAULT = THREAD_POOL;
214 
215     final String option;
216     final boolean isAlwaysFramed;
217     final Class<? extends TServer> serverClass;
218     final boolean canSpecifyBindIP;
219 
220     ImplType(String option, boolean isAlwaysFramed,
221         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
222       this.option = option;
223       this.isAlwaysFramed = isAlwaysFramed;
224       this.serverClass = serverClass;
225       this.canSpecifyBindIP = canSpecifyBindIP;
226     }
227 
228     /**
229      * @return <code>-option</code> so we can get the list of options from
230      *         {@link #values()}
231      */
232     @Override
233     public String toString() {
234       return "-" + option;
235     }
236 
237     String getDescription() {
238       StringBuilder sb = new StringBuilder("Use the " +
239           serverClass.getSimpleName());
240       if (isAlwaysFramed) {
241         sb.append(" This implies the framed transport.");
242       }
243       if (this == DEFAULT) {
244         sb.append("This is the default.");
245       }
246       return sb.toString();
247     }
248 
249     static OptionGroup createOptionGroup() {
250       OptionGroup group = new OptionGroup();
251       for (ImplType t : values()) {
252         group.addOption(new Option(t.option, t.getDescription()));
253       }
254       return group;
255     }
256 
257     static ImplType getServerImpl(Configuration conf) {
258       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
259       for (ImplType t : values()) {
260         if (confType.equals(t.option)) {
261           return t;
262         }
263       }
264       throw new AssertionError("Unknown server ImplType.option:" + confType);
265     }
266 
267     static void setServerImpl(CommandLine cmd, Configuration conf) {
268       ImplType chosenType = null;
269       int numChosen = 0;
270       for (ImplType t : values()) {
271         if (cmd.hasOption(t.option)) {
272           chosenType = t;
273           ++numChosen;
274         }
275       }
276       if (numChosen < 1) {
277         LOG.info("Using default thrift server type");
278         chosenType = DEFAULT;
279       } else if (numChosen > 1) {
280         throw new AssertionError("Exactly one option out of " +
281           Arrays.toString(values()) + " has to be specified");
282       }
283       LOG.info("Using thrift server type " + chosenType.option);
284       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
285     }
286 
287     public String simpleClassName() {
288       return serverClass.getSimpleName();
289     }
290 
291     public static List<String> serversThatCannotSpecifyBindIP() {
292       List<String> l = new ArrayList<String>();
293       for (ImplType t : values()) {
294         if (!t.canSpecifyBindIP) {
295           l.add(t.simpleClassName());
296         }
297       }
298       return l;
299     }
300 
301   }
302 
  public ThriftServerRunner(Configuration conf) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    // login the server principal (if using secure Hadoop)
    securityEnabled = userProvider.isHadoopSecurityEnabled()
      && userProvider.isHBaseSecurityEnabled();
    if (securityEnabled) {
      // Resolve the advertised host name before logging in.
      // NOTE(review): host is passed to login(); presumably used for _HOST
      // substitution in the Kerberos principal -- confirm in UserProvider.
      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase.thrift.dns.interface", "default"),
        conf.get("hbase.thrift.dns.nameserver", "default")));
      userProvider.login("hbase.thrift.keytab.file",
        "hbase.thrift.kerberos.principal", host);
    }
    // Work on a private copy of the configuration from here on.
    this.conf = HBaseConfiguration.create(conf);
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
    this.hbaseHandler = new HBaseHandler(conf, userProvider);
    this.hbaseHandler.initMetrics(metrics);
    // Wrap the handler so every Hbase.Iface call is timed/counted.
    this.handler = HbaseHandlerMetricsProxy.newInstance(
      hbaseHandler, metrics, conf);
    this.realUser = userProvider.getCurrent().getUGI();
    qop = conf.get(THRIFT_QOP_KEY);
    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
    if (doAsEnabled) {
      // Impersonation (doAs) is only honored over the HTTP transport;
      // warn when it is requested without enabling HTTP.
      if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
        LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured ");
      }
    }
    if (qop != null) {
      // Validate the QOP value eagerly and refuse SASL without secure mode.
      if (!qop.equals("auth") && !qop.equals("auth-int")
          && !qop.equals("auth-conf")) {
        throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
          + ", it must be 'auth', 'auth-int', or 'auth-conf'");
      }
      if (!securityEnabled) {
        throw new IOException("Thrift server must"
          + " run in secure mode to support authentication");
      }
    }
  }
342 
343   /*
344    * Runs the Thrift server
345    */
346   @Override
347   public void run() {
348     realUser.doAs(new PrivilegedAction<Object>() {
349       @Override
350       public Object run() {
351         try {
352           if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
353             setupHTTPServer();
354             httpServer.start();
355             httpServer.join();
356           } else {
357             setupServer();
358             tserver.serve();
359           }
360         } catch (Exception e) {
361           LOG.fatal("Cannot run ThriftServer", e);
362           // Crash the process if the ThriftServer is not running
363           System.exit(-1);
364         }
365         return null;
366       }
367     });
368 
369   }
370 
371   public void shutdown() {
372     if (tserver != null) {
373       tserver.stop();
374       tserver = null;
375     }
376     if (httpServer != null) {
377       try {
378         httpServer.stop();
379         httpServer = null;
380       } catch (Exception e) {
381         LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
382       }
383       httpServer = null;
384     }
385   }
386 
  /**
   * Builds the embedded Jetty server that exposes the Hbase Thrift processor
   * over HTTP at "/*", optionally with SSL, and configures its thread pool.
   * The server is created but not started here; run() starts it.
   */
  private void setupHTTPServer() throws IOException {
    // HTTP transport always uses the binary protocol.
    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
    TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
    TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
        conf, hbaseHandler, securityEnabled, doAsEnabled);

    httpServer = new Server();
    // Context handler
    Context context = new Context(httpServer, "/", Context.SESSIONS);
    context.setContextPath("/");
    String httpPath = "/*";
    httpServer.setHandler(context);
    context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);

    // set up Jetty and run the embedded server
    Connector connector = new SelectChannelConnector();
    if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
      // Swap in an SSL connector; the key password defaults to the
      // keystore password when not configured separately.
      SslSelectChannelConnectorSecure sslConnector = new SslSelectChannelConnectorSecure();
      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
      String password = HBaseConfiguration.getPassword(conf,
          THRIFT_SSL_KEYSTORE_PASSWORD, null);
      String keyPassword = HBaseConfiguration.getPassword(conf,
          THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
      sslConnector.setKeystore(keystore);
      sslConnector.setPassword(password);
      sslConnector.setKeyPassword(keyPassword);
      connector = sslConnector;
    }
    String host = getBindAddress(conf).getHostAddress();
    connector.setPort(listenPort);
    connector.setHost(host);
    // Allow large headers (64 KB), e.g. for doAs/authentication data.
    connector.setHeaderBufferSize(1024 * 64);
    httpServer.addConnector(connector);

    if (doAsEnabled) {
      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
    }

    // Set the default max thread number to 100 to limit
    // the number of concurrent requests so that Thrift HTTP server doesn't OOM easily.
    // Jetty set the default max thread number to 250, if we don't set it.
    //
    // Our default min thread number 2 is the same as that used by Jetty.
    int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
    int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
    threadPool.setMinThreads(minThreads);
    httpServer.setThreadPool(threadPool);

    // Trim response headers and stop cleanly on JVM shutdown.
    httpServer.setSendServerVersion(false);
    httpServer.setSendDateHeader(false);
    httpServer.setStopAtShutdown(true);

    LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
  }
442 
443   /**
444    * Setting up the thrift TServer
445    */
446   private void setupServer() throws Exception {
447     // Construct correct ProtocolFactory
448     TProtocolFactory protocolFactory;
449     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
450       LOG.debug("Using compact protocol");
451       protocolFactory = new TCompactProtocol.Factory();
452     } else {
453       LOG.debug("Using binary protocol");
454       protocolFactory = new TBinaryProtocol.Factory();
455     }
456 
457     final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
458     ImplType implType = ImplType.getServerImpl(conf);
459     TProcessor processor = p;
460 
461     // Construct correct TransportFactory
462     TTransportFactory transportFactory;
463     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
464       if (qop != null) {
465         throw new RuntimeException("Thrift server authentication"
466           + " doesn't work with framed transport yet");
467       }
468       transportFactory = new TFramedTransport.Factory(
469           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
470       LOG.debug("Using framed transport");
471     } else if (qop == null) {
472       transportFactory = new TTransportFactory();
473     } else {
474       // Extract the name from the principal
475       String name = SecurityUtil.getUserFromPrincipal(
476         conf.get("hbase.thrift.kerberos.principal"));
477       Map<String, String> saslProperties = new HashMap<String, String>();
478       saslProperties.put(Sasl.QOP, qop);
479       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
480       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
481         new SaslGssCallbackHandler() {
482           @Override
483           public void handle(Callback[] callbacks)
484               throws UnsupportedCallbackException {
485             AuthorizeCallback ac = null;
486             for (Callback callback : callbacks) {
487               if (callback instanceof AuthorizeCallback) {
488                 ac = (AuthorizeCallback) callback;
489               } else {
490                 throw new UnsupportedCallbackException(callback,
491                     "Unrecognized SASL GSSAPI Callback");
492               }
493             }
494             if (ac != null) {
495               String authid = ac.getAuthenticationID();
496               String authzid = ac.getAuthorizationID();
497               if (!authid.equals(authzid)) {
498                 ac.setAuthorized(false);
499               } else {
500                 ac.setAuthorized(true);
501                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
502                 LOG.info("Effective user: " + userName);
503                 ac.setAuthorizedID(userName);
504               }
505             }
506           }
507         });
508       transportFactory = saslFactory;
509 
510       // Create a processor wrapper, to get the caller
511       processor = new TProcessor() {
512         @Override
513         public boolean process(TProtocol inProt,
514             TProtocol outProt) throws TException {
515           TSaslServerTransport saslServerTransport =
516             (TSaslServerTransport)inProt.getTransport();
517           SaslServer saslServer = saslServerTransport.getSaslServer();
518           String principal = saslServer.getAuthorizationID();
519           hbaseHandler.setEffectiveUser(principal);
520           return p.process(inProt, outProt);
521         }
522       };
523     }
524 
525     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
526       LOG.error("Server types " + Joiner.on(", ").join(
527           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
528           "address binding at the moment. See " +
529           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
530       throw new RuntimeException(
531           "-" + BIND_CONF_KEY + " not supported with " + implType);
532     }
533 
534     // Thrift's implementation uses '0' as a placeholder for 'use the default.'
535     int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
536 
537     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
538         implType == ImplType.THREADED_SELECTOR) {
539       InetAddress listenAddress = getBindAddress(conf);
540       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
541           new InetSocketAddress(listenAddress, listenPort));
542 
543       if (implType == ImplType.NONBLOCKING) {
544         TNonblockingServer.Args serverArgs =
545             new TNonblockingServer.Args(serverTransport);
546         serverArgs.processor(processor)
547                   .transportFactory(transportFactory)
548                   .protocolFactory(protocolFactory);
549         tserver = new TNonblockingServer(serverArgs);
550       } else if (implType == ImplType.HS_HA) {
551         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
552         CallQueue callQueue =
553             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
554         ExecutorService executorService = createExecutor(
555             callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads());
556         serverArgs.executorService(executorService)
557                   .processor(processor)
558                   .transportFactory(transportFactory)
559                   .protocolFactory(protocolFactory);
560         tserver = new THsHaServer(serverArgs);
561       } else { // THREADED_SELECTOR
562         TThreadedSelectorServer.Args serverArgs =
563             new HThreadedSelectorServerArgs(serverTransport, conf);
564         CallQueue callQueue =
565             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
566         ExecutorService executorService = createExecutor(
567             callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
568         serverArgs.executorService(executorService)
569                   .processor(processor)
570                   .transportFactory(transportFactory)
571                   .protocolFactory(protocolFactory);
572         tserver = new TThreadedSelectorServer(serverArgs);
573       }
574       LOG.info("starting HBase " + implType.simpleClassName() +
575           " server on " + Integer.toString(listenPort));
576     } else if (implType == ImplType.THREAD_POOL) {
577       // Thread pool server. Get the IP address to bind to.
578       InetAddress listenAddress = getBindAddress(conf);
579       int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
580           THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
581       TServerTransport serverTransport = new TServerSocket(
582           new TServerSocket.ServerSocketTransportArgs().
583               bindAddr(new InetSocketAddress(listenAddress, listenPort)).
584               backlog(backlog).
585               clientTimeout(readTimeout));
586 
587       TBoundedThreadPoolServer.Args serverArgs =
588           new TBoundedThreadPoolServer.Args(serverTransport, conf);
589       serverArgs.processor(processor)
590                 .transportFactory(transportFactory)
591                 .protocolFactory(protocolFactory);
592       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
593           + listenAddress + ":" + Integer.toString(listenPort)
594           + " with readTimeout " + readTimeout + "ms; " + serverArgs);
595       TBoundedThreadPoolServer tserver =
596           new TBoundedThreadPoolServer(serverArgs, metrics);
597       this.tserver = tserver;
598     } else {
599       throw new AssertionError("Unsupported Thrift server implementation: " +
600           implType.simpleClassName());
601     }
602 
603     // A sanity check that we instantiated the right type of server.
604     if (tserver.getClass() != implType.serverClass) {
605       throw new AssertionError("Expected to create Thrift server class " +
606           implType.serverClass.getName() + " but got " +
607           tserver.getClass().getName());
608     }
609 
610 
611 
612     registerFilters(conf);
613   }
614 
615   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
616                                  int minWorkers, int maxWorkers) {
617     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
618     tfb.setDaemon(true);
619     tfb.setNameFormat("thrift-worker-%d");
620     return new ThreadPoolExecutor(minWorkers, maxWorkers,
621             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
622   }
623 
624   private InetAddress getBindAddress(Configuration conf)
625       throws UnknownHostException {
626     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
627     return InetAddress.getByName(bindAddressStr);
628   }
629 
630   protected static class ResultScannerWrapper {
631 
632     private final ResultScanner scanner;
633     private final boolean sortColumns;
634     public ResultScannerWrapper(ResultScanner resultScanner,
635                                 boolean sortResultColumns) {
636       scanner = resultScanner;
637       sortColumns = sortResultColumns;
638    }
639 
640     public ResultScanner getScanner() {
641       return scanner;
642     }
643 
644     public boolean isColumnSorted() {
645       return sortColumns;
646     }
647   }
648 
649   /**
650    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
651    * HBase client API primarily defined in the Admin and Table objects.
652    */
653   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);

    // nextScannerId and scannerMap are used to manage scanner state;
    // both are accessed only from the synchronized scanner methods below.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Per-effective-user cache of connections/tables/admins.
    private final ConnectionCache connectionCache;
    IncrementCoalescer coalescer = null;

    // Configuration keys controlling the connection cache's idle cleanup.
    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
667 
668     /**
669      * Returns a list of all the column families for a given Table.
670      *
671      * @param table
672      * @throws IOException
673      */
674     byte[][] getAllColumns(Table table) throws IOException {
675       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
676       byte[][] columns = new byte[cds.length][];
677       for (int i = 0; i < cds.length; i++) {
678         columns[i] = Bytes.add(cds[i].getName(),
679             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
680       }
681       return columns;
682     }
683 
684     /**
685      * Creates and returns a Table instance from a given table name.
686      *
687      * @param tableName
688      *          name of table
689      * @return Table object
690      * @throws IOException
691      */
692     public Table getTable(final byte[] tableName) throws
693         IOException {
694       String table = Bytes.toString(tableName);
695       return connectionCache.getTable(table);
696     }
697 
698     public Table getTable(final ByteBuffer tableName) throws IOException {
699       return getTable(getBytes(tableName));
700     }
701 
702     /**
703      * Assigns a unique ID to the scanner and adds the mapping to an internal
704      * hash-map.
705      *
706      * @param scanner
707      * @return integer scanner id
708      */
709     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
710       int id = nextScannerId++;
711       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
712       scannerMap.put(id, resultScannerWrapper);
713       return id;
714     }
715 
716     /**
717      * Returns the scanner associated with the specified ID.
718      *
719      * @param id
720      * @return a Scanner, or null if ID was invalid.
721      */
722     protected synchronized ResultScannerWrapper getScanner(int id) {
723       return scannerMap.get(id);
724     }
725 
726     /**
727      * Removes the scanner associated with the specified ID from the internal
728      * id-&gt;scanner hash-map.
729      *
730      * @param id
731      * @return a Scanner, or null if ID was invalid.
732      */
733     protected synchronized ResultScannerWrapper removeScanner(int id) {
734       return scannerMap.remove(id);
735     }
736 
737     protected HBaseHandler(final Configuration c,
738         final UserProvider userProvider) throws IOException {
739       this.conf = c;
740       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
741       this.coalescer = new IncrementCoalescer(this);
742 
743       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
744       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
745       connectionCache = new ConnectionCache(
746         conf, userProvider, cleanInterval, maxIdleTime);
747     }
748 
749     /**
750      * Obtain HBaseAdmin. Creates the instance if it is not already created.
751      */
752     private Admin getAdmin() throws IOException {
753       return connectionCache.getAdmin();
754     }
755 
756     void setEffectiveUser(String effectiveUser) {
757       connectionCache.setEffectiveUser(effectiveUser);
758     }
759 
760     @Override
761     public void enableTable(ByteBuffer tableName) throws IOError {
762       try{
763         getAdmin().enableTable(getTableName(tableName));
764       } catch (IOException e) {
765         LOG.warn(e.getMessage(), e);
766         throw new IOError(Throwables.getStackTraceAsString(e));
767       }
768     }
769 
770     @Override
771     public void disableTable(ByteBuffer tableName) throws IOError{
772       try{
773         getAdmin().disableTable(getTableName(tableName));
774       } catch (IOException e) {
775         LOG.warn(e.getMessage(), e);
776         throw new IOError(Throwables.getStackTraceAsString(e));
777       }
778     }
779 
780     @Override
781     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
782       try {
783         return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
784       } catch (IOException e) {
785         LOG.warn(e.getMessage(), e);
786         throw new IOError(Throwables.getStackTraceAsString(e));
787       }
788     }
789 
    /**
     * Requests a (minor) compaction of the given table or region. The single
     * byte[] argument may name either, which is why the deprecated
     * HBaseAdmin.compact(byte[]) overload is still used here.
     */
    @Override
    public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
      try {
        // TODO: HBaseAdmin.compact(byte[]) deprecated and not trivial to replace here.
        // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
        // table and region.
        ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      }
    }
802 
    /**
     * Requests a major compaction of the given table or region. The single
     * byte[] argument may name either, which is why the deprecated
     * HBaseAdmin.majorCompact(byte[]) overload is still used here.
     */
    @Override
    public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
      try {
        // TODO: HBaseAdmin.majorCompact(byte[]) deprecated and not trivial to replace here.
        // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
        // to table and region.
        ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      }
    }
815 
816     @Override
817     public List<ByteBuffer> getTableNames() throws IOError {
818       try {
819         TableName[] tableNames = this.getAdmin().listTableNames();
820         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
821         for (int i = 0; i < tableNames.length; i++) {
822           list.add(ByteBuffer.wrap(tableNames[i].getName()));
823         }
824         return list;
825       } catch (IOException e) {
826         LOG.warn(e.getMessage(), e);
827         throw new IOError(Throwables.getStackTraceAsString(e));
828       }
829     }
830 
831     /**
832      * @return the list of regions in the given table, or an empty list if the table does not exist
833      */
834     @Override
835     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
836     throws IOError {
837       try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
838         List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
839         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
840         for (HRegionLocation regionLocation : regionLocations) {
841           HRegionInfo info = regionLocation.getRegionInfo();
842           ServerName serverName = regionLocation.getServerName();
843           TRegionInfo region = new TRegionInfo();
844           region.serverName = ByteBuffer.wrap(
845               Bytes.toBytes(serverName.getHostname()));
846           region.port = serverName.getPort();
847           region.startKey = ByteBuffer.wrap(info.getStartKey());
848           region.endKey = ByteBuffer.wrap(info.getEndKey());
849           region.id = info.getRegionId();
850           region.name = ByteBuffer.wrap(info.getRegionName());
851           region.version = info.getVersion();
852           results.add(region);
853         }
854         return results;
855       } catch (TableNotFoundException e) {
856         // Return empty list for non-existing table
857         return Collections.emptyList();
858       } catch (IOException e){
859         LOG.warn(e.getMessage(), e);
860         throw new IOError(Throwables.getStackTraceAsString(e));
861       }
862     }
863 
864     @Override
865     public List<TCell> get(
866         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
867         Map<ByteBuffer, ByteBuffer> attributes)
868         throws IOError {
869       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
870       if (famAndQf.length == 1) {
871         return get(tableName, row, famAndQf[0], null, attributes);
872       }
873       if (famAndQf.length == 2) {
874         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
875       }
876       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
877     }
878 
879     /**
880      * Note: this internal interface is slightly different from public APIs in regard to handling
881      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
882      * we respect qual == null as a request for the entire column family. The caller (
883      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
884      * column is parse like normal.
885      */
886     protected List<TCell> get(ByteBuffer tableName,
887                               ByteBuffer row,
888                               byte[] family,
889                               byte[] qualifier,
890                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
891       Table table = null;
892       try {
893         table = getTable(tableName);
894         Get get = new Get(getBytes(row));
895         addAttributes(get, attributes);
896         if (qualifier == null) {
897           get.addFamily(family);
898         } else {
899           get.addColumn(family, qualifier);
900         }
901         Result result = table.get(get);
902         return ThriftUtilities.cellFromHBase(result.rawCells());
903       } catch (IOException e) {
904         LOG.warn(e.getMessage(), e);
905         throw new IOError(Throwables.getStackTraceAsString(e));
906       } finally {
907         closeTable(table);
908       }
909     }
910 
911     @Override
912     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
913         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
914       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
915       if(famAndQf.length == 1) {
916         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
917       }
918       if (famAndQf.length == 2) {
919         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
920       }
921       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
922 
923     }
924 
925     /**
926      * Note: this public interface is slightly different from public Java APIs in regard to
927      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
928      * Rather, we respect qual == null as a request for the entire column family. If you want to
929      * access the entire column family, use
930      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
931      * that lacks a {@code ':'}.
932      */
933     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
934         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
935 
936       Table table = null;
937       try {
938         table = getTable(tableName);
939         Get get = new Get(getBytes(row));
940         addAttributes(get, attributes);
941         if (null == qualifier) {
942           get.addFamily(family);
943         } else {
944           get.addColumn(family, qualifier);
945         }
946         get.setMaxVersions(numVersions);
947         Result result = table.get(get);
948         return ThriftUtilities.cellFromHBase(result.rawCells());
949       } catch (IOException e) {
950         LOG.warn(e.getMessage(), e);
951         throw new IOError(Throwables.getStackTraceAsString(e));
952       } finally{
953         closeTable(table);
954       }
955     }
956 
957     @Override
958     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
959         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
960       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
961       if (famAndQf.length == 1) {
962         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
963       }
964       if (famAndQf.length == 2) {
965         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
966           attributes);
967       }
968       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
969     }
970 
971     /**
972      * Note: this internal interface is slightly different from public APIs in regard to handling
973      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
974      * we respect qual == null as a request for the entire column family. The caller (
975      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
976      * consistent in that the column is parse like normal.
977      */
978     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
979         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
980         throws IOError {
981 
982       Table table = null;
983       try {
984         table = getTable(tableName);
985         Get get = new Get(getBytes(row));
986         addAttributes(get, attributes);
987         if (null == qualifier) {
988           get.addFamily(family);
989         } else {
990           get.addColumn(family, qualifier);
991         }
992         get.setTimeRange(0, timestamp);
993         get.setMaxVersions(numVersions);
994         Result result = table.get(get);
995         return ThriftUtilities.cellFromHBase(result.rawCells());
996       } catch (IOException e) {
997         LOG.warn(e.getMessage(), e);
998         throw new IOError(Throwables.getStackTraceAsString(e));
999       } finally{
1000         closeTable(table);
1001       }
1002     }
1003 
1004     @Override
1005     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1006         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1007       return getRowWithColumnsTs(tableName, row, null,
1008                                  HConstants.LATEST_TIMESTAMP,
1009                                  attributes);
1010     }
1011 
1012     @Override
1013     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1014                                               ByteBuffer row,
1015         List<ByteBuffer> columns,
1016         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1017       return getRowWithColumnsTs(tableName, row, columns,
1018                                  HConstants.LATEST_TIMESTAMP,
1019                                  attributes);
1020     }
1021 
1022     @Override
1023     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1024         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1025       return getRowWithColumnsTs(tableName, row, null,
1026                                  timestamp, attributes);
1027     }
1028 
1029     @Override
1030     public List<TRowResult> getRowWithColumnsTs(
1031         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1032         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1033 
1034       Table table = null;
1035       try {
1036         table = getTable(tableName);
1037         if (columns == null) {
1038           Get get = new Get(getBytes(row));
1039           addAttributes(get, attributes);
1040           get.setTimeRange(0, timestamp);
1041           Result result = table.get(get);
1042           return ThriftUtilities.rowResultFromHBase(result);
1043         }
1044         Get get = new Get(getBytes(row));
1045         addAttributes(get, attributes);
1046         for(ByteBuffer column : columns) {
1047           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1048           if (famAndQf.length == 1) {
1049               get.addFamily(famAndQf[0]);
1050           } else {
1051               get.addColumn(famAndQf[0], famAndQf[1]);
1052           }
1053         }
1054         get.setTimeRange(0, timestamp);
1055         Result result = table.get(get);
1056         return ThriftUtilities.rowResultFromHBase(result);
1057       } catch (IOException e) {
1058         LOG.warn(e.getMessage(), e);
1059         throw new IOError(Throwables.getStackTraceAsString(e));
1060       } finally{
1061         closeTable(table);
1062       }
1063     }
1064 
1065     @Override
1066     public List<TRowResult> getRows(ByteBuffer tableName,
1067                                     List<ByteBuffer> rows,
1068         Map<ByteBuffer, ByteBuffer> attributes)
1069         throws IOError {
1070       return getRowsWithColumnsTs(tableName, rows, null,
1071                                   HConstants.LATEST_TIMESTAMP,
1072                                   attributes);
1073     }
1074 
1075     @Override
1076     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1077                                                List<ByteBuffer> rows,
1078         List<ByteBuffer> columns,
1079         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1080       return getRowsWithColumnsTs(tableName, rows, columns,
1081                                   HConstants.LATEST_TIMESTAMP,
1082                                   attributes);
1083     }
1084 
1085     @Override
1086     public List<TRowResult> getRowsTs(ByteBuffer tableName,
1087                                       List<ByteBuffer> rows,
1088         long timestamp,
1089         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1090       return getRowsWithColumnsTs(tableName, rows, null,
1091                                   timestamp, attributes);
1092     }
1093 
1094     @Override
1095     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1096                                                  List<ByteBuffer> rows,
1097         List<ByteBuffer> columns, long timestamp,
1098         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1099 
1100       Table table= null;
1101       try {
1102         List<Get> gets = new ArrayList<Get>(rows.size());
1103         table = getTable(tableName);
1104         if (metrics != null) {
1105           metrics.incNumRowKeysInBatchGet(rows.size());
1106         }
1107         for (ByteBuffer row : rows) {
1108           Get get = new Get(getBytes(row));
1109           addAttributes(get, attributes);
1110           if (columns != null) {
1111 
1112             for(ByteBuffer column : columns) {
1113               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1114               if (famAndQf.length == 1) {
1115                 get.addFamily(famAndQf[0]);
1116               } else {
1117                 get.addColumn(famAndQf[0], famAndQf[1]);
1118               }
1119             }
1120           }
1121           get.setTimeRange(0, timestamp);
1122           gets.add(get);
1123         }
1124         Result[] result = table.get(gets);
1125         return ThriftUtilities.rowResultFromHBase(result);
1126       } catch (IOException e) {
1127         LOG.warn(e.getMessage(), e);
1128         throw new IOError(Throwables.getStackTraceAsString(e));
1129       } finally{
1130         closeTable(table);
1131       }
1132     }
1133 
1134     @Override
1135     public void deleteAll(
1136         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1137         Map<ByteBuffer, ByteBuffer> attributes)
1138         throws IOError {
1139       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1140                   attributes);
1141     }
1142 
1143     @Override
1144     public void deleteAllTs(ByteBuffer tableName,
1145                             ByteBuffer row,
1146                             ByteBuffer column,
1147         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1148       Table table = null;
1149       try {
1150         table = getTable(tableName);
1151         Delete delete  = new Delete(getBytes(row));
1152         addAttributes(delete, attributes);
1153         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1154         if (famAndQf.length == 1) {
1155           delete.deleteFamily(famAndQf[0], timestamp);
1156         } else {
1157           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1158         }
1159         table.delete(delete);
1160 
1161       } catch (IOException e) {
1162         LOG.warn(e.getMessage(), e);
1163         throw new IOError(Throwables.getStackTraceAsString(e));
1164       } finally {
1165         closeTable(table);
1166       }
1167     }
1168 
1169     @Override
1170     public void deleteAllRow(
1171         ByteBuffer tableName, ByteBuffer row,
1172         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1173       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1174     }
1175 
1176     @Override
1177     public void deleteAllRowTs(
1178         ByteBuffer tableName, ByteBuffer row, long timestamp,
1179         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1180       Table table = null;
1181       try {
1182         table = getTable(tableName);
1183         Delete delete  = new Delete(getBytes(row), timestamp);
1184         addAttributes(delete, attributes);
1185         table.delete(delete);
1186       } catch (IOException e) {
1187         LOG.warn(e.getMessage(), e);
1188         throw new IOError(Throwables.getStackTraceAsString(e));
1189       } finally {
1190         closeTable(table);
1191       }
1192     }
1193 
1194     @Override
1195     public void createTable(ByteBuffer in_tableName,
1196         List<ColumnDescriptor> columnFamilies) throws IOError,
1197         IllegalArgument, AlreadyExists {
1198       TableName tableName = getTableName(in_tableName);
1199       try {
1200         if (getAdmin().tableExists(tableName)) {
1201           throw new AlreadyExists("table name already in use");
1202         }
1203         HTableDescriptor desc = new HTableDescriptor(tableName);
1204         for (ColumnDescriptor col : columnFamilies) {
1205           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1206           desc.addFamily(colDesc);
1207         }
1208         getAdmin().createTable(desc);
1209       } catch (IOException e) {
1210         LOG.warn(e.getMessage(), e);
1211         throw new IOError(Throwables.getStackTraceAsString(e));
1212       } catch (IllegalArgumentException e) {
1213         LOG.warn(e.getMessage(), e);
1214         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1215       }
1216     }
1217 
1218     private static TableName getTableName(ByteBuffer buffer) {
1219       return TableName.valueOf(getBytes(buffer));
1220     }
1221 
1222     @Override
1223     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1224       TableName tableName = getTableName(in_tableName);
1225       if (LOG.isDebugEnabled()) {
1226         LOG.debug("deleteTable: table=" + tableName);
1227       }
1228       try {
1229         if (!getAdmin().tableExists(tableName)) {
1230           throw new IOException("table does not exist");
1231         }
1232         getAdmin().deleteTable(tableName);
1233       } catch (IOException e) {
1234         LOG.warn(e.getMessage(), e);
1235         throw new IOError(Throwables.getStackTraceAsString(e));
1236       }
1237     }
1238 
1239     @Override
1240     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1241         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1242         throws IOError, IllegalArgument {
1243       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1244                   attributes);
1245     }
1246 
    /**
     * Applies a list of mutations (puts and deletes) to one row at the given
     * timestamp. Deletes for the whole batch are collected into a single
     * Delete and puts into a single Put; the Delete is applied before the Put.
     * Note the two operations are not atomic with respect to each other.
     */
    @Override
    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      Table table = null;
      try {
        table = getTable(tableName);
        Put put = new Put(getBytes(row), timestamp);
        addAttributes(put, attributes);

        Delete delete = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchMutate(mutations.size());
        }

        // Sort each mutation into the batched Delete or Put. The per-mutation
        // writeToWAL flag is mapped onto the whole Delete/Put, so the last
        // mutation of each kind wins for durability.
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            if (famAndQf.length == 1) {
              // family only: delete the whole column family
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if(famAndQf.length == 1) {
              // Puts require a qualifier; family-only puts are silently dropped
              // (with a warning) rather than rejected here.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            } else {
              // null values are stored as empty byte arrays
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        if (!delete.isEmpty())
          table.delete(delete);
        if (!put.isEmpty())
          table.put(put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
      } finally{
        closeTable(table);
      }
    }
1301 
1302     @Override
1303     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1304         Map<ByteBuffer, ByteBuffer> attributes)
1305         throws IOError, IllegalArgument, TException {
1306       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1307     }
1308 
    /**
     * Applies batches of mutations to multiple rows at the given timestamp.
     * Per row, deletes are collected into one Delete and puts into one Put;
     * all puts are then written before all deletes. The batch is not atomic.
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              // NOTE(review): unlike mutateRowTs, a family-only put here warns
              // and then falls through to the length != 2 branch below, which
              // throws. The single-row path only warns — confirm this
              // divergence is intended before unifying.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              // null values are stored as empty byte arrays
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      Table table = null;
      try {
        table = getTable(tableName);
        // Puts are flushed before deletes for the whole batch.
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
      } finally{
        closeTable(table);
      }
    }
1374 
1375     @Override
1376     public long atomicIncrement(
1377         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1378             throws IOError, IllegalArgument, TException {
1379       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1380       if(famAndQf.length == 1) {
1381         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1382       }
1383       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1384     }
1385 
1386     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1387         byte [] family, byte [] qualifier, long amount)
1388         throws IOError, IllegalArgument, TException {
1389       Table table = null;
1390       try {
1391         table = getTable(tableName);
1392         return table.incrementColumnValue(
1393             getBytes(row), family, qualifier, amount);
1394       } catch (IOException e) {
1395         LOG.warn(e.getMessage(), e);
1396         throw new IOError(Throwables.getStackTraceAsString(e));
1397       } finally {
1398         closeTable(table);
1399       }
1400     }
1401 
1402     @Override
1403     public void scannerClose(int id) throws IOError, IllegalArgument {
1404       LOG.debug("scannerClose: id=" + id);
1405       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1406       if (resultScannerWrapper == null) {
1407         String message = "scanner ID is invalid";
1408         LOG.warn(message);
1409         throw new IllegalArgument("scanner ID is invalid");
1410       }
1411       resultScannerWrapper.getScanner().close();
1412       removeScanner(id);
1413     }
1414 
1415     @Override
1416     public List<TRowResult> scannerGetList(int id,int nbRows)
1417         throws IllegalArgument, IOError {
1418       LOG.debug("scannerGetList: id=" + id);
1419       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1420       if (null == resultScannerWrapper) {
1421         String message = "scanner ID is invalid";
1422         LOG.warn(message);
1423         throw new IllegalArgument("scanner ID is invalid");
1424       }
1425 
1426       Result [] results = null;
1427       try {
1428         results = resultScannerWrapper.getScanner().next(nbRows);
1429         if (null == results) {
1430           return new ArrayList<TRowResult>();
1431         }
1432       } catch (IOException e) {
1433         LOG.warn(e.getMessage(), e);
1434         throw new IOError(Throwables.getStackTraceAsString(e));
1435       }
1436       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1437     }
1438 
1439     @Override
1440     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1441       return scannerGetList(id,1);
1442     }
1443 
1444     @Override
1445     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1446         Map<ByteBuffer, ByteBuffer> attributes)
1447         throws IOError {
1448 
1449       Table table = null;
1450       try {
1451         table = getTable(tableName);
1452         Scan scan = new Scan();
1453         addAttributes(scan, attributes);
1454         if (tScan.isSetStartRow()) {
1455           scan.setStartRow(tScan.getStartRow());
1456         }
1457         if (tScan.isSetStopRow()) {
1458           scan.setStopRow(tScan.getStopRow());
1459         }
1460         if (tScan.isSetTimestamp()) {
1461           scan.setTimeRange(0, tScan.getTimestamp());
1462         }
1463         if (tScan.isSetCaching()) {
1464           scan.setCaching(tScan.getCaching());
1465         }
1466         if (tScan.isSetBatchSize()) {
1467           scan.setBatch(tScan.getBatchSize());
1468         }
1469         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1470           for(ByteBuffer column : tScan.getColumns()) {
1471             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1472             if(famQf.length == 1) {
1473               scan.addFamily(famQf[0]);
1474             } else {
1475               scan.addColumn(famQf[0], famQf[1]);
1476             }
1477           }
1478         }
1479         if (tScan.isSetFilterString()) {
1480           ParseFilter parseFilter = new ParseFilter();
1481           scan.setFilter(
1482               parseFilter.parseFilterString(tScan.getFilterString()));
1483         }
1484         if (tScan.isSetReversed()) {
1485           scan.setReversed(tScan.isReversed());
1486         }
1487         return addScanner(table.getScanner(scan), tScan.sortColumns);
1488       } catch (IOException e) {
1489         LOG.warn(e.getMessage(), e);
1490         throw new IOError(Throwables.getStackTraceAsString(e));
1491       } finally{
1492         closeTable(table);
1493       }
1494     }
1495 
1496     @Override
1497     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1498         List<ByteBuffer> columns,
1499         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1500 
1501       Table table = null;
1502       try {
1503         table = getTable(tableName);
1504         Scan scan = new Scan(getBytes(startRow));
1505         addAttributes(scan, attributes);
1506         if(columns != null && columns.size() != 0) {
1507           for(ByteBuffer column : columns) {
1508             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1509             if(famQf.length == 1) {
1510               scan.addFamily(famQf[0]);
1511             } else {
1512               scan.addColumn(famQf[0], famQf[1]);
1513             }
1514           }
1515         }
1516         return addScanner(table.getScanner(scan), false);
1517       } catch (IOException e) {
1518         LOG.warn(e.getMessage(), e);
1519         throw new IOError(Throwables.getStackTraceAsString(e));
1520       } finally{
1521         closeTable(table);
1522       }
1523     }
1524 
1525     @Override
1526     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1527         ByteBuffer stopRow, List<ByteBuffer> columns,
1528         Map<ByteBuffer, ByteBuffer> attributes)
1529         throws IOError, TException {
1530 
1531       Table table = null;
1532       try {
1533         table = getTable(tableName);
1534         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1535         addAttributes(scan, attributes);
1536         if(columns != null && columns.size() != 0) {
1537           for(ByteBuffer column : columns) {
1538             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1539             if(famQf.length == 1) {
1540               scan.addFamily(famQf[0]);
1541             } else {
1542               scan.addColumn(famQf[0], famQf[1]);
1543             }
1544           }
1545         }
1546         return addScanner(table.getScanner(scan), false);
1547       } catch (IOException e) {
1548         LOG.warn(e.getMessage(), e);
1549         throw new IOError(Throwables.getStackTraceAsString(e));
1550       } finally{
1551         closeTable(table);
1552       }
1553     }
1554 
1555     @Override
1556     public int scannerOpenWithPrefix(ByteBuffer tableName,
1557                                      ByteBuffer startAndPrefix,
1558                                      List<ByteBuffer> columns,
1559         Map<ByteBuffer, ByteBuffer> attributes)
1560         throws IOError, TException {
1561 
1562       Table table = null;
1563       try {
1564         table = getTable(tableName);
1565         Scan scan = new Scan(getBytes(startAndPrefix));
1566         addAttributes(scan, attributes);
1567         Filter f = new WhileMatchFilter(
1568             new PrefixFilter(getBytes(startAndPrefix)));
1569         scan.setFilter(f);
1570         if (columns != null && columns.size() != 0) {
1571           for(ByteBuffer column : columns) {
1572             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1573             if(famQf.length == 1) {
1574               scan.addFamily(famQf[0]);
1575             } else {
1576               scan.addColumn(famQf[0], famQf[1]);
1577             }
1578           }
1579         }
1580         return addScanner(table.getScanner(scan), false);
1581       } catch (IOException e) {
1582         LOG.warn(e.getMessage(), e);
1583         throw new IOError(Throwables.getStackTraceAsString(e));
1584       } finally{
1585         closeTable(table);
1586       }
1587     }
1588 
1589     @Override
1590     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1591         List<ByteBuffer> columns, long timestamp,
1592         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1593 
1594       Table table = null;
1595       try {
1596         table = getTable(tableName);
1597         Scan scan = new Scan(getBytes(startRow));
1598         addAttributes(scan, attributes);
1599         scan.setTimeRange(0, timestamp);
1600         if (columns != null && columns.size() != 0) {
1601           for (ByteBuffer column : columns) {
1602             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1603             if(famQf.length == 1) {
1604               scan.addFamily(famQf[0]);
1605             } else {
1606               scan.addColumn(famQf[0], famQf[1]);
1607             }
1608           }
1609         }
1610         return addScanner(table.getScanner(scan), false);
1611       } catch (IOException e) {
1612         LOG.warn(e.getMessage(), e);
1613         throw new IOError(Throwables.getStackTraceAsString(e));
1614       } finally{
1615         closeTable(table);
1616       }
1617     }
1618 
1619     @Override
1620     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1621         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1622         Map<ByteBuffer, ByteBuffer> attributes)
1623         throws IOError, TException {
1624 
1625       Table table = null;
1626       try {
1627         table = getTable(tableName);
1628         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1629         addAttributes(scan, attributes);
1630         scan.setTimeRange(0, timestamp);
1631         if (columns != null && columns.size() != 0) {
1632           for (ByteBuffer column : columns) {
1633             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1634             if(famQf.length == 1) {
1635               scan.addFamily(famQf[0]);
1636             } else {
1637               scan.addColumn(famQf[0], famQf[1]);
1638             }
1639           }
1640         }
1641         scan.setTimeRange(0, timestamp);
1642         return addScanner(table.getScanner(scan), false);
1643       } catch (IOException e) {
1644         LOG.warn(e.getMessage(), e);
1645         throw new IOError(Throwables.getStackTraceAsString(e));
1646       } finally{
1647         closeTable(table);
1648       }
1649     }
1650 
1651     @Override
1652     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1653         ByteBuffer tableName) throws IOError, TException {
1654 
1655       Table table = null;
1656       try {
1657         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1658           new TreeMap<ByteBuffer, ColumnDescriptor>();
1659 
1660         table = getTable(tableName);
1661         HTableDescriptor desc = table.getTableDescriptor();
1662 
1663         for (HColumnDescriptor e : desc.getFamilies()) {
1664           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1665           columns.put(col.name, col);
1666         }
1667         return columns;
1668       } catch (IOException e) {
1669         LOG.warn(e.getMessage(), e);
1670         throw new IOError(Throwables.getStackTraceAsString(e));
1671       } finally {
1672         closeTable(table);
1673       }
1674     }
1675 
1676     @Deprecated
1677     @Override
1678     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1679         ByteBuffer family) throws IOError {
1680       try {
1681         Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1682         return ThriftUtilities.cellFromHBase(result.rawCells());
1683       } catch (IOException e) {
1684         LOG.warn(e.getMessage(), e);
1685         throw new IOError(Throwables.getStackTraceAsString(e));
1686       }
1687     }
1688 
1689     @Override
1690     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1691       try {
1692         byte[] row = getBytes(searchRow);
1693         Result startRowResult =
1694             getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1695 
1696         if (startRowResult == null) {
1697           throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1698                                 + Bytes.toStringBinary(row));
1699         }
1700 
1701         // find region start and end keys
1702         HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1703         if (regionInfo == null) {
1704           throw new IOException("HRegionInfo REGIONINFO was null or " +
1705                                 " empty in Meta for row="
1706                                 + Bytes.toStringBinary(row));
1707         }
1708         TRegionInfo region = new TRegionInfo();
1709         region.setStartKey(regionInfo.getStartKey());
1710         region.setEndKey(regionInfo.getEndKey());
1711         region.id = regionInfo.getRegionId();
1712         region.setName(regionInfo.getRegionName());
1713         region.version = regionInfo.getVersion();
1714 
1715         // find region assignment to server
1716         ServerName serverName = HRegionInfo.getServerName(startRowResult);
1717         if (serverName != null) {
1718           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1719           region.port = serverName.getPort();
1720         }
1721         return region;
1722       } catch (IOException e) {
1723         LOG.warn(e.getMessage(), e);
1724         throw new IOError(Throwables.getStackTraceAsString(e));
1725       }
1726     }
1727 
1728     private void closeTable(Table table) throws IOError
1729     {
1730       try{
1731         if(table != null){
1732           table.close();
1733         }
1734       } catch (IOException e){
1735         LOG.error(e.getMessage(), e);
1736         throw new IOError(Throwables.getStackTraceAsString(e));
1737       }
1738     }
1739 
1740     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1741       Scan scan = new Scan(row);
1742       scan.setReversed(true);
1743       scan.addFamily(family);
1744       scan.setStartRow(row);
1745       Table table = getTable(tableName);
1746       try (ResultScanner scanner = table.getScanner(scan)) {
1747         return scanner.next();
1748       } finally{
1749         if(table != null){
1750           table.close();
1751         }
1752       }
1753     }
1754 
    // Installs the ThriftMetrics sink used by this handler; called once during
    // server setup before requests are served.
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1758 
1759     @Override
1760     public void increment(TIncrement tincrement) throws IOError, TException {
1761 
1762       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1763         throw new TException("Must supply a table and a row key; can't increment");
1764       }
1765 
1766       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1767         this.coalescer.queueIncrement(tincrement);
1768         return;
1769       }
1770 
1771       Table table = null;
1772       try {
1773         table = getTable(tincrement.getTable());
1774         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1775         table.increment(inc);
1776       } catch (IOException e) {
1777         LOG.warn(e.getMessage(), e);
1778         throw new IOError(Throwables.getStackTraceAsString(e));
1779       } finally{
1780         closeTable(table);
1781       }
1782     }
1783 
1784     @Override
1785     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1786       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1787         this.coalescer.queueIncrements(tincrements);
1788         return;
1789       }
1790       for (TIncrement tinc : tincrements) {
1791         increment(tinc);
1792       }
1793     }
1794 
1795     @Override
1796     public List<TCell> append(TAppend tappend) throws IOError, TException {
1797       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1798         throw new TException("Must supply a table and a row key; can't append");
1799       }
1800 
1801       Table table = null;
1802       try {
1803         table = getTable(tappend.getTable());
1804         Append append = ThriftUtilities.appendFromThrift(tappend);
1805         Result result = table.append(append);
1806         return ThriftUtilities.cellFromHBase(result.rawCells());
1807       } catch (IOException e) {
1808         LOG.warn(e.getMessage(), e);
1809         throw new IOError(Throwables.getStackTraceAsString(e));
1810       } finally{
1811           closeTable(table);
1812       }
1813     }
1814 
1815     @Override
1816     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1817         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1818         IllegalArgument, TException {
1819       Put put;
1820       try {
1821         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1822         addAttributes(put, attributes);
1823 
1824         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1825 
1826         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1827             : HConstants.EMPTY_BYTE_ARRAY);
1828 
1829         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1830       } catch (IllegalArgumentException e) {
1831         LOG.warn(e.getMessage(), e);
1832         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1833       }
1834 
1835       Table table = null;
1836       try {
1837         table = getTable(tableName);
1838         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1839         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1840           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1841       } catch (IOException e) {
1842         LOG.warn(e.getMessage(), e);
1843         throw new IOError(Throwables.getStackTraceAsString(e));
1844       } catch (IllegalArgumentException e) {
1845         LOG.warn(e.getMessage(), e);
1846         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1847       } finally {
1848           closeTable(table);
1849       }
1850     }
1851   }
1852 
1853 
1854 
1855   /**
1856    * Adds all the attributes into the Operation object
1857    */
1858   private static void addAttributes(OperationWithAttributes op,
1859     Map<ByteBuffer, ByteBuffer> attributes) {
1860     if (attributes == null || attributes.size() == 0) {
1861       return;
1862     }
1863     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1864       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1865       byte[] value =  getBytes(entry.getValue());
1866       op.setAttribute(name, value);
1867     }
1868   }
1869 
1870   public static void registerFilters(Configuration conf) {
1871     String[] filters = conf.getStrings("hbase.thrift.filters");
1872     if(filters != null) {
1873       for(String filterClass: filters) {
1874         String[] filterPart = filterClass.split(":");
1875         if(filterPart.length != 2) {
1876           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1877         } else {
1878           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1879         }
1880       }
1881     }
1882   }
1883 }