View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HRegionLocation;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.MetaTableAccessor;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.classification.InterfaceAudience;
66  import org.apache.hadoop.hbase.client.Admin;
67  import org.apache.hadoop.hbase.client.Append;
68  import org.apache.hadoop.hbase.client.Delete;
69  import org.apache.hadoop.hbase.client.Durability;
70  import org.apache.hadoop.hbase.client.Get;
71  import org.apache.hadoop.hbase.client.Increment;
72  import org.apache.hadoop.hbase.client.OperationWithAttributes;
73  import org.apache.hadoop.hbase.client.Put;
74  import org.apache.hadoop.hbase.client.RegionLocator;
75  import org.apache.hadoop.hbase.client.Result;
76  import org.apache.hadoop.hbase.client.ResultScanner;
77  import org.apache.hadoop.hbase.client.Scan;
78  import org.apache.hadoop.hbase.client.Table;
79  import org.apache.hadoop.hbase.filter.Filter;
80  import org.apache.hadoop.hbase.filter.ParseFilter;
81  import org.apache.hadoop.hbase.filter.PrefixFilter;
82  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83  import org.apache.hadoop.hbase.security.SecurityUtil;
84  import org.apache.hadoop.hbase.security.UserProvider;
85  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
86  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
87  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
88  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
89  import org.apache.hadoop.hbase.thrift.generated.Hbase;
90  import org.apache.hadoop.hbase.thrift.generated.IOError;
91  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
92  import org.apache.hadoop.hbase.thrift.generated.Mutation;
93  import org.apache.hadoop.hbase.thrift.generated.TAppend;
94  import org.apache.hadoop.hbase.thrift.generated.TCell;
95  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
96  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
97  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
98  import org.apache.hadoop.hbase.thrift.generated.TScan;
99  import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.ConnectionCache;
101 import org.apache.hadoop.hbase.util.DNS;
102 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
103 import org.apache.hadoop.hbase.util.Strings;
104 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.security.UserGroupInformation;
106 import org.apache.hadoop.security.authorize.ProxyUsers;
107 import org.apache.thrift.TException;
108 import org.apache.thrift.TProcessor;
109 import org.apache.thrift.protocol.TBinaryProtocol;
110 import org.apache.thrift.protocol.TCompactProtocol;
111 import org.apache.thrift.protocol.TProtocol;
112 import org.apache.thrift.protocol.TProtocolFactory;
113 import org.apache.thrift.server.THsHaServer;
114 import org.apache.thrift.server.TNonblockingServer;
115 import org.apache.thrift.server.TServer;
116 import org.apache.thrift.server.TServlet;
117 import org.apache.thrift.server.TThreadedSelectorServer;
118 import org.apache.thrift.transport.TFramedTransport;
119 import org.apache.thrift.transport.TNonblockingServerSocket;
120 import org.apache.thrift.transport.TNonblockingServerTransport;
121 import org.apache.thrift.transport.TSaslServerTransport;
122 import org.apache.thrift.transport.TServerSocket;
123 import org.apache.thrift.transport.TServerTransport;
124 import org.apache.thrift.transport.TTransportFactory;
125 import org.mortbay.jetty.Connector;
126 import org.mortbay.jetty.Server;
127 import org.mortbay.jetty.nio.SelectChannelConnector;
128 import org.mortbay.jetty.security.SslSelectChannelConnector;
129 import org.mortbay.jetty.servlet.Context;
130 import org.mortbay.jetty.servlet.ServletHolder;
131 import org.mortbay.thread.QueuedThreadPool;
132 
133 import com.google.common.base.Joiner;
134 import com.google.common.base.Throwables;
135 import com.google.common.util.concurrent.ThreadFactoryBuilder;
136 
137 /**
138  * ThriftServerRunner - this class starts up a Thrift server which implements
139  * the Hbase API specified in the Hbase.thrift IDL file.
140  */
141 @InterfaceAudience.Private
142 public class ThriftServerRunner implements Runnable {
143 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  // Configuration key selecting the TServer implementation (see ImplType).
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  // Core server settings: bind address, wire protocol, framing, port,
  // and increment coalescing.
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
  // Settings used when serving the Thrift API over HTTP instead of a TServer.
  static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
  static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
  static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";

  // SSL settings for the HTTP transport.
  static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
  static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
  static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
  static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";

  /**
   * Amount of time in milliseconds before a server thread will timeout
   * waiting for client to send data on a connected socket. Currently,
   * applies only to TBoundedThreadPoolServer
   */
  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
    "hbase.thrift.server.socket.read.timeout";
  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;


  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * auth-conf: authentication, integrity and confidentiality checking
   * auth-int: authentication and integrity checking
   * auth: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
  static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";

  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  public static final int HREGION_VERSION = 1;
  // Whether the "doAs" impersonation support is enabled (see doAsEnabled).
  static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
  private final int listenPort;

  private Configuration conf;
  // Exactly one of these two servers is active, depending on USE_HTTP_CONF_KEY.
  // volatile so shutdown() sees the instance published by run().
  volatile TServer tserver;
  volatile Server httpServer;
  // Metrics-recording proxy around hbaseHandler; all Thrift calls go through it.
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
  private final HBaseHandler hbaseHandler;
  // The logged-in server user; serving is performed via doAs on this UGI.
  private final UserGroupInformation realUser;

  // SASL quality of protection: null, "auth", "auth-int", or "auth-conf".
  private final String qop;
  private String host;

  private final boolean securityEnabled;
  private final boolean doAsEnabled;

  private final JvmPauseMonitor pauseMonitor;
207 
208   /** An enum of server implementation selections */
209   enum ImplType {
210     HS_HA("hsha", true, THsHaServer.class, true),
211     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
212     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
213     THREADED_SELECTOR(
214         "threadedselector", true, TThreadedSelectorServer.class, true);
215 
216     public static final ImplType DEFAULT = THREAD_POOL;
217 
218     final String option;
219     final boolean isAlwaysFramed;
220     final Class<? extends TServer> serverClass;
221     final boolean canSpecifyBindIP;
222 
223     ImplType(String option, boolean isAlwaysFramed,
224         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
225       this.option = option;
226       this.isAlwaysFramed = isAlwaysFramed;
227       this.serverClass = serverClass;
228       this.canSpecifyBindIP = canSpecifyBindIP;
229     }
230 
231     /**
232      * @return <code>-option</code> so we can get the list of options from
233      *         {@link #values()}
234      */
235     @Override
236     public String toString() {
237       return "-" + option;
238     }
239 
240     String getDescription() {
241       StringBuilder sb = new StringBuilder("Use the " +
242           serverClass.getSimpleName());
243       if (isAlwaysFramed) {
244         sb.append(" This implies the framed transport.");
245       }
246       if (this == DEFAULT) {
247         sb.append("This is the default.");
248       }
249       return sb.toString();
250     }
251 
252     static OptionGroup createOptionGroup() {
253       OptionGroup group = new OptionGroup();
254       for (ImplType t : values()) {
255         group.addOption(new Option(t.option, t.getDescription()));
256       }
257       return group;
258     }
259 
260     static ImplType getServerImpl(Configuration conf) {
261       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
262       for (ImplType t : values()) {
263         if (confType.equals(t.option)) {
264           return t;
265         }
266       }
267       throw new AssertionError("Unknown server ImplType.option:" + confType);
268     }
269 
270     static void setServerImpl(CommandLine cmd, Configuration conf) {
271       ImplType chosenType = null;
272       int numChosen = 0;
273       for (ImplType t : values()) {
274         if (cmd.hasOption(t.option)) {
275           chosenType = t;
276           ++numChosen;
277         }
278       }
279       if (numChosen < 1) {
280         LOG.info("Using default thrift server type");
281         chosenType = DEFAULT;
282       } else if (numChosen > 1) {
283         throw new AssertionError("Exactly one option out of " +
284           Arrays.toString(values()) + " has to be specified");
285       }
286       LOG.info("Using thrift server type " + chosenType.option);
287       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
288     }
289 
290     public String simpleClassName() {
291       return serverClass.getSimpleName();
292     }
293 
294     public static List<String> serversThatCannotSpecifyBindIP() {
295       List<String> l = new ArrayList<String>();
296       for (ImplType t : values()) {
297         if (!t.canSpecifyBindIP) {
298           l.add(t.simpleClassName());
299         }
300       }
301       return l;
302     }
303 
304   }
305 
  /**
   * Reads the Thrift server settings from the given configuration, performs
   * the Kerberos login when both Hadoop and HBase security are enabled, and
   * wires up the handler, its metrics proxy, and the JVM pause monitor.
   *
   * @param conf server configuration
   * @throws IOException if login fails or the QOP setting is invalid
   */
  public ThriftServerRunner(Configuration conf) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    // login the server principal (if using secure Hadoop)
    securityEnabled = userProvider.isHadoopSecurityEnabled()
      && userProvider.isHBaseSecurityEnabled();
    if (securityEnabled) {
      // Resolve this host's name via the configured DNS interface/nameserver;
      // it is also used later as the SASL server host (see setupServer).
      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase.thrift.dns.interface", "default"),
        conf.get("hbase.thrift.dns.nameserver", "default")));
      userProvider.login("hbase.thrift.keytab.file",
        "hbase.thrift.kerberos.principal", host);
    }
    // Keep our own copy of the configuration.
    this.conf = HBaseConfiguration.create(conf);
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
    this.hbaseHandler = new HBaseHandler(conf, userProvider);
    this.hbaseHandler.initMetrics(metrics);
    // All Thrift calls are routed through a metrics-recording proxy.
    this.handler = HbaseHandlerMetricsProxy.newInstance(
      hbaseHandler, metrics, conf);
    this.realUser = userProvider.getCurrent().getUGI();
    qop = conf.get(THRIFT_QOP_KEY);
    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
    if (qop != null) {
      // Fail fast on an unsupported QOP value or an insecure cluster.
      if (!qop.equals("auth") && !qop.equals("auth-int")
          && !qop.equals("auth-conf")) {
        throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
          + ", it must be 'auth', 'auth-int', or 'auth-conf'");
      }
      if (!securityEnabled) {
        throw new IOException("Thrift server must"
          + " run in secure mode to support authentication");
      }
    }
  }
