1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HRegionLocation;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.client.Admin;
66  import org.apache.hadoop.hbase.client.Append;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.Durability;
69  import org.apache.hadoop.hbase.client.Get;
70  import org.apache.hadoop.hbase.client.HBaseAdmin;
71  import org.apache.hadoop.hbase.client.Increment;
72  import org.apache.hadoop.hbase.client.OperationWithAttributes;
73  import org.apache.hadoop.hbase.client.Put;
74  import org.apache.hadoop.hbase.client.RegionLocator;
75  import org.apache.hadoop.hbase.client.Result;
76  import org.apache.hadoop.hbase.client.ResultScanner;
77  import org.apache.hadoop.hbase.client.Scan;
78  import org.apache.hadoop.hbase.client.Table;
79  import org.apache.hadoop.hbase.filter.Filter;
80  import org.apache.hadoop.hbase.filter.ParseFilter;
81  import org.apache.hadoop.hbase.filter.PrefixFilter;
82  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83  import org.apache.hadoop.hbase.jetty.SslSelectChannelConnectorSecure;
84  import org.apache.hadoop.hbase.security.SecurityUtil;
85  import org.apache.hadoop.hbase.security.UserProvider;
86  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
87  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
88  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
89  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
90  import org.apache.hadoop.hbase.thrift.generated.Hbase;
91  import org.apache.hadoop.hbase.thrift.generated.IOError;
92  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
93  import org.apache.hadoop.hbase.thrift.generated.Mutation;
94  import org.apache.hadoop.hbase.thrift.generated.TAppend;
95  import org.apache.hadoop.hbase.thrift.generated.TCell;
96  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
97  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
98  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
99  import org.apache.hadoop.hbase.thrift.generated.TScan;
100 import org.apache.hadoop.hbase.util.Bytes;
101 import org.apache.hadoop.hbase.util.ConnectionCache;
102 import org.apache.hadoop.hbase.util.DNS;
103 import org.apache.hadoop.hbase.util.Strings;
104 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.security.UserGroupInformation;
106 import org.apache.hadoop.security.authorize.ProxyUsers;
107 import org.apache.thrift.TException;
108 import org.apache.thrift.TProcessor;
109 import org.apache.thrift.protocol.TBinaryProtocol;
110 import org.apache.thrift.protocol.TCompactProtocol;
111 import org.apache.thrift.protocol.TProtocol;
112 import org.apache.thrift.protocol.TProtocolFactory;
113 import org.apache.thrift.server.THsHaServer;
114 import org.apache.thrift.server.TNonblockingServer;
115 import org.apache.thrift.server.TServer;
116 import org.apache.thrift.server.TServlet;
117 import org.apache.thrift.server.TThreadedSelectorServer;
118 import org.apache.thrift.transport.TFramedTransport;
119 import org.apache.thrift.transport.TNonblockingServerSocket;
120 import org.apache.thrift.transport.TNonblockingServerTransport;
121 import org.apache.thrift.transport.TSaslServerTransport;
122 import org.apache.thrift.transport.TServerSocket;
123 import org.apache.thrift.transport.TServerTransport;
124 import org.apache.thrift.transport.TTransportFactory;
125 import org.mortbay.jetty.Connector;
126 import org.mortbay.jetty.Server;
127 import org.mortbay.jetty.nio.SelectChannelConnector;
128 import org.mortbay.jetty.servlet.Context;
129 import org.mortbay.jetty.servlet.ServletHolder;
130 import org.mortbay.thread.QueuedThreadPool;
131 
132 import com.google.common.base.Joiner;
133 import com.google.common.base.Throwables;
134 import com.google.common.util.concurrent.ThreadFactoryBuilder;
135 
136 /**
137  * ThriftServerRunner - this class starts up a Thrift server which implements
138  * the Hbase API specified in the Hbase.thrift IDL file.
139  */
140 @InterfaceAudience.Private
141 public class ThriftServerRunner implements Runnable {
142 
143   private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
144 
145   static final String SERVER_TYPE_CONF_KEY =
146       "hbase.regionserver.thrift.server.type";
147 
148   static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
149   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
150   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
151   static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
152   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
153   static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
154   static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
155   static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
156   static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
157 
158   static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
159   static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
160   static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
161   static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
162 
163   /**
164    * Amount of time in milliseconds before a server thread times out
165    * waiting for a client to send data on a connected socket. Currently
166    * this applies only to TBoundedThreadPoolServer.
167    */
168   public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
169     "hbase.thrift.server.socket.read.timeout";
170   public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
171 
172 
173   /**
174    * Thrift quality of protection configuration key. Valid values are:
175    * auth-conf: authentication, integrity and confidentiality checking
176    * auth-int: authentication and integrity checking
177    * auth: authentication only
178    *
179    * This is used to authenticate callers and to support impersonation.
180    * Both the Thrift server and the HBase cluster must run in secure mode.
181    */
182   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
183   static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
184 
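      // Illustrative only (not part of this class): a typical programmatic setup of the
      // configuration keys defined above.
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setInt(PORT_CONF_KEY, 9090);              // hbase.regionserver.thrift.port
      //   conf.set(SERVER_TYPE_CONF_KEY, "threadpool");  // hbase.regionserver.thrift.server.type
      //   conf.setBoolean(COMPACT_CONF_KEY, true);       // use TCompactProtocol on the wire
      //   conf.set(THRIFT_QOP_KEY, "auth-conf");         // requires a secure cluster (see above)
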
185   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
186   public static final int DEFAULT_LISTEN_PORT = 9090;
187   public static final int HREGION_VERSION = 1;
188   static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
189   private final int listenPort;
190 
191   private Configuration conf;
192   volatile TServer tserver;
193   volatile Server httpServer;
194   private final Hbase.Iface handler;
195   private final ThriftMetrics metrics;
196   private final HBaseHandler hbaseHandler;
197   private final UserGroupInformation realUser;
198 
199   private final String qop;
200   private String host;
201 
202   private final boolean securityEnabled;
203   private final boolean doAsEnabled;
204 
205   /** An enum of server implementation selections */
206   enum ImplType {
207     HS_HA("hsha", true, THsHaServer.class, true),
208     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
209     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
210     THREADED_SELECTOR(
211         "threadedselector", true, TThreadedSelectorServer.class, true);
212 
213     public static final ImplType DEFAULT = THREAD_POOL;
214 
215     final String option;
216     final boolean isAlwaysFramed;
217     final Class<? extends TServer> serverClass;
218     final boolean canSpecifyBindIP;
219 
220     ImplType(String option, boolean isAlwaysFramed,
221         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
222       this.option = option;
223       this.isAlwaysFramed = isAlwaysFramed;
224       this.serverClass = serverClass;
225       this.canSpecifyBindIP = canSpecifyBindIP;
226     }
227 
228     /**
229      * @return <code>-option</code> so we can get the list of options from
230      *         {@link #values()}
231      */
232     @Override
233     public String toString() {
234       return "-" + option;
235     }
236 
237     String getDescription() {
238       StringBuilder sb = new StringBuilder("Use the " +
239           serverClass.getSimpleName() + ".");
240       if (isAlwaysFramed) {
241         sb.append(" This implies the framed transport.");
242       }
243       if (this == DEFAULT) {
244         sb.append(" This is the default.");
245       }
246       return sb.toString();
247     }
248 
249     static OptionGroup createOptionGroup() {
250       OptionGroup group = new OptionGroup();
251       for (ImplType t : values()) {
252         group.addOption(new Option(t.option, t.getDescription()));
253       }
254       return group;
255     }
256 
257     static ImplType getServerImpl(Configuration conf) {
258       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
259       for (ImplType t : values()) {
260         if (confType.equals(t.option)) {
261           return t;
262         }
263       }
264       throw new AssertionError("Unknown server ImplType.option: " + confType);
265     }
266 
267     static void setServerImpl(CommandLine cmd, Configuration conf) {
268       ImplType chosenType = null;
269       int numChosen = 0;
270       for (ImplType t : values()) {
271         if (cmd.hasOption(t.option)) {
272           chosenType = t;
273           ++numChosen;
274         }
275       }
276       if (numChosen < 1) {
277         LOG.info("Using default thrift server type");
278         chosenType = DEFAULT;
279       } else if (numChosen > 1) {
280         throw new AssertionError("Exactly one option out of " +
281           Arrays.toString(values()) + " has to be specified");
282       }
283       LOG.info("Using thrift server type " + chosenType.option);
284       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
285     }
286 
287     public String simpleClassName() {
288       return serverClass.getSimpleName();
289     }
290 
291     public static List<String> serversThatCannotSpecifyBindIP() {
292       List<String> l = new ArrayList<String>();
293       for (ImplType t : values()) {
294         if (!t.canSpecifyBindIP) {
295           l.add(t.simpleClassName());
296         }
297       }
298       return l;
299     }
300 
301   }
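
      // Illustrative only: how a command-line flag such as "-hsha" flows through ImplType,
      // assuming a Commons CLI CommandLine ("cmd") parsed with ImplType.createOptionGroup().
      //
      //   ImplType.setServerImpl(cmd, conf);             // writes hbase.regionserver.thrift.server.type
      //   ImplType type = ImplType.getServerImpl(conf);  // -> ImplType.HS_HA for "-hsha"
      //   boolean framed = type.isAlwaysFramed;          // hsha/nonblocking/threadedselector imply framed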
302 
303   public ThriftServerRunner(Configuration conf) throws IOException {
304     UserProvider userProvider = UserProvider.instantiate(conf);
305     // login the server principal (if using secure Hadoop)
306     securityEnabled = userProvider.isHadoopSecurityEnabled()
307       && userProvider.isHBaseSecurityEnabled();
308     if (securityEnabled) {
309       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
310         conf.get("hbase.thrift.dns.interface", "default"),
311         conf.get("hbase.thrift.dns.nameserver", "default")));
312       userProvider.login("hbase.thrift.keytab.file",
313         "hbase.thrift.kerberos.principal", host);
314     }
315     this.conf = HBaseConfiguration.create(conf);
316     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
317     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
318     this.hbaseHandler = new HBaseHandler(conf, userProvider);
319     this.hbaseHandler.initMetrics(metrics);
320     this.handler = HbaseHandlerMetricsProxy.newInstance(
321       hbaseHandler, metrics, conf);
322     this.realUser = userProvider.getCurrent().getUGI();
323     qop = conf.get(THRIFT_QOP_KEY);
324     doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
325     if (qop != null) {
326       if (!qop.equals("auth") && !qop.equals("auth-int")
327           && !qop.equals("auth-conf")) {
328         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
329           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
330       }
331       if (!securityEnabled) {
332         throw new IOException("Thrift server must"
333           + " run in secure mode to support authentication");
334       }
335     }
336   }
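
      // Illustrative only (not part of the class): embedding the runner. The constructor above
      // reads its settings from the supplied Configuration; run() blocks while serving, so it is
      // typically started on its own thread (the ThriftServer tool is the usual entry point).
      //
      //   ThriftServerRunner runner = new ThriftServerRunner(conf);
      //   Thread t = new Thread(runner, "thrift-server");
      //   t.start();
      //   // ... later, on shutdown:
      //   runner.shutdown();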
337 
338   /*
339    * Runs the Thrift server
340    */
341   @Override
342   public void run() {
343     realUser.doAs(new PrivilegedAction<Object>() {
344       @Override
345       public Object run() {
346         try {
347           if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
348             setupHTTPServer();
349             httpServer.start();
350             httpServer.join();
351           } else {
352             setupServer();
353             tserver.serve();
354           }
355         } catch (Exception e) {
356           LOG.fatal("Cannot run ThriftServer", e);
357           // Crash the process if the ThriftServer cannot be started or dies unexpectedly
358           System.exit(-1);
359         }
360         return null;
361       }
362     });
363 
364   }
365 
366   public void shutdown() {
367     if (tserver != null) {
368       tserver.stop();
369       tserver = null;
370     }
371     if (httpServer != null) {
372       try {
373         httpServer.stop();
374       } catch (Exception e) {
375         LOG.error("Problem encountered in shutting down HTTP server", e);
376       }
377       // Clear the reference regardless of whether stop() succeeded.
378       httpServer = null;
379     }
380   }
381 
382   private void setupHTTPServer() throws IOException {
383     TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
384     TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
385     TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
386         conf, hbaseHandler, securityEnabled, doAsEnabled);
387 
388     httpServer = new Server();
389     // Context handler
390     Context context = new Context(httpServer, "/", Context.SESSIONS);
391     context.setContextPath("/");
392     String httpPath = "/*";
393     httpServer.setHandler(context);
394     context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
395 
396     // set up Jetty and run the embedded server
397     Connector connector = new SelectChannelConnector();
398     if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
399       SslSelectChannelConnectorSecure sslConnector = new SslSelectChannelConnectorSecure();
400       String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
401       String password = HBaseConfiguration.getPassword(conf,
402           THRIFT_SSL_KEYSTORE_PASSWORD, null);
403       String keyPassword = HBaseConfiguration.getPassword(conf,
404           THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
405       sslConnector.setKeystore(keystore);
406       sslConnector.setPassword(password);
407       sslConnector.setKeyPassword(keyPassword);
408       connector = sslConnector;
409     }
410     String host = getBindAddress(conf).getHostAddress();
411     connector.setPort(listenPort);
412     connector.setHost(host);
413     connector.setHeaderBufferSize(1024 * 64);
414     httpServer.addConnector(connector);
415 
416     if (doAsEnabled) {
417       ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
418     }
419 
420     // Set the default max thread number to 100 to limit
421     // the number of concurrent requests, so that the Thrift HTTP server doesn't OOM easily.
422     // Jetty sets the default max thread number to 250 if we don't set it ourselves.
423     //
424     // Our default min thread number of 2 is the same as that used by Jetty.
425     int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
426     int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
427     QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
428     threadPool.setMinThreads(minThreads);
429     httpServer.setThreadPool(threadPool);
430 
431     httpServer.setSendServerVersion(false);
432     httpServer.setSendDateHeader(false);
433     httpServer.setStopAtShutdown(true);
434 
435     LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
436   }
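
      // Illustrative client-side sketch for the HTTP mode set up above (assumes libthrift's
      // THttpClient; not part of this server class):
      //
      //   THttpClient transport = new THttpClient("http://thrift-host:9090/");
      //   Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));
      //   transport.open();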
437 
438   /**
439    * Sets up the Thrift TServer.
440    */
441   private void setupServer() throws Exception {
442     // Construct correct ProtocolFactory
443     TProtocolFactory protocolFactory;
444     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
445       LOG.debug("Using compact protocol");
446       protocolFactory = new TCompactProtocol.Factory();
447     } else {
448       LOG.debug("Using binary protocol");
449       protocolFactory = new TBinaryProtocol.Factory();
450     }
451 
452     final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
453     ImplType implType = ImplType.getServerImpl(conf);
454     TProcessor processor = p;
455 
456     // Construct correct TransportFactory
457     TTransportFactory transportFactory;
458     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
459       if (qop != null) {
460         throw new RuntimeException("Thrift server authentication"
461           + " doesn't work with framed transport yet");
462       }
463       transportFactory = new TFramedTransport.Factory(
464           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
465       LOG.debug("Using framed transport");
466     } else if (qop == null) {
467       transportFactory = new TTransportFactory();
468     } else {
469       // Extract the name from the principal
470       String name = SecurityUtil.getUserFromPrincipal(
471         conf.get("hbase.thrift.kerberos.principal"));
472       Map<String, String> saslProperties = new HashMap<String, String>();
473       saslProperties.put(Sasl.QOP, qop);
474       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
475       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
476         new SaslGssCallbackHandler() {
477           @Override
478           public void handle(Callback[] callbacks)
479               throws UnsupportedCallbackException {
480             AuthorizeCallback ac = null;
481             for (Callback callback : callbacks) {
482               if (callback instanceof AuthorizeCallback) {
483                 ac = (AuthorizeCallback) callback;
484               } else {
485                 throw new UnsupportedCallbackException(callback,
486                     "Unrecognized SASL GSSAPI Callback");
487               }
488             }
489             if (ac != null) {
490               String authid = ac.getAuthenticationID();
491               String authzid = ac.getAuthorizationID();
492               if (!authid.equals(authzid)) {
493                 ac.setAuthorized(false);
494               } else {
495                 ac.setAuthorized(true);
496                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
497                 LOG.info("Effective user: " + userName);
498                 ac.setAuthorizedID(userName);
499               }
500             }
501           }
502         });
503       transportFactory = saslFactory;
504 
505       // Create a processor wrapper, to get the caller
506       processor = new TProcessor() {
507         @Override
508         public boolean process(TProtocol inProt,
509             TProtocol outProt) throws TException {
510           TSaslServerTransport saslServerTransport =
511             (TSaslServerTransport)inProt.getTransport();
512           SaslServer saslServer = saslServerTransport.getSaslServer();
513           String principal = saslServer.getAuthorizationID();
514           hbaseHandler.setEffectiveUser(principal);
515           return p.process(inProt, outProt);
516         }
517       };
518     }
519 
520     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
521       LOG.error("Server types " + Joiner.on(", ").join(
522           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
523           "address binding at the moment. See " +
524           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
525       throw new RuntimeException(
526           "-" + BIND_CONF_KEY + " not supported with " + implType);
527     }
528 
529     // Thrift's implementation uses '0' as a placeholder for 'use the default.'
530     int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
531 
532     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
533         implType == ImplType.THREADED_SELECTOR) {
534       InetAddress listenAddress = getBindAddress(conf);
535       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
536           new InetSocketAddress(listenAddress, listenPort));
537 
538       if (implType == ImplType.NONBLOCKING) {
539         TNonblockingServer.Args serverArgs =
540             new TNonblockingServer.Args(serverTransport);
541         serverArgs.processor(processor)
542                   .transportFactory(transportFactory)
543                   .protocolFactory(protocolFactory);
544         tserver = new TNonblockingServer(serverArgs);
545       } else if (implType == ImplType.HS_HA) {
546         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
547         CallQueue callQueue =
548             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
549         ExecutorService executorService = createExecutor(
550             callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads());
551         serverArgs.executorService(executorService)
552                   .processor(processor)
553                   .transportFactory(transportFactory)
554                   .protocolFactory(protocolFactory);
555         tserver = new THsHaServer(serverArgs);
556       } else { // THREADED_SELECTOR
557         TThreadedSelectorServer.Args serverArgs =
558             new HThreadedSelectorServerArgs(serverTransport, conf);
559         CallQueue callQueue =
560             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
561         ExecutorService executorService = createExecutor(
562             callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
563         serverArgs.executorService(executorService)
564                   .processor(processor)
565                   .transportFactory(transportFactory)
566                   .protocolFactory(protocolFactory);
567         tserver = new TThreadedSelectorServer(serverArgs);
568       }
569       LOG.info("starting HBase " + implType.simpleClassName() +
570           " server on " + Integer.toString(listenPort));
571     } else if (implType == ImplType.THREAD_POOL) {
572       // Thread pool server. Get the IP address to bind to.
573       InetAddress listenAddress = getBindAddress(conf);
574       int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
575           THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
576       TServerTransport serverTransport = new TServerSocket(
577           new TServerSocket.ServerSocketTransportArgs().
578               bindAddr(new InetSocketAddress(listenAddress, listenPort)).
579               backlog(backlog).
580               clientTimeout(readTimeout));
581 
582       TBoundedThreadPoolServer.Args serverArgs =
583           new TBoundedThreadPoolServer.Args(serverTransport, conf);
584       serverArgs.processor(processor)
585                 .transportFactory(transportFactory)
586                 .protocolFactory(protocolFactory);
587       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
588           + listenAddress + ":" + Integer.toString(listenPort)
589           + " with readTimeout " + readTimeout + "ms; " + serverArgs);
590       TBoundedThreadPoolServer tserver =
591           new TBoundedThreadPoolServer(serverArgs, metrics);
592       this.tserver = tserver;
593     } else {
594       throw new AssertionError("Unsupported Thrift server implementation: " +
595           implType.simpleClassName());
596     }
597 
598     // A sanity check that we instantiated the right type of server.
599     if (tserver.getClass() != implType.serverClass) {
600       throw new AssertionError("Expected to create Thrift server class " +
601           implType.serverClass.getName() + " but got " +
602           tserver.getClass().getName());
603     }
604 
605 
606 
607     registerFilters(conf);
608   }
609 
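      /**
       * Creates the worker pool used by the HsHa and threaded-selector servers above.
       * Workers are daemon threads named "thrift-worker-%d" that pull work from the
       * supplied call queue; the effectively unbounded keep-alive keeps them alive
       * between requests.
       */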
610   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
611                                  int minWorkers, int maxWorkers) {
612     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
613     tfb.setDaemon(true);
614     tfb.setNameFormat("thrift-worker-%d");
615     return new ThreadPoolExecutor(minWorkers, maxWorkers,
616             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
617   }
618 
619   private InetAddress getBindAddress(Configuration conf)
620       throws UnknownHostException {
621     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
622     return InetAddress.getByName(bindAddressStr);
623   }
624 
625   protected static class ResultScannerWrapper {
626 
627     private final ResultScanner scanner;
628     private final boolean sortColumns;
629     public ResultScannerWrapper(ResultScanner resultScanner,
630                                 boolean sortResultColumns) {
631       scanner = resultScanner;
632       sortColumns = sortResultColumns;
633     }
634 
635     public ResultScanner getScanner() {
636       return scanner;
637     }
638 
639     public boolean isColumnSorted() {
640       return sortColumns;
641     }
642   }
643 
644   /**
645    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
646    * HBase client API primarily defined in the Admin and Table objects.
647    */
648   public static class HBaseHandler implements Hbase.Iface {
649     protected Configuration conf;
650     protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);
651 
652     // nextScannerId and scannerMap are used to manage scanner state
653     protected int nextScannerId = 0;
654     protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
655     private ThriftMetrics metrics = null;
656 
657     private final ConnectionCache connectionCache;
658     IncrementCoalescer coalescer = null;
659 
660     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
661     static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
662 
663     /**
664      * Returns the name of every column family in the given table, with the
665      * column family delimiter appended to each name.
666      * @param table the table whose descriptor is consulted
667      * @throws IOException if the table descriptor cannot be retrieved
668      */
669     byte[][] getAllColumns(Table table) throws IOException {
670       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
671       byte[][] columns = new byte[cds.length][];
672       for (int i = 0; i < cds.length; i++) {
673         columns[i] = Bytes.add(cds[i].getName(),
674             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
675       }
676       return columns;
677     }
678 
679     /**
680      * Creates and returns a Table instance from a given table name.
681      *
682      * @param tableName
683      *          name of table
684      * @return Table object
685      * @throws IOException
686      */
687     public Table getTable(final byte[] tableName) throws
688         IOException {
689       String table = Bytes.toString(tableName);
690       return connectionCache.getTable(table);
691     }
692 
693     public Table getTable(final ByteBuffer tableName) throws IOException {
694       return getTable(getBytes(tableName));
695     }
696 
697     /**
698      * Assigns a unique ID to the scanner and adds the mapping to an internal
699      * hash-map.
700      *
701      * @param scanner the scanner to register
702      * @return integer scanner id
703      */
704     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
705       int id = nextScannerId++;
706       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
707       scannerMap.put(id, resultScannerWrapper);
708       return id;
709     }
710 
711     /**
712      * Returns the scanner associated with the specified ID.
713      *
714      * @param id the scanner ID
715      * @return the scanner wrapper, or null if the ID was invalid.
716      */
717     protected synchronized ResultScannerWrapper getScanner(int id) {
718       return scannerMap.get(id);
719     }
720 
721     /**
722      * Removes the scanner associated with the specified ID from the internal
723      * id-&gt;scanner hash-map.
724      *
725      * @param id the scanner ID
726      * @return the removed scanner wrapper, or null if the ID was invalid.
727      */
728     protected synchronized ResultScannerWrapper removeScanner(int id) {
729       return scannerMap.remove(id);
730     }
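
        // Scanner bookkeeping, in brief: the Thrift scanner-open calls (outside this excerpt)
        // create a ResultScanner and register it through addScanner(), handing the returned id
        // to the client; scannerGetList(id, n) looks the scanner up with getScanner(), and
        // scannerClose(id) closes the underlying scanner and removes it via removeScanner().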
731 
732     protected HBaseHandler(final Configuration c,
733         final UserProvider userProvider) throws IOException {
734       this.conf = c;
735       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
736       this.coalescer = new IncrementCoalescer(this);
737 
738       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
739       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
740       connectionCache = new ConnectionCache(
741         conf, userProvider, cleanInterval, maxIdleTime);
742     }
743 
744     /**
745      * Obtain HBaseAdmin. Creates the instance if it is not already created.
746      */
747     private Admin getAdmin() throws IOException {
748       return connectionCache.getAdmin();
749     }
750 
751     void setEffectiveUser(String effectiveUser) {
752       connectionCache.setEffectiveUser(effectiveUser);
753     }
754 
755     @Override
756     public void enableTable(ByteBuffer tableName) throws IOError {
757       try{
758         getAdmin().enableTable(getTableName(tableName));
759       } catch (IOException e) {
760         LOG.warn(e.getMessage(), e);
761         throw new IOError(Throwables.getStackTraceAsString(e));
762       }
763     }
764 
765     @Override
766     public void disableTable(ByteBuffer tableName) throws IOError{
767       try{
768         getAdmin().disableTable(getTableName(tableName));
769       } catch (IOException e) {
770         LOG.warn(e.getMessage(), e);
771         throw new IOError(Throwables.getStackTraceAsString(e));
772       }
773     }
774 
775     @Override
776     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
777       try {
778         return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
779       } catch (IOException e) {
780         LOG.warn(e.getMessage(), e);
781         throw new IOError(Throwables.getStackTraceAsString(e));
782       }
783     }
784 
785     @Override
786     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
787       try {
788         // TODO: HBaseAdmin.compact(byte[]) deprecated and not trivial to replace here.
789         // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
790         // table and region.
791         ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
792       } catch (IOException e) {
793         LOG.warn(e.getMessage(), e);
794         throw new IOError(Throwables.getStackTraceAsString(e));
795       }
796     }
797 
798     @Override
799     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
800       try {
801         // TODO: HBaseAdmin.majorCompact(byte[]) deprecated and not trivial to replace here.
802         // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
803         // to table and region.
804         ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
805       } catch (IOException e) {
806         LOG.warn(e.getMessage(), e);
807         throw new IOError(Throwables.getStackTraceAsString(e));
808       }
809     }
810 
811     @Override
812     public List<ByteBuffer> getTableNames() throws IOError {
813       try {
814         TableName[] tableNames = this.getAdmin().listTableNames();
815         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
816         for (int i = 0; i < tableNames.length; i++) {
817           list.add(ByteBuffer.wrap(tableNames[i].getName()));
818         }
819         return list;
820       } catch (IOException e) {
821         LOG.warn(e.getMessage(), e);
822         throw new IOError(Throwables.getStackTraceAsString(e));
823       }
824     }
825 
826     /**
827      * @return the list of regions in the given table, or an empty list if the table does not exist
828      */
829     @Override
830     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
831     throws IOError {
832       try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
833         List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
834         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
835         for (HRegionLocation regionLocation : regionLocations) {
836           HRegionInfo info = regionLocation.getRegionInfo();
837           ServerName serverName = regionLocation.getServerName();
838           TRegionInfo region = new TRegionInfo();
839           region.serverName = ByteBuffer.wrap(
840               Bytes.toBytes(serverName.getHostname()));
841           region.port = serverName.getPort();
842           region.startKey = ByteBuffer.wrap(info.getStartKey());
843           region.endKey = ByteBuffer.wrap(info.getEndKey());
844           region.id = info.getRegionId();
845           region.name = ByteBuffer.wrap(info.getRegionName());
846           region.version = info.getVersion();
847           results.add(region);
848         }
849         return results;
850       } catch (TableNotFoundException e) {
851         // Return empty list for non-existing table
852         return Collections.emptyList();
853       } catch (IOException e){
854         LOG.warn(e.getMessage(), e);
855         throw new IOError(Throwables.getStackTraceAsString(e));
856       }
857     }
858 
859     @Override
860     public List<TCell> get(
861         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
862         Map<ByteBuffer, ByteBuffer> attributes)
863         throws IOError {
864       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
865       if (famAndQf.length == 1) {
866         return get(tableName, row, famAndQf[0], null, attributes);
867       }
868       if (famAndQf.length == 2) {
869         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
870       }
871       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
872     }
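
        // Illustrative: how the Thrift "column" argument is interpreted above.
        //   "info:name" -> KeyValue.parseColumn -> {family, qualifier} -> Get.addColumn(...)
        //   "info"      -> KeyValue.parseColumn -> {family}            -> Get.addFamily(...)
        // A bare family name therefore selects the whole column family.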
873 
874     /**
875      * Note: this internal interface is slightly different from public APIs in regard to handling
876      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
877      * we respect qual == null as a request for the entire column family. The caller (
878      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
879      * column is parsed as normal.
880      */
881     protected List<TCell> get(ByteBuffer tableName,
882                               ByteBuffer row,
883                               byte[] family,
884                               byte[] qualifier,
885                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
886       Table table = null;
887       try {
888         table = getTable(tableName);
889         Get get = new Get(getBytes(row));
890         addAttributes(get, attributes);
891         if (qualifier == null) {
892           get.addFamily(family);
893         } else {
894           get.addColumn(family, qualifier);
895         }
896         Result result = table.get(get);
897         return ThriftUtilities.cellFromHBase(result.rawCells());
898       } catch (IOException e) {
899         LOG.warn(e.getMessage(), e);
900         throw new IOError(Throwables.getStackTraceAsString(e));
901       } finally {
902         closeTable(table);
903       }
904     }
905 
906     @Override
907     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
908         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
909       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
910       if(famAndQf.length == 1) {
911         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
912       }
913       if (famAndQf.length == 2) {
914         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
915       }
916       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
917 
918     }
919 
920     /**
921      * Note: this public interface is slightly different from public Java APIs in regard to
922      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
923      * Rather, we respect qual == null as a request for the entire column family. If you want to
924      * access the entire column family, use
925      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
926      * that lacks a {@code ':'}.
927      */
928     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
929         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
930 
931       Table table = null;
932       try {
933         table = getTable(tableName);
934         Get get = new Get(getBytes(row));
935         addAttributes(get, attributes);
936         if (null == qualifier) {
937           get.addFamily(family);
938         } else {
939           get.addColumn(family, qualifier);
940         }
941         get.setMaxVersions(numVersions);
942         Result result = table.get(get);
943         return ThriftUtilities.cellFromHBase(result.rawCells());
944       } catch (IOException e) {
945         LOG.warn(e.getMessage(), e);
946         throw new IOError(Throwables.getStackTraceAsString(e));
947       } finally{
948         closeTable(table);
949       }
950     }
951 
952     @Override
953     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
954         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
955       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
956       if (famAndQf.length == 1) {
957         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
958       }
959       if (famAndQf.length == 2) {
960         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
961           attributes);
962       }
963       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
964     }
965 
966     /**
967      * Note: this internal interface is slightly different from public APIs in regard to handling
968      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
969      * we respect qual == null as a request for the entire column family. The caller (
970      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
971      * consistent in that the column is parsed as normal.
972      */
973     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
974         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
975         throws IOError {
976 
977       Table table = null;
978       try {
979         table = getTable(tableName);
980         Get get = new Get(getBytes(row));
981         addAttributes(get, attributes);
982         if (null == qualifier) {
983           get.addFamily(family);
984         } else {
985           get.addColumn(family, qualifier);
986         }
987         get.setTimeRange(0, timestamp);
988         get.setMaxVersions(numVersions);
989         Result result = table.get(get);
990         return ThriftUtilities.cellFromHBase(result.rawCells());
991       } catch (IOException e) {
992         LOG.warn(e.getMessage(), e);
993         throw new IOError(Throwables.getStackTraceAsString(e));
994       } finally{
995         closeTable(table);
996       }
997     }
998 
999     @Override
1000     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1001         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1002       return getRowWithColumnsTs(tableName, row, null,
1003                                  HConstants.LATEST_TIMESTAMP,
1004                                  attributes);
1005     }
1006 
1007     @Override
1008     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1009                                               ByteBuffer row,
1010         List<ByteBuffer> columns,
1011         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1012       return getRowWithColumnsTs(tableName, row, columns,
1013                                  HConstants.LATEST_TIMESTAMP,
1014                                  attributes);
1015     }
1016 
1017     @Override
1018     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1019         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1020       return getRowWithColumnsTs(tableName, row, null,
1021                                  timestamp, attributes);
1022     }
1023 
1024     @Override
1025     public List<TRowResult> getRowWithColumnsTs(
1026         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1027         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1028 
1029       Table table = null;
1030       try {
1031         table = getTable(tableName);
1032         if (columns == null) {
1033           Get get = new Get(getBytes(row));
1034           addAttributes(get, attributes);
1035           get.setTimeRange(0, timestamp);
1036           Result result = table.get(get);
1037           return ThriftUtilities.rowResultFromHBase(result);
1038         }
1039         Get get = new Get(getBytes(row));
1040         addAttributes(get, attributes);
1041         for(ByteBuffer column : columns) {
1042           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1043           if (famAndQf.length == 1) {
1044               get.addFamily(famAndQf[0]);
1045           } else {
1046               get.addColumn(famAndQf[0], famAndQf[1]);
1047           }
1048         }
1049         get.setTimeRange(0, timestamp);
1050         Result result = table.get(get);
1051         return ThriftUtilities.rowResultFromHBase(result);
1052       } catch (IOException e) {
1053         LOG.warn(e.getMessage(), e);
1054         throw new IOError(Throwables.getStackTraceAsString(e));
1055       } finally{
1056         closeTable(table);
1057       }
1058     }
1059 
1060     @Override
1061     public List<TRowResult> getRows(ByteBuffer tableName,
1062                                     List<ByteBuffer> rows,
1063         Map<ByteBuffer, ByteBuffer> attributes)
1064         throws IOError {
1065       return getRowsWithColumnsTs(tableName, rows, null,
1066                                   HConstants.LATEST_TIMESTAMP,
1067                                   attributes);
1068     }
1069 
1070     @Override
1071     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1072                                                List<ByteBuffer> rows,
1073         List<ByteBuffer> columns,
1074         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1075       return getRowsWithColumnsTs(tableName, rows, columns,
1076                                   HConstants.LATEST_TIMESTAMP,
1077                                   attributes);
1078     }
1079 
1080     @Override
1081     public List<TRowResult> getRowsTs(ByteBuffer tableName,
1082                                       List<ByteBuffer> rows,
1083         long timestamp,
1084         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1085       return getRowsWithColumnsTs(tableName, rows, null,
1086                                   timestamp, attributes);
1087     }
1088 
1089     @Override
1090     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1091                                                  List<ByteBuffer> rows,
1092         List<ByteBuffer> columns, long timestamp,
1093         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1094 
1095       Table table= null;
1096       try {
1097         List<Get> gets = new ArrayList<Get>(rows.size());
1098         table = getTable(tableName);
1099         if (metrics != null) {
1100           metrics.incNumRowKeysInBatchGet(rows.size());
1101         }
1102         for (ByteBuffer row : rows) {
1103           Get get = new Get(getBytes(row));
1104           addAttributes(get, attributes);
1105           if (columns != null) {
1106 
1107             for(ByteBuffer column : columns) {
1108               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1109               if (famAndQf.length == 1) {
1110                 get.addFamily(famAndQf[0]);
1111               } else {
1112                 get.addColumn(famAndQf[0], famAndQf[1]);
1113               }
1114             }
1115           }
1116           get.setTimeRange(0, timestamp);
1117           gets.add(get);
1118         }
1119         Result[] result = table.get(gets);
1120         return ThriftUtilities.rowResultFromHBase(result);
1121       } catch (IOException e) {
1122         LOG.warn(e.getMessage(), e);
1123         throw new IOError(Throwables.getStackTraceAsString(e));
1124       } finally{
1125         closeTable(table);
1126       }
1127     }
1128 
1129     @Override
1130     public void deleteAll(
1131         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1132         Map<ByteBuffer, ByteBuffer> attributes)
1133         throws IOError {
1134       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1135                   attributes);
1136     }
1137 
1138     @Override
1139     public void deleteAllTs(ByteBuffer tableName,
1140                             ByteBuffer row,
1141                             ByteBuffer column,
1142         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1143       Table table = null;
1144       try {
1145         table = getTable(tableName);
1146         Delete delete  = new Delete(getBytes(row));
1147         addAttributes(delete, attributes);
1148         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1149         if (famAndQf.length == 1) {
1150           delete.deleteFamily(famAndQf[0], timestamp);
1151         } else {
1152           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1153         }
1154         table.delete(delete);
1155 
1156       } catch (IOException e) {
1157         LOG.warn(e.getMessage(), e);
1158         throw new IOError(Throwables.getStackTraceAsString(e));
1159       } finally {
1160         closeTable(table);
1161       }
1162     }
1163 
1164     @Override
1165     public void deleteAllRow(
1166         ByteBuffer tableName, ByteBuffer row,
1167         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1168       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1169     }
1170 
1171     @Override
1172     public void deleteAllRowTs(
1173         ByteBuffer tableName, ByteBuffer row, long timestamp,
1174         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1175       Table table = null;
1176       try {
1177         table = getTable(tableName);
1178         Delete delete  = new Delete(getBytes(row), timestamp);
1179         addAttributes(delete, attributes);
1180         table.delete(delete);
1181       } catch (IOException e) {
1182         LOG.warn(e.getMessage(), e);
1183         throw new IOError(Throwables.getStackTraceAsString(e));
1184       } finally {
1185         closeTable(table);
1186       }
1187     }
1188 
1189     @Override
1190     public void createTable(ByteBuffer in_tableName,
1191         List<ColumnDescriptor> columnFamilies) throws IOError,
1192         IllegalArgument, AlreadyExists {
1193       TableName tableName = getTableName(in_tableName);
1194       try {
1195         if (getAdmin().tableExists(tableName)) {
1196           throw new AlreadyExists("table name already in use");
1197         }
1198         HTableDescriptor desc = new HTableDescriptor(tableName);
1199         for (ColumnDescriptor col : columnFamilies) {
1200           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1201           desc.addFamily(colDesc);
1202         }
1203         getAdmin().createTable(desc);
1204       } catch (IOException e) {
1205         LOG.warn(e.getMessage(), e);
1206         throw new IOError(Throwables.getStackTraceAsString(e));
1207       } catch (IllegalArgumentException e) {
1208         LOG.warn(e.getMessage(), e);
1209         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1210       }
1211     }
1212 
1213     private static TableName getTableName(ByteBuffer buffer) {
1214       return TableName.valueOf(getBytes(buffer));
1215     }
1216 
1217     @Override
1218     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1219       TableName tableName = getTableName(in_tableName);
1220       if (LOG.isDebugEnabled()) {
1221         LOG.debug("deleteTable: table=" + tableName);
1222       }
1223       try {
1224         if (!getAdmin().tableExists(tableName)) {
1225           throw new IOException("table does not exist");
1226         }
1227         getAdmin().deleteTable(tableName);
1228       } catch (IOException e) {
1229         LOG.warn(e.getMessage(), e);
1230         throw new IOError(Throwables.getStackTraceAsString(e));
1231       }
1232     }
1233 
1234     @Override
1235     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1236         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1237         throws IOError, IllegalArgument {
1238       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1239                   attributes);
1240     }
1241 
1242     @Override
1243     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1244         List<Mutation> mutations, long timestamp,
1245         Map<ByteBuffer, ByteBuffer> attributes)
1246         throws IOError, IllegalArgument {
1247       Table table = null;
1248       try {
1249         table = getTable(tableName);
1250         Put put = new Put(getBytes(row), timestamp);
1251         addAttributes(put, attributes);
1252 
1253         Delete delete = new Delete(getBytes(row));
1254         addAttributes(delete, attributes);
1255         if (metrics != null) {
1256           metrics.incNumRowKeysInBatchMutate(mutations.size());
1257         }
1258 
1259         // I apologize for all this mess :)
1260         for (Mutation m : mutations) {
1261           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1262           if (m.isDelete) {
1263             if (famAndQf.length == 1) {
1264               delete.deleteFamily(famAndQf[0], timestamp);
1265             } else {
1266               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1267             }
1268             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1269                 : Durability.SKIP_WAL);
1270           } else {
1271             if(famAndQf.length == 1) {
1272               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1273                   + "over the whole column family.");
1274             } else {
1275               put.addImmutable(famAndQf[0], famAndQf[1],
1276                   m.value != null ? getBytes(m.value)
1277                       : HConstants.EMPTY_BYTE_ARRAY);
1278             }
1279             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1280           }
1281         }
1282         if (!delete.isEmpty())
1283           table.delete(delete);
1284         if (!put.isEmpty())
1285           table.put(put);
1286       } catch (IOException e) {
1287         LOG.warn(e.getMessage(), e);
1288         throw new IOError(Throwables.getStackTraceAsString(e));
1289       } catch (IllegalArgumentException e) {
1290         LOG.warn(e.getMessage(), e);
1291         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1292       } finally{
1293         closeTable(table);
1294       }
1295     }
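
         // Illustrative mapping of a Thrift Mutation, as handled above:
         //   {column: "info:name", value: "x", isDelete: false, writeToWAL: true}
         //     -> Put.addImmutable("info", "name", "x") with Durability.SYNC_WAL
         //   {column: "info:name", isDelete: true, writeToWAL: false}
         //     -> Delete.deleteColumns("info", "name", timestamp) with Durability.SKIP_WAL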
1296 
1297     @Override
1298     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1299         Map<ByteBuffer, ByteBuffer> attributes)
1300         throws IOError, IllegalArgument, TException {
1301       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1302     }
1303 
1304     @Override
1305     public void mutateRowsTs(
1306         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1307         Map<ByteBuffer, ByteBuffer> attributes)
1308         throws IOError, IllegalArgument, TException {
1309       List<Put> puts = new ArrayList<Put>();
1310       List<Delete> deletes = new ArrayList<Delete>();
1311 
1312       for (BatchMutation batch : rowBatches) {
1313         byte[] row = getBytes(batch.row);
1314         List<Mutation> mutations = batch.mutations;
1315         Delete delete = new Delete(row);
1316         addAttributes(delete, attributes);
1317         Put put = new Put(row, timestamp);
1318         addAttributes(put, attributes);
1319         for (Mutation m : mutations) {
1320           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1321           if (m.isDelete) {
1322             // no qualifier, family only.
1323             if (famAndQf.length == 1) {
1324               delete.deleteFamily(famAndQf[0], timestamp);
1325             } else {
1326               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1327             }
1328             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1329                 : Durability.SKIP_WAL);
1330           } else {
1331             if (famAndQf.length == 1) {
1332               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1333                   + "over the whole column family.");
1334             }
1335             if (famAndQf.length == 2) {
1336               put.addImmutable(famAndQf[0], famAndQf[1],
1337                   m.value != null ? getBytes(m.value)
1338                       : HConstants.EMPTY_BYTE_ARRAY);
1339             } else {
1340               throw new IllegalArgumentException("No column qualifier provided; put requires family:qualifier.");
1341             }
1342             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1343           }
1344         }
1345         if (!delete.isEmpty())
1346           deletes.add(delete);
1347         if (!put.isEmpty())
1348           puts.add(put);
1349       }
1350 
1351       Table table = null;
1352       try {
1353         table = getTable(tableName);
1354         if (!puts.isEmpty())
1355           table.put(puts);
1356         if (!deletes.isEmpty())
1357           table.delete(deletes);
1358 
1359       } catch (IOException e) {
1360         LOG.warn(e.getMessage(), e);
1361         throw new IOError(Throwables.getStackTraceAsString(e));
1362       } catch (IllegalArgumentException e) {
1363         LOG.warn(e.getMessage(), e);
1364         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1365       } finally{
1366         closeTable(table);
1367       }
1368     }
1369 
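         /**
          * Atomically increments a single counter cell. The column is parsed as
          * "family" or "family:qualifier"; with no qualifier, the empty qualifier
          * is used.
          */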
1370     @Override
1371     public long atomicIncrement(
1372         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1373             throws IOError, IllegalArgument, TException {
1374       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1375       if(famAndQf.length == 1) {
1376         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1377       }
1378       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1379     }
1380 
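         /**
          * Performs the increment via Table#incrementColumnValue and returns the
          * post-increment value of the cell.
          */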
1381     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1382         byte [] family, byte [] qualifier, long amount)
1383         throws IOError, IllegalArgument, TException {
1384       Table table = null;
1385       try {
1386         table = getTable(tableName);
1387         return table.incrementColumnValue(
1388             getBytes(row), family, qualifier, amount);
1389       } catch (IOException e) {
1390         LOG.warn(e.getMessage(), e);
1391         throw new IOError(Throwables.getStackTraceAsString(e));
1392       } finally {
1393         closeTable(table);
1394       }
1395     }
1396 
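         /**
          * Closes the server-side scanner with the given id and removes it from the
          * scanner map; an unknown id raises IllegalArgument.
          */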
1397     @Override
1398     public void scannerClose(int id) throws IOError, IllegalArgument {
1399       LOG.debug("scannerClose: id=" + id);
1400       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1401       if (resultScannerWrapper == null) {
1402         String message = "scanner ID " + id + " is invalid";
1403         LOG.warn(message);
1404         throw new IllegalArgument(message);
1405       }
1406       resultScannerWrapper.getScanner().close();
1407       removeScanner(id);
1408     }
1409 
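         /**
          * Fetches up to nbRows rows from the scanner with the given id. An
          * exhausted scanner returns an empty list; the scanner itself is not
          * closed here, so callers should still invoke scannerClose.
          */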
1410     @Override
1411     public List<TRowResult> scannerGetList(int id, int nbRows)
1412         throws IllegalArgument, IOError {
1413       LOG.debug("scannerGetList: id=" + id);
1414       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1415       if (null == resultScannerWrapper) {
1416         String message = "scanner ID " + id + " is invalid";
1417         LOG.warn(message);
1418         throw new IllegalArgument(message);
1419       }
1420 
1421       Result [] results = null;
1422       try {
1423         results = resultScannerWrapper.getScanner().next(nbRows);
1424         if (null == results) {
1425           return new ArrayList<TRowResult>();
1426         }
1427       } catch (IOException e) {
1428         LOG.warn(e.getMessage(), e);
1429         throw new IOError(Throwables.getStackTraceAsString(e));
1430       }
1431       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1432     }
1433 
1434     @Override
1435     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1436       return scannerGetList(id, 1);
1437     }
1438 
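         /**
          * Opens a scanner configured from the Thrift TScan: start/stop rows, an
          * upper bound on the time range, caching, batch size, columns, an optional
          * filter string parsed by ParseFilter, and reversed ordering. Returns the
          * scanner id for use with scannerGet/scannerGetList.
          */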
1439     @Override
1440     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1441         Map<ByteBuffer, ByteBuffer> attributes)
1442         throws IOError {
1443 
1444       Table table = null;
1445       try {
1446         table = getTable(tableName);
1447         Scan scan = new Scan();
1448         addAttributes(scan, attributes);
1449         if (tScan.isSetStartRow()) {
1450           scan.setStartRow(tScan.getStartRow());
1451         }
1452         if (tScan.isSetStopRow()) {
1453           scan.setStopRow(tScan.getStopRow());
1454         }
1455         if (tScan.isSetTimestamp()) {
1456           scan.setTimeRange(0, tScan.getTimestamp());
1457         }
1458         if (tScan.isSetCaching()) {
1459           scan.setCaching(tScan.getCaching());
1460         }
1461         if (tScan.isSetBatchSize()) {
1462           scan.setBatch(tScan.getBatchSize());
1463         }
1464         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1465           for(ByteBuffer column : tScan.getColumns()) {
1466             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1467             if(famQf.length == 1) {
1468               scan.addFamily(famQf[0]);
1469             } else {
1470               scan.addColumn(famQf[0], famQf[1]);
1471             }
1472           }
1473         }
1474         if (tScan.isSetFilterString()) {
1475           ParseFilter parseFilter = new ParseFilter();
1476           scan.setFilter(
1477               parseFilter.parseFilterString(tScan.getFilterString()));
1478         }
1479         if (tScan.isSetReversed()) {
1480           scan.setReversed(tScan.isReversed());
1481         }
1482         return addScanner(table.getScanner(scan), tScan.sortColumns);
1483       } catch (IOException e) {
1484         LOG.warn(e.getMessage(), e);
1485         throw new IOError(Throwables.getStackTraceAsString(e));
1486       } finally{
1487         closeTable(table);
1488       }
1489     }
1490 
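         /**
          * Opens a scanner starting at the given row over the requested columns,
          * each given as "family" or "family:qualifier"; an empty column list scans
          * every family.
          */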
1491     @Override
1492     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1493         List<ByteBuffer> columns,
1494         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1495 
1496       Table table = null;
1497       try {
1498         table = getTable(tableName);
1499         Scan scan = new Scan(getBytes(startRow));
1500         addAttributes(scan, attributes);
1501         if(columns != null && columns.size() != 0) {
1502           for(ByteBuffer column : columns) {
1503             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1504             if(famQf.length == 1) {
1505               scan.addFamily(famQf[0]);
1506             } else {
1507               scan.addColumn(famQf[0], famQf[1]);
1508             }
1509           }
1510         }
1511         return addScanner(table.getScanner(scan), false);
1512       } catch (IOException e) {
1513         LOG.warn(e.getMessage(), e);
1514         throw new IOError(Throwables.getStackTraceAsString(e));
1515       } finally{
1516         closeTable(table);
1517       }
1518     }
1519 
1520     @Override
1521     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1522         ByteBuffer stopRow, List<ByteBuffer> columns,
1523         Map<ByteBuffer, ByteBuffer> attributes)
1524         throws IOError, TException {
1525 
1526       Table table = null;
1527       try {
1528         table = getTable(tableName);
1529         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1530         addAttributes(scan, attributes);
1531         if(columns != null && columns.size() != 0) {
1532           for(ByteBuffer column : columns) {
1533             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1534             if(famQf.length == 1) {
1535               scan.addFamily(famQf[0]);
1536             } else {
1537               scan.addColumn(famQf[0], famQf[1]);
1538             }
1539           }
1540         }
1541         return addScanner(table.getScanner(scan), false);
1542       } catch (IOException e) {
1543         LOG.warn(e.getMessage(), e);
1544         throw new IOError(Throwables.getStackTraceAsString(e));
1545       } finally{
1546         closeTable(table);
1547       }
1548     }
1549 
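         /**
          * Opens a scanner that starts at startAndPrefix and stops as soon as a row
          * no longer matches that prefix, using a WhileMatchFilter around a
          * PrefixFilter.
          */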
1550     @Override
1551     public int scannerOpenWithPrefix(ByteBuffer tableName,
1552                                      ByteBuffer startAndPrefix,
1553                                      List<ByteBuffer> columns,
1554         Map<ByteBuffer, ByteBuffer> attributes)
1555         throws IOError, TException {
1556 
1557       Table table = null;
1558       try {
1559         table = getTable(tableName);
1560         Scan scan = new Scan(getBytes(startAndPrefix));
1561         addAttributes(scan, attributes);
1562         Filter f = new WhileMatchFilter(
1563             new PrefixFilter(getBytes(startAndPrefix)));
1564         scan.setFilter(f);
1565         if (columns != null && columns.size() != 0) {
1566           for(ByteBuffer column : columns) {
1567             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1568             if(famQf.length == 1) {
1569               scan.addFamily(famQf[0]);
1570             } else {
1571               scan.addColumn(famQf[0], famQf[1]);
1572             }
1573           }
1574         }
1575         return addScanner(table.getScanner(scan), false);
1576       } catch (IOException e) {
1577         LOG.warn(e.getMessage(), e);
1578         throw new IOError(Throwables.getStackTraceAsString(e));
1579       } finally{
1580         closeTable(table);
1581       }
1582     }
1583 
1584     @Override
1585     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1586         List<ByteBuffer> columns, long timestamp,
1587         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1588 
1589       Table table = null;
1590       try {
1591         table = getTable(tableName);
1592         Scan scan = new Scan(getBytes(startRow));
1593         addAttributes(scan, attributes);
1594         scan.setTimeRange(0, timestamp);
1595         if (columns != null && columns.size() != 0) {
1596           for (ByteBuffer column : columns) {
1597             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1598             if(famQf.length == 1) {
1599               scan.addFamily(famQf[0]);
1600             } else {
1601               scan.addColumn(famQf[0], famQf[1]);
1602             }
1603           }
1604         }
1605         return addScanner(table.getScanner(scan), false);
1606       } catch (IOException e) {
1607         LOG.warn(e.getMessage(), e);
1608         throw new IOError(Throwables.getStackTraceAsString(e));
1609       } finally{
1610         closeTable(table);
1611       }
1612     }
1613 
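         /**
          * Opens a scanner bounded by start and stop row, with the supplied
          * timestamp applied as the upper bound of the scan's time range.
          */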
1614     @Override
1615     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1616         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1617         Map<ByteBuffer, ByteBuffer> attributes)
1618         throws IOError, TException {
1619 
1620       Table table = null;
1621       try {
1622         table = getTable(tableName);
1623         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1624         addAttributes(scan, attributes);
1625         scan.setTimeRange(0, timestamp);
1626         if (columns != null && columns.size() != 0) {
1627           for (ByteBuffer column : columns) {
1628             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1629             if(famQf.length == 1) {
1630               scan.addFamily(famQf[0]);
1631             } else {
1632               scan.addColumn(famQf[0], famQf[1]);
1633             }
1634           }
1635         }
1636
1637         return addScanner(table.getScanner(scan), false);
1638       } catch (IOException e) {
1639         LOG.warn(e.getMessage(), e);
1640         throw new IOError(Throwables.getStackTraceAsString(e));
1641       } finally{
1642         closeTable(table);
1643       }
1644     }
1645 
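         /**
          * Returns the table's column families as Thrift ColumnDescriptor objects,
          * keyed by column name.
          */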
1646     @Override
1647     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1648         ByteBuffer tableName) throws IOError, TException {
1649 
1650       Table table = null;
1651       try {
1652         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1653           new TreeMap<ByteBuffer, ColumnDescriptor>();
1654 
1655         table = getTable(tableName);
1656         HTableDescriptor desc = table.getTableDescriptor();
1657 
1658         for (HColumnDescriptor e : desc.getFamilies()) {
1659           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1660           columns.put(col.name, col);
1661         }
1662         return columns;
1663       } catch (IOException e) {
1664         LOG.warn(e.getMessage(), e);
1665         throw new IOError(Throwables.getStackTraceAsString(e));
1666       } finally {
1667         closeTable(table);
1668       }
1669     }
1670 
1671     @Deprecated
1672     @Override
1673     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1674         ByteBuffer family) throws IOError {
1675       try {
1676         Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1677         return ThriftUtilities.cellFromHBase(result.rawCells());
1678       } catch (IOException e) {
1679         LOG.warn(e.getMessage(), e);
1680         throw new IOError(Throwables.getStackTraceAsString(e));
1681       }
1682     }
1683 
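         /**
          * Looks up hbase:meta for the region containing the given row and returns
          * its start/end keys, name, id, version and, when assigned, the hosting
          * server name and port.
          */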
1684     @Override
1685     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1686       try {
1687         byte[] row = getBytes(searchRow);
1688         Result startRowResult =
1689             getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1690 
1691         if (startRowResult == null) {
1692           throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1693                                 + Bytes.toStringBinary(row));
1694         }
1695 
1696         // find region start and end keys
1697         HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1698         if (regionInfo == null) {
1699           throw new IOException("HRegionInfo REGIONINFO was null or " +
1700                                 " empty in Meta for row="
1701                                 + Bytes.toStringBinary(row));
1702         }
1703         TRegionInfo region = new TRegionInfo();
1704         region.setStartKey(regionInfo.getStartKey());
1705         region.setEndKey(regionInfo.getEndKey());
1706         region.id = regionInfo.getRegionId();
1707         region.setName(regionInfo.getRegionName());
1708         region.version = regionInfo.getVersion();
1709 
1710         // find region assignment to server
1711         ServerName serverName = HRegionInfo.getServerName(startRowResult);
1712         if (serverName != null) {
1713           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1714           region.port = serverName.getPort();
1715         }
1716         return region;
1717       } catch (IOException e) {
1718         LOG.warn(e.getMessage(), e);
1719         throw new IOError(Throwables.getStackTraceAsString(e));
1720       }
1721     }
1722 
1723     private void closeTable(Table table) throws IOError
1724     {
1725       try{
1726         if(table != null){
1727           table.close();
1728         }
1729       } catch (IOException e){
1730         LOG.error(e.getMessage(), e);
1731         throw new IOError(Throwables.getStackTraceAsString(e));
1732       }
1733     }
1734 
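         /**
          * Returns the closest row at or before the given row in the specified
          * family, emulated with a reversed single-result Scan (a stand-in for the
          * deprecated HTable#getRowOrBefore).
          */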
1735     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1736       Scan scan = new Scan(row);
1737       scan.setReversed(true);
1738       scan.addFamily(family);
1739
1740       Table table = getTable(tableName);
1741       try (ResultScanner scanner = table.getScanner(scan)) {
1742         return scanner.next();
1743       } finally{
1744         if(table != null){
1745           table.close();
1746         }
1747       }
1748     }
1749 
1750     private void initMetrics(ThriftMetrics metrics) {
1751       this.metrics = metrics;
1752     }
1753 
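         /**
          * Applies a single TIncrement. When increment coalescing is enabled via
          * COALESCE_INC_KEY, the increment is queued on the coalescer instead of
          * being written to the table immediately.
          */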
1754     @Override
1755     public void increment(TIncrement tincrement) throws IOError, TException {
1756 
1757       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1758         throw new TException("Must supply a table and a row key; can't increment");
1759       }
1760 
1761       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1762         this.coalescer.queueIncrement(tincrement);
1763         return;
1764       }
1765 
1766       Table table = null;
1767       try {
1768         table = getTable(tincrement.getTable());
1769         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1770         table.increment(inc);
1771       } catch (IOException e) {
1772         LOG.warn(e.getMessage(), e);
1773         throw new IOError(Throwables.getStackTraceAsString(e));
1774       } finally{
1775         closeTable(table);
1776       }
1777     }
1778 
1779     @Override
1780     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1781       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1782         this.coalescer.queueIncrements(tincrements);
1783         return;
1784       }
1785       for (TIncrement tinc : tincrements) {
1786         increment(tinc);
1787       }
1788     }
1789 
1790     @Override
1791     public List<TCell> append(TAppend tappend) throws IOError, TException {
1792       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1793         throw new TException("Must supply a table and a row key; can't append");
1794       }
1795 
1796       Table table = null;
1797       try {
1798         table = getTable(tappend.getTable());
1799         Append append = ThriftUtilities.appendFromThrift(tappend);
1800         Result result = table.append(append);
1801         return ThriftUtilities.cellFromHBase(result.rawCells());
1802       } catch (IOException e) {
1803         LOG.warn(e.getMessage(), e);
1804         throw new IOError(Throwables.getStackTraceAsString(e));
1805       } finally {
1806         closeTable(table);
1807       }
1808     }
1809 
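         /**
          * Atomically applies the put described by mput only if the current value
          * of the check column (parsed as "family:qualifier") matches the expected
          * value; returns true when the put was performed.
          */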
1810     @Override
1811     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1812         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1813         IllegalArgument, TException {
1814       Put put;
1815       try {
1816         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1817         addAttributes(put, attributes);
1818 
1819         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1820 
1821         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1822             : HConstants.EMPTY_BYTE_ARRAY);
1823 
1824         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1825       } catch (IllegalArgumentException e) {
1826         LOG.warn(e.getMessage(), e);
1827         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1828       }
1829 
1830       Table table = null;
1831       try {
1832         table = getTable(tableName);
1833         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1834         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1835           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1836       } catch (IOException e) {
1837         LOG.warn(e.getMessage(), e);
1838         throw new IOError(Throwables.getStackTraceAsString(e));
1839       } catch (IllegalArgumentException e) {
1840         LOG.warn(e.getMessage(), e);
1841         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1842       } finally {
1843         closeTable(table);
1844       }
1845     }
1846   }
1847 
1848 
1849 
1850   /**
1851    * Adds all the supplied attributes to the given Operation.
1852    */
1853   private static void addAttributes(OperationWithAttributes op,
1854     Map<ByteBuffer, ByteBuffer> attributes) {
1855     if (attributes == null || attributes.size() == 0) {
1856       return;
1857     }
1858     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1859       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1860       byte[] value =  getBytes(entry.getValue());
1861       op.setAttribute(name, value);
1862     }
1863   }
1864 
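       /**
        * Registers custom filters listed in the "hbase.thrift.filters" configuration
        * property so they can be referenced from Thrift filter strings. Each entry
        * has the form filterName:fully.qualified.FilterClass, e.g. (hypothetical
        * names) MyFilter:com.example.filters.MyFilter; malformed entries are logged
        * and skipped.
        */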
1865   public static void registerFilters(Configuration conf) {
1866     String[] filters = conf.getStrings("hbase.thrift.filters");
1867     if(filters != null) {
1868       for(String filterClass: filters) {
1869         String[] filterPart = filterClass.split(":");
1870         if(filterPart.length != 2) {
1871           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1872         } else {
1873           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1874         }
1875       }
1876     }
1877   }
1878 }