341 
  /*
   * Runs the Thrift server as the logged-in server user. Depending on
   * configuration this starts either the HTTP servlet server or one of the
   * native Thrift TServer implementations, and blocks until it exits.
   * Any failure is fatal: the process is terminated.
   */
  @Override
  public void run() {
    realUser.doAs(new PrivilegedAction<Object>() {
      @Override
      public Object run() {
        try {
          pauseMonitor.start();
          if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
            setupHTTPServer();
            httpServer.start();
            httpServer.join();
          } else {
            setupServer();
            tserver.serve();
          }
        } catch (Exception e) {
          LOG.fatal("Cannot run ThriftServer", e);
          // Crash the process if the ThriftServer is not running
          System.exit(-1);
        }
        return null;
      }
    });

  }
370 
371   public void shutdown() {
372     if (pauseMonitor != null) {
373       pauseMonitor.stop();
374     }
375     if (tserver != null) {
376       tserver.stop();
377       tserver = null;
378     }
379     if (httpServer != null) {
380       try {
381         httpServer.stop();
382         httpServer = null;
383       } catch (Exception e) {
384         LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
385       }
386       httpServer = null;
387     }
388   }
389 
390   private void setupHTTPServer() throws IOException {
391     TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
392     TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
393     TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
394         conf, hbaseHandler, securityEnabled, doAsEnabled);
395 
396     httpServer = new Server();
397     // Context handler
398     Context context = new Context(httpServer, "/", Context.SESSIONS);
399     context.setContextPath("/");
400     String httpPath = "/*";
401     httpServer.setHandler(context);
402     context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
403 
404     // set up Jetty and run the embedded server
405     Connector connector = new SelectChannelConnector();
406     if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
407       SslSelectChannelConnector sslConnector = new SslSelectChannelConnector();
408       String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
409       String password = HBaseConfiguration.getPassword(conf,
410           THRIFT_SSL_KEYSTORE_PASSWORD, null);
411       String keyPassword = HBaseConfiguration.getPassword(conf,
412           THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
413       sslConnector.setKeystore(keystore);
414       sslConnector.setPassword(password);
415       sslConnector.setKeyPassword(keyPassword);
416       connector = sslConnector;
417     }
418     String host = getBindAddress(conf).getHostAddress();
419     connector.setPort(listenPort);
420     connector.setHost(host);
421     connector.setHeaderBufferSize(1024 * 64);
422     httpServer.addConnector(connector);
423 
424     if (doAsEnabled) {
425       ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
426     }
427 
428     // Set the default max thread number to 100 to limit
429     // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
430     // Jetty set the default max thread number to 250, if we don't set it.
431     //
432     // Our default min thread number 2 is the same as that used by Jetty.
433     int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
434     int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
435     QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
436     threadPool.setMinThreads(minThreads);
437     httpServer.setThreadPool(threadPool);
438 
439     httpServer.setSendServerVersion(false);
440     httpServer.setSendDateHeader(false);
441     httpServer.setStopAtShutdown(true);
442 
443     LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
444   }
445 
  /**
   * Sets up the Thrift TServer according to configuration: chooses the wire
   * protocol (binary or compact), the transport (framed, plain, or SASL
   * GSSAPI), and one of the server implementations from {@link ImplType},
   * then assigns the result to {@link #tserver}.
   *
   * @throws Exception if the server cannot be constructed or bound
   */
  private void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory;
    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
      LOG.debug("Using compact protocol");
      protocolFactory = new TCompactProtocol.Factory();
    } else {
      LOG.debug("Using binary protocol");
      protocolFactory = new TBinaryProtocol.Factory();
    }

    final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
    ImplType implType = ImplType.getServerImpl(conf);
    // processor may be re-wrapped below to capture the SASL caller identity.
    TProcessor processor = p;

    // Construct correct TransportFactory
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
      // SASL authentication (qop) and the framed transport are incompatible.
      if (qop != null) {
        throw new RuntimeException("Thrift server authentication"
          + " doesn't work with framed transport yet");
      }
      transportFactory = new TFramedTransport.Factory(
          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
      LOG.debug("Using framed transport");
    } else if (qop == null) {
      transportFactory = new TTransportFactory();
    } else {
      // SASL/GSSAPI transport: authenticate callers against the server's
      // Kerberos principal with the configured quality of protection.
      // Extract the name from the principal
      String name = SecurityUtil.getUserFromPrincipal(
        conf.get("hbase.thrift.kerberos.principal"));
      Map<String, String> saslProperties = new HashMap<String, String>();
      saslProperties.put(Sasl.QOP, qop);
      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
        new SaslGssCallbackHandler() {
          @Override
          public void handle(Callback[] callbacks)
              throws UnsupportedCallbackException {
            AuthorizeCallback ac = null;
            for (Callback callback : callbacks) {
              if (callback instanceof AuthorizeCallback) {
                ac = (AuthorizeCallback) callback;
              } else {
                throw new UnsupportedCallbackException(callback,
                    "Unrecognized SASL GSSAPI Callback");
              }
            }
            if (ac != null) {
              // Only authorize when the authentication and authorization
              // IDs are identical.
              String authid = ac.getAuthenticationID();
              String authzid = ac.getAuthorizationID();
              if (!authid.equals(authzid)) {
                ac.setAuthorized(false);
              } else {
                ac.setAuthorized(true);
                String userName = SecurityUtil.getUserFromPrincipal(authzid);
                LOG.info("Effective user: " + userName);
                ac.setAuthorizedID(userName);
              }
            }
          }
        });
      transportFactory = saslFactory;

      // Create a processor wrapper, to get the caller
      processor = new TProcessor() {
        @Override
        public boolean process(TProtocol inProt,
            TProtocol outProt) throws TException {
          // Pull the authenticated principal off the SASL transport and
          // record it as the effective user before delegating the call.
          TSaslServerTransport saslServerTransport =
            (TSaslServerTransport)inProt.getTransport();
          SaslServer saslServer = saslServerTransport.getSaslServer();
          String principal = saslServer.getAuthorizationID();
          hbaseHandler.setEffectiveUser(principal);
          return p.process(inProt, outProt);
        }
      };
    }

    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types " + Joiner.on(", ").join(
          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
          "address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
      throw new RuntimeException(
          "-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
    int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);

    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {
      // The nonblocking server variants all share one nonblocking socket.
      InetAddress listenAddress = getBindAddress(conf);
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      if (implType == ImplType.NONBLOCKING) {
        TNonblockingServer.Args serverArgs =
            new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TNonblockingServer(serverArgs);
      } else if (implType == ImplType.HS_HA) {
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        // Metrics-instrumented queue feeding the worker pool.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new THsHaServer(serverArgs);
      } else { // THREADED_SELECTOR
        TThreadedSelectorServer.Args serverArgs =
            new HThreadedSelectorServerArgs(serverTransport, conf);
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TThreadedSelectorServer(serverArgs);
      }
      LOG.info("starting HBase " + implType.simpleClassName() +
          " server on " + Integer.toString(listenPort));
    } else if (implType == ImplType.THREAD_POOL) {
      // Thread pool server. Get the IP address to bind to.
      InetAddress listenAddress = getBindAddress(conf);
      int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
          THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
      TServerTransport serverTransport = new TServerSocket(
          new TServerSocket.ServerSocketTransportArgs().
              bindAddr(new InetSocketAddress(listenAddress, listenPort)).
              backlog(backlog).
              clientTimeout(readTimeout));

      TBoundedThreadPoolServer.Args serverArgs =
          new TBoundedThreadPoolServer.Args(serverTransport, conf);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
          + listenAddress + ":" + Integer.toString(listenPort)
          + " with readTimeout " + readTimeout + "ms; " + serverArgs);
      TBoundedThreadPoolServer tserver =
          new TBoundedThreadPoolServer(serverArgs, metrics);
      this.tserver = tserver;
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }



    registerFilters(conf);
  }
617 
618   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
619                                  int minWorkers, int maxWorkers) {
620     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
621     tfb.setDaemon(true);
622     tfb.setNameFormat("thrift-worker-%d");
623     return new ThreadPoolExecutor(minWorkers, maxWorkers,
624             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
625   }
626 
627   private InetAddress getBindAddress(Configuration conf)
628       throws UnknownHostException {
629     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
630     return InetAddress.getByName(bindAddressStr);
631   }
632 
633   protected static class ResultScannerWrapper {
634 
635     private final ResultScanner scanner;
636     private final boolean sortColumns;
637     public ResultScannerWrapper(ResultScanner resultScanner,
638                                 boolean sortResultColumns) {
639       scanner = resultScanner;
640       sortColumns = sortResultColumns;
641    }
642 
643     public ResultScanner getScanner() {
644       return scanner;
645     }
646 
647     public boolean isColumnSorted() {
648       return sortColumns;
649     }
650   }
651 
652   /**
653    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
654    * HBase client API primarily defined in the Admin and Table objects.
655    */
656   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);

    // nextScannerId and scannerMap are used to manage scanner state;
    // access goes through the synchronized addScanner/getScanner/removeScanner.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Cache of HBase connections; the effective user for subsequent calls is
    // set per request via setEffectiveUser.
    private final ConnectionCache connectionCache;
    IncrementCoalescer coalescer = null;

    // Connection cache housekeeping configuration keys (values in ms).
    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
670 
671     /**
672      * Returns a list of all the column families for a given Table.
673      *
674      * @param table
675      * @throws IOException
676      */
677     byte[][] getAllColumns(Table table) throws IOException {
678       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
679       byte[][] columns = new byte[cds.length][];
680       for (int i = 0; i < cds.length; i++) {
681         columns[i] = Bytes.add(cds[i].getName(),
682             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
683       }
684       return columns;
685     }
686 
687     /**
688      * Creates and returns a Table instance from a given table name.
689      *
690      * @param tableName
691      *          name of table
692      * @return Table object
693      * @throws IOException
694      */
695     public Table getTable(final byte[] tableName) throws
696         IOException {
697       String table = Bytes.toString(tableName);
698       return connectionCache.getTable(table);
699     }
700 
701     public Table getTable(final ByteBuffer tableName) throws IOException {
702       return getTable(getBytes(tableName));
703     }
704 
705     /**
706      * Assigns a unique ID to the scanner and adds the mapping to an internal
707      * hash-map.
708      *
709      * @param scanner
710      * @return integer scanner id
711      */
712     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
713       int id = nextScannerId++;
714       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
715       scannerMap.put(id, resultScannerWrapper);
716       return id;
717     }
718 
    /**
     * Returns the scanner associated with the specified ID.
     *
     * @param id scanner ID previously returned by {@link #addScanner}
     * @return a Scanner, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper getScanner(int id) {
      return scannerMap.get(id);
    }
728 
    /**
     * Removes the scanner associated with the specified ID from the internal
     * id-&gt;scanner hash-map.
     *
     * @param id scanner ID previously returned by {@link #addScanner}
     * @return a Scanner, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper removeScanner(int id) {
      return scannerMap.remove(id);
    }
739 
    /**
     * Creates the handler: scanner bookkeeping, the increment coalescer, and
     * the per-user connection cache.
     *
     * @param c server configuration
     * @param userProvider used by the connection cache to resolve users
     * @throws IOException if the connection cache cannot be created
     */
    protected HBaseHandler(final Configuration c,
        final UserProvider userProvider) throws IOException {
      this.conf = c;
      scannerMap = new HashMap<Integer, ResultScannerWrapper>();
      this.coalescer = new IncrementCoalescer(this);

      // Defaults: 10 s cleanup interval, 10 min max idle time.
      int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
      int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
      connectionCache = new ConnectionCache(
        conf, userProvider, cleanInterval, maxIdleTime);
    }
751 
    /**
     * Obtain HBaseAdmin. Creates the instance if it is not already created.
     *
     * @return an Admin supplied by the connection cache
     * @throws IOException if the admin cannot be obtained
     */
    private Admin getAdmin() throws IOException {
      return connectionCache.getAdmin();
    }
758 
    /** Sets the user on whose behalf subsequent connection-cache operations run. */
    void setEffectiveUser(String effectiveUser) {
      connectionCache.setEffectiveUser(effectiveUser);
    }
762 
763     @Override
764     public void enableTable(ByteBuffer tableName) throws IOError {
765       try{
766         getAdmin().enableTable(getTableName(tableName));
767       } catch (IOException e) {
768         LOG.warn(e.getMessage(), e);
769         throw new IOError(Throwables.getStackTraceAsString(e));
770       }
771     }
772 
773     @Override
774     public void disableTable(ByteBuffer tableName) throws IOError{
775       try{
776         getAdmin().disableTable(getTableName(tableName));
777       } catch (IOException e) {
778         LOG.warn(e.getMessage(), e);
779         throw new IOError(Throwables.getStackTraceAsString(e));
780       }
781     }
782 
783     @Override
784     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
785       try {
786         return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
787       } catch (IOException e) {
788         LOG.warn(e.getMessage(), e);
789         throw new IOError(Throwables.getStackTraceAsString(e));
790       }
791     }
792 
793     // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
794     // table and region.
795     @Override
796     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
797       try {
798         try {
799           getAdmin().compactRegion(getBytes(tableNameOrRegionName));
800         } catch (IllegalArgumentException e) {
801           // Invalid region, try table
802           getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
803         }
804       } catch (IOException e) {
805         LOG.warn(e.getMessage(), e);
806         throw new IOError(Throwables.getStackTraceAsString(e));
807       }
808     }
809 
810     // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
811     // to table and region.
812     @Override
813     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
814       try {
815         try {
816           getAdmin().compactRegion(getBytes(tableNameOrRegionName));
817         } catch (IllegalArgumentException e) {
818           // Invalid region, try table
819           getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
820         }
821       } catch (IOException e) {
822         LOG.warn(e.getMessage(), e);
823         throw new IOError(Throwables.getStackTraceAsString(e));
824       }
825     }
826 
827     @Override
828     public List<ByteBuffer> getTableNames() throws IOError {
829       try {
830         TableName[] tableNames = this.getAdmin().listTableNames();
831         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
832         for (int i = 0; i < tableNames.length; i++) {
833           list.add(ByteBuffer.wrap(tableNames[i].getName()));
834         }
835         return list;
836       } catch (IOException e) {
837         LOG.warn(e.getMessage(), e);
838         throw new IOError(Throwables.getStackTraceAsString(e));
839       }
840     }
841 
    /**
     * @return the list of regions in the given table, or an empty list if the table does not exist
     */
    @Override
    public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
    throws IOError {
      // try-with-resources ensures the RegionLocator is closed on every path.
      try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
        List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
        List<TRegionInfo> results = new ArrayList<TRegionInfo>();
        for (HRegionLocation regionLocation : regionLocations) {
          HRegionInfo info = regionLocation.getRegionInfo();
          ServerName serverName = regionLocation.getServerName();
          // Copy region metadata into the Thrift-facing struct.
          TRegionInfo region = new TRegionInfo();
          region.serverName = ByteBuffer.wrap(
              Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
          region.startKey = ByteBuffer.wrap(info.getStartKey());
          region.endKey = ByteBuffer.wrap(info.getEndKey());
          region.id = info.getRegionId();
          region.name = ByteBuffer.wrap(info.getRegionName());
          region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
          results.add(region);
        }
        return results;
      } catch (TableNotFoundException e) {
        // Return empty list for non-existing table
        return Collections.emptyList();
      } catch (IOException e){
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      }
    }
874 
875     @Override
876     public List<TCell> get(
877         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
878         Map<ByteBuffer, ByteBuffer> attributes)
879         throws IOError {
880       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
881       if (famAndQf.length == 1) {
882         return get(tableName, row, famAndQf[0], null, attributes);
883       }
884       if (famAndQf.length == 2) {
885         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
886       }
887       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
888     }
889 
    /**
     * Note: this internal interface is slightly different from public APIs in regard to handling
     * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
     * we respect qual == null as a request for the entire column family. The caller (
     * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
     * column is parse like normal.
     *
     * @param tableName  table to read from
     * @param row        row key to fetch
     * @param family     column family
     * @param qualifier  column qualifier, or null to request the entire family
     * @param attributes operation attributes attached to the Get
     * @return the cells of the matching column(s)
     * @throws IOError wrapping any underlying IOException
     */
    protected List<TCell> get(ByteBuffer tableName,
                              ByteBuffer row,
                              byte[] family,
                              byte[] qualifier,
                              Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      Table table = null;
      try {
        table = getTable(tableName);
        Get get = new Get(getBytes(row));
        addAttributes(get, attributes);
        if (qualifier == null) {
          get.addFamily(family);
        } else {
          get.addColumn(family, qualifier);
        }
        Result result = table.get(get);
        return ThriftUtilities.cellFromHBase(result.rawCells());
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } finally {
        // Release the Table whether or not the get succeeded.
        closeTable(table);
      }
    }
921 
922     @Override
923     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
924         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
925       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
926       if(famAndQf.length == 1) {
927         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
928       }
929       if (famAndQf.length == 2) {
930         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
931       }
932       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
933 
934     }
935 
    /**
     * Note: this public interface is slightly different from public Java APIs in regard to
     * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
     * Rather, we respect qual == null as a request for the entire column family. If you want to
     * access the entire column family, use
     * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
     * that lacks a {@code ':'}.
     *
     * @param tableName   table to read from
     * @param row         row key to fetch
     * @param family      column family
     * @param qualifier   column qualifier, or null to request the entire family
     * @param numVersions maximum number of versions to return
     * @param attributes  operation attributes attached to the Get
     * @return the matching cells
     * @throws IOError wrapping any underlying IOException
     */
    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
        byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {

      Table table = null;
      try {
        table = getTable(tableName);
        Get get = new Get(getBytes(row));
        addAttributes(get, attributes);
        if (null == qualifier) {
          get.addFamily(family);
        } else {
          get.addColumn(family, qualifier);
        }
        get.setMaxVersions(numVersions);
        Result result = table.get(get);
        return ThriftUtilities.cellFromHBase(result.rawCells());
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } finally{
        // Release the Table whether or not the get succeeded.
        closeTable(table);
      }
    }
967 
968     @Override
969     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
970         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
971       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
972       if (famAndQf.length == 1) {
973         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
974       }
975       if (famAndQf.length == 2) {
976         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
977           attributes);
978       }
979       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
980     }
981 
    /**
     * Note: this internal interface is slightly different from public APIs in regard to handling
     * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
     * we respect qual == null as a request for the entire column family. The caller (
     * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
     * consistent in that the column is parse like normal.
     *
     * @param tableName   table to read from
     * @param row         row key to fetch
     * @param family      column family
     * @param qualifier   column qualifier, or null to request the entire family
     * @param timestamp   upper bound passed to {@code Get.setTimeRange(0, timestamp)}
     * @param numVersions maximum number of versions to return
     * @param attributes  operation attributes attached to the Get
     * @return the matching cells
     * @throws IOError wrapping any underlying IOException
     */
    protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
        byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError {

      Table table = null;
      try {
        table = getTable(tableName);
        Get get = new Get(getBytes(row));
        addAttributes(get, attributes);
        if (null == qualifier) {
          get.addFamily(family);
        } else {
          get.addColumn(family, qualifier);
        }
        // Restrict to versions within [0, timestamp) before capping the version count.
        get.setTimeRange(0, timestamp);
        get.setMaxVersions(numVersions);
        Result result = table.get(get);
        return ThriftUtilities.cellFromHBase(result.rawCells());
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } finally{
        // Release the Table whether or not the get succeeded.
        closeTable(table);
      }
    }
1014 
1015     @Override
1016     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1017         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1018       return getRowWithColumnsTs(tableName, row, null,
1019                                  HConstants.LATEST_TIMESTAMP,
1020                                  attributes);
1021     }
1022 
1023     @Override
1024     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1025                                               ByteBuffer row,
1026         List<ByteBuffer> columns,
1027         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1028       return getRowWithColumnsTs(tableName, row, columns,
1029                                  HConstants.LATEST_TIMESTAMP,
1030                                  attributes);
1031     }
1032 
1033     @Override
1034     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1035         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1036       return getRowWithColumnsTs(tableName, row, null,
1037                                  timestamp, attributes);
1038     }
1039 
1040     @Override
1041     public List<TRowResult> getRowWithColumnsTs(
1042         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1043         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1044       
1045       Table table = null;
1046       try {
1047         table = getTable(tableName);
1048         if (columns == null) {
1049           Get get = new Get(getBytes(row));
1050           addAttributes(get, attributes);
1051           get.setTimeRange(0, timestamp);
1052           Result result = table.get(get);
1053           return ThriftUtilities.rowResultFromHBase(result);
1054         }
1055         Get get = new Get(getBytes(row));
1056         addAttributes(get, attributes);
1057         for(ByteBuffer column : columns) {
1058           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1059           if (famAndQf.length == 1) {
1060               get.addFamily(famAndQf[0]);
1061           } else {
1062               get.addColumn(famAndQf[0], famAndQf[1]);
1063           }
1064         }
1065         get.setTimeRange(0, timestamp);
1066         Result result = table.get(get);
1067         return ThriftUtilities.rowResultFromHBase(result);
1068       } catch (IOException e) {
1069         LOG.warn(e.getMessage(), e);
1070         throw new IOError(Throwables.getStackTraceAsString(e));
1071       } finally{
1072         closeTable(table);
1073       }
1074     }
1075 
1076     @Override
1077     public List<TRowResult> getRows(ByteBuffer tableName,
1078                                     List<ByteBuffer> rows,
1079         Map<ByteBuffer, ByteBuffer> attributes)
1080         throws IOError {
1081       return getRowsWithColumnsTs(tableName, rows, null,
1082                                   HConstants.LATEST_TIMESTAMP,
1083                                   attributes);
1084     }
1085 
1086     @Override
1087     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1088                                                List<ByteBuffer> rows,
1089         List<ByteBuffer> columns,
1090         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1091       return getRowsWithColumnsTs(tableName, rows, columns,
1092                                   HConstants.LATEST_TIMESTAMP,
1093                                   attributes);
1094     }
1095 
1096     @Override
1097     public List<TRowResult> getRowsTs(ByteBuffer tableName,
1098                                       List<ByteBuffer> rows,
1099         long timestamp,
1100         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1101       return getRowsWithColumnsTs(tableName, rows, null,
1102                                   timestamp, attributes);
1103     }
1104 
    /**
     * Fetches the selected columns (all columns when {@code columns} is null) of each of
     * the given rows in one batched Get, restricted to versions no newer than
     * {@code timestamp}.
     *
     * @param tableName  table to read from
     * @param rows       row keys to fetch
     * @param columns    "family" or "family:qualifier" specs, or null for whole rows
     * @param timestamp  upper bound passed to {@code Get.setTimeRange(0, timestamp)}
     * @param attributes operation attributes attached to every Get
     * @return one TRowResult per row found
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
                                                 List<ByteBuffer> rows,
        List<ByteBuffer> columns, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {

      Table table= null;
      try {
        List<Get> gets = new ArrayList<Get>(rows.size());
        table = getTable(tableName);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchGet(rows.size());
        }
        for (ByteBuffer row : rows) {
          Get get = new Get(getBytes(row));
          addAttributes(get, attributes);
          if (columns != null) {

            for(ByteBuffer column : columns) {
              byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
              if (famAndQf.length == 1) {
                get.addFamily(famAndQf[0]);
              } else {
                get.addColumn(famAndQf[0], famAndQf[1]);
              }
            }
          }
          get.setTimeRange(0, timestamp);
          gets.add(get);
        }
        // One round trip for the whole batch.
        Result[] result = table.get(gets);
        return ThriftUtilities.rowResultFromHBase(result);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } finally{
        closeTable(table);
      }
    }
1144 
1145     @Override
1146     public void deleteAll(
1147         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1148         Map<ByteBuffer, ByteBuffer> attributes)
1149         throws IOError {
1150       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1151                   attributes);
1152     }
1153 
    /**
     * Deletes all versions no newer than {@code timestamp} of the given column — or of
     * the whole family when the column spec lacks a qualifier — in the given row.
     *
     * @param tableName  table to delete from
     * @param row        row key
     * @param column     "family" or "family:qualifier" spec
     * @param timestamp  delete versions with timestamps no greater than this
     * @param attributes operation attributes attached to the Delete
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public void deleteAllTs(ByteBuffer tableName,
                            ByteBuffer row,
                            ByteBuffer column,
        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      Table table = null;
      try {
        table = getTable(tableName);
        Delete delete  = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
        if (famAndQf.length == 1) {
          // No qualifier: drop the entire family up to the timestamp.
          delete.addFamily(famAndQf[0], timestamp);
        } else {
          delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
        }
        table.delete(delete);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } finally {
        closeTable(table);
      }
    }
1179 
1180     @Override
1181     public void deleteAllRow(
1182         ByteBuffer tableName, ByteBuffer row,
1183         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1184       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1185     }
1186 
1187     @Override
1188     public void deleteAllRowTs(
1189         ByteBuffer tableName, ByteBuffer row, long timestamp,
1190         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1191       Table table = null;
1192       try {
1193         table = getTable(tableName);
1194         Delete delete  = new Delete(getBytes(row), timestamp);
1195         addAttributes(delete, attributes);
1196         table.delete(delete);
1197       } catch (IOException e) {
1198         LOG.warn(e.getMessage(), e);
1199         throw new IOError(Throwables.getStackTraceAsString(e));
1200       } finally {
1201         closeTable(table);
1202       }
1203     }
1204 
    /**
     * Creates a table with the given column families.
     *
     * @param in_tableName   name of the table to create
     * @param columnFamilies Thrift column descriptors for the new table's families
     * @throws AlreadyExists if a table with that name already exists
     * @throws IOError wrapping any underlying IOException
     * @throws IllegalArgument if a column descriptor cannot be converted
     */
    @Override
    public void createTable(ByteBuffer in_tableName,
        List<ColumnDescriptor> columnFamilies) throws IOError,
        IllegalArgument, AlreadyExists {
      TableName tableName = getTableName(in_tableName);
      try {
        // Fail fast rather than letting createTable raise a server-side exception.
        if (getAdmin().tableExists(tableName)) {
          throw new AlreadyExists("table name already in use");
        }
        HTableDescriptor desc = new HTableDescriptor(tableName);
        for (ColumnDescriptor col : columnFamilies) {
          HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
          desc.addFamily(colDesc);
        }
        getAdmin().createTable(desc);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
      }
    }
1228 
    /** Converts the raw bytes of the given buffer into a TableName. */
    private static TableName getTableName(ByteBuffer buffer) {
      return TableName.valueOf(getBytes(buffer));
    }
1232 
    /**
     * Deletes the named table.
     *
     * @param in_tableName name of the table to delete
     * @throws IOError wrapping any underlying IOException, including the case where
     *         the table does not exist
     */
    @Override
    public void deleteTable(ByteBuffer in_tableName) throws IOError {
      TableName tableName = getTableName(in_tableName);
      if (LOG.isDebugEnabled()) {
        LOG.debug("deleteTable: table=" + tableName);
      }
      try {
        // Surface a missing table as an IOException so it is reported as IOError.
        if (!getAdmin().tableExists(tableName)) {
          throw new IOException("table does not exist");
        }
        getAdmin().deleteTable(tableName);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      }
    }
1249 
1250     @Override
1251     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1252         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1253         throws IOError, IllegalArgument {
1254       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1255                   attributes);
1256     }
1257 
    /**
     * Applies the given mutations to one row at the given timestamp. Deletions and
     * puts are accumulated into a single Delete and a single Put, then applied —
     * deletes first, puts second.
     *
     * @param tableName  table to mutate
     * @param row        row key
     * @param mutations  column-level puts/deletes to apply
     * @param timestamp  timestamp used for both the Put and the Delete
     * @param attributes operation attributes attached to both operations
     * @throws IOError wrapping any underlying IOException
     * @throws IllegalArgument wrapping any IllegalArgumentException (e.g. bad column spec)
     */
    @Override
    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      Table table = null;
      try {
        table = getTable(tableName);
        Put put = new Put(getBytes(row), timestamp);
        addAttributes(put, attributes);

        Delete delete = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchMutate(mutations.size());
        }

        // I apologize for all this mess :)
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            if (famAndQf.length == 1) {
              // No qualifier: delete the whole family up to the timestamp.
              delete.addFamily(famAndQf[0], timestamp);
            } else {
              delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if(famAndQf.length == 1) {
              // A family-only put is ignored (only a delete may target a whole family).
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            } else {
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        // Apply deletes before puts; skip empty operations.
        if (!delete.isEmpty())
          table.delete(delete);
        if (!put.isEmpty())
          table.put(put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
      } finally{
        closeTable(table);
      }
    }
1312 
1313     @Override
1314     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1315         Map<ByteBuffer, ByteBuffer> attributes)
1316         throws IOError, IllegalArgument, TException {
1317       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1318     }
1319 
    /**
     * Applies the given per-row mutation batches at the given timestamp. Each row's
     * mutations are folded into one Put and one Delete; all puts are applied first,
     * then all deletes.
     *
     * @param tableName  table to mutate
     * @param rowBatches one entry per row, each with its own mutation list
     * @param timestamp  timestamp used for the Puts and Deletes
     * @param attributes operation attributes attached to every operation
     * @throws IOError wrapping any underlying IOException
     * @throws IllegalArgument wrapping any IllegalArgumentException (e.g. bad column spec)
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      // First pass: translate each Thrift batch into HBase Put/Delete operations.
      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.addFamily(famAndQf[0], timestamp);
            } else {
              delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              // A family-only put is not supported; only a delete may target a family.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      // Second pass: send the accumulated puts, then the deletes.
      Table table = null;
      try {
        table = getTable(tableName);
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
      } finally{
        closeTable(table);
      }
    }
1385 
1386     @Override
1387     public long atomicIncrement(
1388         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1389             throws IOError, IllegalArgument, TException {
1390       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1391       if(famAndQf.length == 1) {
1392         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1393       }
1394       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1395     }
1396 
1397     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1398         byte [] family, byte [] qualifier, long amount)
1399         throws IOError, IllegalArgument, TException {
1400       Table table = null;
1401       try {
1402         table = getTable(tableName);
1403         return table.incrementColumnValue(
1404             getBytes(row), family, qualifier, amount);
1405       } catch (IOException e) {
1406         LOG.warn(e.getMessage(), e);
1407         throw new IOError(Throwables.getStackTraceAsString(e));
1408       } finally {
1409         closeTable(table);
1410       }
1411     }
1412 
1413     @Override
1414     public void scannerClose(int id) throws IOError, IllegalArgument {
1415       LOG.debug("scannerClose: id=" + id);
1416       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1417       if (resultScannerWrapper == null) {
1418         String message = "scanner ID is invalid";
1419         LOG.warn(message);
1420         throw new IllegalArgument("scanner ID is invalid");
1421       }
1422       resultScannerWrapper.getScanner().close();
1423       removeScanner(id);
1424     }
1425 
1426     @Override
1427     public List<TRowResult> scannerGetList(int id,int nbRows)
1428         throws IllegalArgument, IOError {
1429       LOG.debug("scannerGetList: id=" + id);
1430       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1431       if (null == resultScannerWrapper) {
1432         String message = "scanner ID is invalid";
1433         LOG.warn(message);
1434         throw new IllegalArgument("scanner ID is invalid");
1435       }
1436 
1437       Result [] results = null;
1438       try {
1439         results = resultScannerWrapper.getScanner().next(nbRows);
1440         if (null == results) {
1441           return new ArrayList<TRowResult>();
1442         }
1443       } catch (IOException e) {
1444         LOG.warn(e.getMessage(), e);
1445         throw new IOError(Throwables.getStackTraceAsString(e));
1446       }
1447       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1448     }
1449 
1450     @Override
1451     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1452       return scannerGetList(id,1);
1453     }
1454 
    /**
     * Opens a scanner configured from the Thrift TScan struct and registers it.
     * Only the TScan fields that are explicitly set are applied to the Scan.
     *
     * @param tableName  table to scan
     * @param tScan      Thrift scan specification
     * @param attributes operation attributes attached to the Scan
     * @return scanner id to pass to scannerGet/scannerGetList/scannerClose
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError {

      Table table = null;
      try {
        table = getTable(tableName);
        Scan scan = new Scan();
        addAttributes(scan, attributes);
        if (tScan.isSetStartRow()) {
          scan.setStartRow(tScan.getStartRow());
        }
        if (tScan.isSetStopRow()) {
          scan.setStopRow(tScan.getStopRow());
        }
        if (tScan.isSetTimestamp()) {
          // Restrict to versions no newer than the requested timestamp.
          scan.setTimeRange(0, tScan.getTimestamp());
        }
        if (tScan.isSetCaching()) {
          scan.setCaching(tScan.getCaching());
        }
        if (tScan.isSetBatchSize()) {
          scan.setBatch(tScan.getBatchSize());
        }
        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
          for(ByteBuffer column : tScan.getColumns()) {
            // "family" selects a whole family; "family:qualifier" a single column.
            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
            if(famQf.length == 1) {
              scan.addFamily(famQf[0]);
            } else {
              scan.addColumn(famQf[0], famQf[1]);
            }
          }
        }
        if (tScan.isSetFilterString()) {
          ParseFilter parseFilter = new ParseFilter();
          scan.setFilter(
              parseFilter.parseFilterString(tScan.getFilterString()));
        }
        if (tScan.isSetReversed()) {
          scan.setReversed(tScan.isReversed());
        }
        return addScanner(table.getScanner(scan), tScan.sortColumns);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } finally{
        closeTable(table);
      }
    }
1506 
    /**
     * Opens an unbounded scanner starting at the given row over the given columns
     * (all columns when the list is null or empty) and registers it.
     *
     * @param tableName  table to scan
     * @param startRow   first row to include
     * @param columns    "family" or "family:qualifier" specs, or null/empty for all
     * @param attributes operation attributes attached to the Scan
     * @return scanner id to pass to scannerGet/scannerGetList/scannerClose
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
        List<ByteBuffer> columns,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {

      Table table = null;
      try {
        table = getTable(tableName);
        Scan scan = new Scan(getBytes(startRow));
        addAttributes(scan, attributes);
        if(columns != null && columns.size() != 0) {
          for(ByteBuffer column : columns) {
            // "family" selects a whole family; "family:qualifier" a single column.
            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
            if(famQf.length == 1) {
              scan.addFamily(famQf[0]);
            } else {
              scan.addColumn(famQf[0], famQf[1]);
            }
          }
        }
        // Columns are not sorted for scanners opened through this legacy call.
        return addScanner(table.getScanner(scan), false);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      } finally{
        closeTable(table);
      }
    }
1535 
1536     @Override
1537     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1538         ByteBuffer stopRow, List<ByteBuffer> columns,
1539         Map<ByteBuffer, ByteBuffer> attributes)
1540         throws IOError, TException {
1541       
1542       Table table = null;
1543       try {
1544         table = getTable(tableName);
1545         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1546         addAttributes(scan, attributes);
1547         if(columns != null && columns.size() != 0) {
1548           for(ByteBuffer column : columns) {
1549             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1550             if(famQf.length == 1) {
1551               scan.addFamily(famQf[0]);
1552             } else {
1553               scan.addColumn(famQf[0], famQf[1]);
1554             }
1555           }
1556         }
1557         return addScanner(table.getScanner(scan), false);
1558       } catch (IOException e) {
1559         LOG.warn(e.getMessage(), e);
1560         throw new IOError(Throwables.getStackTraceAsString(e));
1561       } finally{
1562         closeTable(table);
1563       }
1564     }
1565 
1566     @Override
1567     public int scannerOpenWithPrefix(ByteBuffer tableName,
1568                                      ByteBuffer startAndPrefix,
1569                                      List<ByteBuffer> columns,
1570         Map<ByteBuffer, ByteBuffer> attributes)
1571         throws IOError, TException {
1572       
1573       Table table = null;
1574       try {
1575         table = getTable(tableName);
1576         Scan scan = new Scan(getBytes(startAndPrefix));
1577         addAttributes(scan, attributes);
1578         Filter f = new WhileMatchFilter(
1579             new PrefixFilter(getBytes(startAndPrefix)));
1580         scan.setFilter(f);
1581         if (columns != null && columns.size() != 0) {
1582           for(ByteBuffer column : columns) {
1583             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1584             if(famQf.length == 1) {
1585               scan.addFamily(famQf[0]);
1586             } else {
1587               scan.addColumn(famQf[0], famQf[1]);
1588             }
1589           }
1590         }
1591         return addScanner(table.getScanner(scan), false);
1592       } catch (IOException e) {
1593         LOG.warn(e.getMessage(), e);
1594         throw new IOError(Throwables.getStackTraceAsString(e));
1595       } finally{
1596         closeTable(table);
1597       }
1598     }
1599 
1600     @Override
1601     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1602         List<ByteBuffer> columns, long timestamp,
1603         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1604       
1605       Table table = null;
1606       try {
1607         table = getTable(tableName);
1608         Scan scan = new Scan(getBytes(startRow));
1609         addAttributes(scan, attributes);
1610         scan.setTimeRange(0, timestamp);
1611         if (columns != null && columns.size() != 0) {
1612           for (ByteBuffer column : columns) {
1613             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1614             if(famQf.length == 1) {
1615               scan.addFamily(famQf[0]);
1616             } else {
1617               scan.addColumn(famQf[0], famQf[1]);
1618             }
1619           }
1620         }
1621         return addScanner(table.getScanner(scan), false);
1622       } catch (IOException e) {
1623         LOG.warn(e.getMessage(), e);
1624         throw new IOError(Throwables.getStackTraceAsString(e));
1625       } finally{
1626         closeTable(table);
1627       }
1628     }
1629 
1630     @Override
1631     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1632         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1633         Map<ByteBuffer, ByteBuffer> attributes)
1634         throws IOError, TException {
1635       
1636       Table table = null;
1637       try {
1638         table = getTable(tableName);
1639         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1640         addAttributes(scan, attributes);
1641         scan.setTimeRange(0, timestamp);
1642         if (columns != null && columns.size() != 0) {
1643           for (ByteBuffer column : columns) {
1644             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1645             if(famQf.length == 1) {
1646               scan.addFamily(famQf[0]);
1647             } else {
1648               scan.addColumn(famQf[0], famQf[1]);
1649             }
1650           }
1651         }
1652         scan.setTimeRange(0, timestamp);
1653         return addScanner(table.getScanner(scan), false);
1654       } catch (IOException e) {
1655         LOG.warn(e.getMessage(), e);
1656         throw new IOError(Throwables.getStackTraceAsString(e));
1657       } finally{
1658         closeTable(table);
1659       }
1660     }
1661 
1662     @Override
1663     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1664         ByteBuffer tableName) throws IOError, TException {
1665       
1666       Table table = null;
1667       try {
1668         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1669           new TreeMap<ByteBuffer, ColumnDescriptor>();
1670 
1671         table = getTable(tableName);
1672         HTableDescriptor desc = table.getTableDescriptor();
1673 
1674         for (HColumnDescriptor e : desc.getFamilies()) {
1675           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1676           columns.put(col.name, col);
1677         }
1678         return columns;
1679       } catch (IOException e) {
1680         LOG.warn(e.getMessage(), e);
1681         throw new IOError(Throwables.getStackTraceAsString(e));
1682       } finally {
1683         closeTable(table);
1684       }
1685     }
1686     
1687     private void closeTable(Table table) throws IOError
1688     {
1689       try{
1690         if(table != null){
1691           table.close();
1692         }
1693       } catch (IOException e){
1694         LOG.error(e.getMessage(), e);
1695         throw new IOError(Throwables.getStackTraceAsString(e));
1696       }
1697     }
1698     
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      // Looks up the region containing searchRow by reverse-scanning
      // hbase:meta, and returns its keys, name, id and (if assigned) the
      // hosting server. Failures surface as Thrift IOError.
      try {
        byte[] row = getBytes(searchRow);
        // Reverse scan finds the meta entry at or immediately before 'row',
        // i.e. the region whose start key is <= row.
        Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
          HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
                                + Bytes.toStringBinary(row));
        }

        // find region start and end keys
        HRegionInfo regionInfo = MetaTableAccessor.getHRegionInfo(startRowResult);
        if (regionInfo == null) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toStringBinary(row));
        }
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = HREGION_VERSION; // version not used anymore, PB encoding used.

        // find region assignment to server
        // NOTE(review): server info is optional — an unassigned region still
        // returns successfully, just without serverName/port populated.
        ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
        if (serverName != null) {
          region.setServerName(Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(Throwables.getStackTraceAsString(e));
      }
    }
1737 
1738     private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1739         throws IOException {
1740       Scan scan = new Scan(row);
1741       scan.setReversed(true);
1742       scan.addFamily(family);
1743       scan.setStartRow(row);
1744       Table table = getTable(tableName);      
1745       try (ResultScanner scanner = table.getScanner(scan)) {
1746         return scanner.next();
1747       } finally{
1748         if(table != null){
1749           table.close();
1750         }
1751       }
1752     }
1753 
    // Stores the Thrift metrics sink this handler reports per-call
    // statistics to.
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1757 
1758     @Override
1759     public void increment(TIncrement tincrement) throws IOError, TException {
1760 
1761       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1762         throw new TException("Must supply a table and a row key; can't increment");
1763       }
1764 
1765       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1766         this.coalescer.queueIncrement(tincrement);
1767         return;
1768       }
1769 
1770       Table table = null;
1771       try {
1772         table = getTable(tincrement.getTable());
1773         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1774         table.increment(inc);
1775       } catch (IOException e) {
1776         LOG.warn(e.getMessage(), e);
1777         throw new IOError(Throwables.getStackTraceAsString(e));
1778       } finally{
1779         closeTable(table);
1780       }
1781     }
1782 
1783     @Override
1784     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1785       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1786         this.coalescer.queueIncrements(tincrements);
1787         return;
1788       }
1789       for (TIncrement tinc : tincrements) {
1790         increment(tinc);
1791       }
1792     }
1793 
1794     @Override
1795     public List<TCell> append(TAppend tappend) throws IOError, TException {
1796       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1797         throw new TException("Must supply a table and a row key; can't append");
1798       }
1799 
1800       Table table = null;
1801       try {
1802         table = getTable(tappend.getTable());
1803         Append append = ThriftUtilities.appendFromThrift(tappend);
1804         Result result = table.append(append);
1805         return ThriftUtilities.cellFromHBase(result.rawCells());
1806       } catch (IOException e) {
1807         LOG.warn(e.getMessage(), e);
1808         throw new IOError(Throwables.getStackTraceAsString(e));
1809       } finally{
1810           closeTable(table);
1811       }
1812     }
1813 
1814     @Override
1815     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1816         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1817         IllegalArgument, TException {
1818       Put put;
1819       try {
1820         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1821         addAttributes(put, attributes);
1822 
1823         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1824 
1825         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1826             : HConstants.EMPTY_BYTE_ARRAY);
1827 
1828         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1829       } catch (IllegalArgumentException e) {
1830         LOG.warn(e.getMessage(), e);
1831         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1832       }
1833 
1834       Table table = null;
1835       try {
1836         table = getTable(tableName);
1837         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1838         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1839           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1840       } catch (IOException e) {
1841         LOG.warn(e.getMessage(), e);
1842         throw new IOError(Throwables.getStackTraceAsString(e));
1843       } catch (IllegalArgumentException e) {
1844         LOG.warn(e.getMessage(), e);
1845         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1846       } finally {
1847           closeTable(table);
1848       }
1849     }
1850   }
1851 
1852 
1853 
1854   /**
1855    * Adds all the attributes into the Operation object
1856    */
1857   private static void addAttributes(OperationWithAttributes op,
1858     Map<ByteBuffer, ByteBuffer> attributes) {
1859     if (attributes == null || attributes.size() == 0) {
1860       return;
1861     }
1862     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1863       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1864       byte[] value =  getBytes(entry.getValue());
1865       op.setAttribute(name, value);
1866     }
1867   }
1868 
1869   public static void registerFilters(Configuration conf) {
1870     String[] filters = conf.getStrings("hbase.thrift.filters");
1871     if(filters != null) {
1872       for(String filterClass: filters) {
1873         String[] filterPart = filterClass.split(":");
1874         if(filterPart.length != 2) {
1875           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1876         } else {
1877           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1878         }
1879       }
1880     }
1881   }
1882 }