View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HRegionLocation;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.client.Admin;
66  import org.apache.hadoop.hbase.client.Append;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.Durability;
69  import org.apache.hadoop.hbase.client.Get;
70  import org.apache.hadoop.hbase.client.HBaseAdmin;
71  import org.apache.hadoop.hbase.client.Increment;
72  import org.apache.hadoop.hbase.client.OperationWithAttributes;
73  import org.apache.hadoop.hbase.client.Put;
74  import org.apache.hadoop.hbase.client.RegionLocator;
75  import org.apache.hadoop.hbase.client.Result;
76  import org.apache.hadoop.hbase.client.ResultScanner;
77  import org.apache.hadoop.hbase.client.Scan;
78  import org.apache.hadoop.hbase.client.Table;
79  import org.apache.hadoop.hbase.filter.Filter;
80  import org.apache.hadoop.hbase.filter.ParseFilter;
81  import org.apache.hadoop.hbase.filter.PrefixFilter;
82  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83  import org.apache.hadoop.hbase.jetty.SslSelectChannelConnectorSecure;
84  import org.apache.hadoop.hbase.security.SecurityUtil;
85  import org.apache.hadoop.hbase.security.UserProvider;
86  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
87  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
88  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
89  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
90  import org.apache.hadoop.hbase.thrift.generated.Hbase;
91  import org.apache.hadoop.hbase.thrift.generated.IOError;
92  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
93  import org.apache.hadoop.hbase.thrift.generated.Mutation;
94  import org.apache.hadoop.hbase.thrift.generated.TAppend;
95  import org.apache.hadoop.hbase.thrift.generated.TCell;
96  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
97  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
98  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
99  import org.apache.hadoop.hbase.thrift.generated.TScan;
100 import org.apache.hadoop.hbase.util.Bytes;
101 import org.apache.hadoop.hbase.util.ConnectionCache;
102 import org.apache.hadoop.hbase.util.DNS;
103 import org.apache.hadoop.hbase.util.Strings;
104 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.security.UserGroupInformation;
106 import org.apache.hadoop.security.authorize.ProxyUsers;
107 import org.apache.thrift.TException;
108 import org.apache.thrift.TProcessor;
109 import org.apache.thrift.protocol.TBinaryProtocol;
110 import org.apache.thrift.protocol.TCompactProtocol;
111 import org.apache.thrift.protocol.TProtocol;
112 import org.apache.thrift.protocol.TProtocolFactory;
113 import org.apache.thrift.server.THsHaServer;
114 import org.apache.thrift.server.TNonblockingServer;
115 import org.apache.thrift.server.TServer;
116 import org.apache.thrift.server.TServlet;
117 import org.apache.thrift.server.TThreadedSelectorServer;
118 import org.apache.thrift.transport.TFramedTransport;
119 import org.apache.thrift.transport.TNonblockingServerSocket;
120 import org.apache.thrift.transport.TNonblockingServerTransport;
121 import org.apache.thrift.transport.TSaslServerTransport;
122 import org.apache.thrift.transport.TServerSocket;
123 import org.apache.thrift.transport.TServerTransport;
124 import org.apache.thrift.transport.TTransportFactory;
125 import org.mortbay.jetty.Connector;
126 import org.mortbay.jetty.Server;
127 import org.mortbay.jetty.nio.SelectChannelConnector;
128 import org.mortbay.jetty.servlet.Context;
129 import org.mortbay.jetty.servlet.ServletHolder;
130 import org.mortbay.thread.QueuedThreadPool;
131 
132 import com.google.common.base.Joiner;
133 import com.google.common.base.Throwables;
134 import com.google.common.util.concurrent.ThreadFactoryBuilder;
135 
136 /**
137  * ThriftServerRunner - this class starts up a Thrift server which implements
138  * the Hbase API specified in the Hbase.thrift IDL file.
139  */
140 @InterfaceAudience.Private
141 @SuppressWarnings("deprecation")
142 public class ThriftServerRunner implements Runnable {
143 
144   private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
145 
146   static final String SERVER_TYPE_CONF_KEY =
147       "hbase.regionserver.thrift.server.type";
148 
149   static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
150   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
151   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
152   static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
153   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
154   static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
155   static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
156   static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
157   static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
158 
159   static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
160   static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
161   static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
162   static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
163 
164 
165   /**
166    * Thrift quality of protection configuration key. Valid values can be:
167    * auth-conf: authentication, integrity and confidentiality checking
168    * auth-int: authentication and integrity checking
169    * auth: authentication only
170    *
171    * This is used to authenticate the callers and support impersonation.
172    * The thrift server and the HBase cluster must run in secure mode.
173    */
174   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
175 
176   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
177   public static final int DEFAULT_LISTEN_PORT = 9090;
178   public static final int HREGION_VERSION = 1;
179   static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
180   private final int listenPort;
181 
182   private Configuration conf;
183   volatile TServer tserver;
184   volatile Server httpServer;
185   private final Hbase.Iface handler;
186   private final ThriftMetrics metrics;
187   private final HBaseHandler hbaseHandler;
188   private final UserGroupInformation realUser;
189 
190   private final String qop;
191   private String host;
192 
193   private final boolean securityEnabled;
194   private final boolean doAsEnabled;
195 
196   /** An enum of server implementation selections */
197   enum ImplType {
198     HS_HA("hsha", true, THsHaServer.class, true),
199     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
200     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
201     THREADED_SELECTOR(
202         "threadedselector", true, TThreadedSelectorServer.class, true);
203 
204     public static final ImplType DEFAULT = THREAD_POOL;
205 
206     final String option;
207     final boolean isAlwaysFramed;
208     final Class<? extends TServer> serverClass;
209     final boolean canSpecifyBindIP;
210 
211     ImplType(String option, boolean isAlwaysFramed,
212         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
213       this.option = option;
214       this.isAlwaysFramed = isAlwaysFramed;
215       this.serverClass = serverClass;
216       this.canSpecifyBindIP = canSpecifyBindIP;
217     }
218 
219     /**
220      * @return <code>-option</code> so we can get the list of options from
221      *         {@link #values()}
222      */
223     @Override
224     public String toString() {
225       return "-" + option;
226     }
227 
228     String getDescription() {
229       StringBuilder sb = new StringBuilder("Use the " +
230           serverClass.getSimpleName());
231       if (isAlwaysFramed) {
232         sb.append(" This implies the framed transport.");
233       }
234       if (this == DEFAULT) {
235         sb.append("This is the default.");
236       }
237       return sb.toString();
238     }
239 
240     static OptionGroup createOptionGroup() {
241       OptionGroup group = new OptionGroup();
242       for (ImplType t : values()) {
243         group.addOption(new Option(t.option, t.getDescription()));
244       }
245       return group;
246     }
247 
248     static ImplType getServerImpl(Configuration conf) {
249       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
250       for (ImplType t : values()) {
251         if (confType.equals(t.option)) {
252           return t;
253         }
254       }
255       throw new AssertionError("Unknown server ImplType.option:" + confType);
256     }
257 
258     static void setServerImpl(CommandLine cmd, Configuration conf) {
259       ImplType chosenType = null;
260       int numChosen = 0;
261       for (ImplType t : values()) {
262         if (cmd.hasOption(t.option)) {
263           chosenType = t;
264           ++numChosen;
265         }
266       }
267       if (numChosen < 1) {
268         LOG.info("Using default thrift server type");
269         chosenType = DEFAULT;
270       } else if (numChosen > 1) {
271         throw new AssertionError("Exactly one option out of " +
272           Arrays.toString(values()) + " has to be specified");
273       }
274       LOG.info("Using thrift server type " + chosenType.option);
275       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
276     }
277 
278     public String simpleClassName() {
279       return serverClass.getSimpleName();
280     }
281 
282     public static List<String> serversThatCannotSpecifyBindIP() {
283       List<String> l = new ArrayList<String>();
284       for (ImplType t : values()) {
285         if (!t.canSpecifyBindIP) {
286           l.add(t.simpleClassName());
287         }
288       }
289       return l;
290     }
291 
292   }
293 
294   public ThriftServerRunner(Configuration conf) throws IOException {
295     UserProvider userProvider = UserProvider.instantiate(conf);
296     // login the server principal (if using secure Hadoop)
297     securityEnabled = userProvider.isHadoopSecurityEnabled()
298       && userProvider.isHBaseSecurityEnabled();
299     if (securityEnabled) {
300       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
301         conf.get("hbase.thrift.dns.interface", "default"),
302         conf.get("hbase.thrift.dns.nameserver", "default")));
303       userProvider.login("hbase.thrift.keytab.file",
304         "hbase.thrift.kerberos.principal", host);
305     }
306     this.conf = HBaseConfiguration.create(conf);
307     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
308     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
309     this.hbaseHandler = new HBaseHandler(conf, userProvider);
310     this.hbaseHandler.initMetrics(metrics);
311     this.handler = HbaseHandlerMetricsProxy.newInstance(
312       hbaseHandler, metrics, conf);
313     this.realUser = userProvider.getCurrent().getUGI();
314     qop = conf.get(THRIFT_QOP_KEY);
315     doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
316     if (qop != null) {
317       if (!qop.equals("auth") && !qop.equals("auth-int")
318           && !qop.equals("auth-conf")) {
319         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
320           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
321       }
322       if (!securityEnabled) {
323         throw new IOException("Thrift server must"
324           + " run in secure mode to support authentication");
325       }
326     }
327   }
328 
329   /*
330    * Runs the Thrift server
331    */
332   @Override
333   public void run() {
334     realUser.doAs(new PrivilegedAction<Object>() {
335       @Override
336       public Object run() {
337         try {
338           if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
339             setupHTTPServer();
340             httpServer.start();
341             httpServer.join();
342           } else {
343             setupServer();
344             tserver.serve();
345           }
346         } catch (Exception e) {
347           LOG.fatal("Cannot run ThriftServer", e);
348           // Crash the process if the ThriftServer is not running
349           System.exit(-1);
350         }
351         return null;
352       }
353     });
354 
355   }
356 
357   public void shutdown() {
358     if (tserver != null) {
359       tserver.stop();
360       tserver = null;
361     }
362     if (httpServer != null) {
363       try {
364         httpServer.stop();
365         httpServer = null;
366       } catch (Exception e) {
367         LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
368       }
369       httpServer = null;
370     }
371   }
372 
373   private void setupHTTPServer() throws IOException {
374     TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
375     TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
376     TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
377         conf, hbaseHandler, securityEnabled, doAsEnabled);
378 
379     httpServer = new Server();
380     // Context handler
381     Context context = new Context(httpServer, "/", Context.SESSIONS);
382     context.setContextPath("/");
383     String httpPath = "/*";
384     httpServer.setHandler(context);
385     context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
386 
387     // set up Jetty and run the embedded server
388     Connector connector = new SelectChannelConnector();
389     if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
390       SslSelectChannelConnectorSecure sslConnector = new SslSelectChannelConnectorSecure();
391       String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
392       String password = HBaseConfiguration.getPassword(conf,
393           THRIFT_SSL_KEYSTORE_PASSWORD, null);
394       String keyPassword = HBaseConfiguration.getPassword(conf,
395           THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
396       sslConnector.setKeystore(keystore);
397       sslConnector.setPassword(password);
398       sslConnector.setKeyPassword(keyPassword);
399       connector = sslConnector;
400     }
401     String host = getBindAddress(conf).getHostAddress();
402     connector.setPort(listenPort);
403     connector.setHost(host);
404     connector.setHeaderBufferSize(1024 * 64);
405     httpServer.addConnector(connector);
406 
407     if (doAsEnabled) {
408       ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
409     }
410 
411     // Set the default max thread number to 100 to limit
412     // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
413     // Jetty set the default max thread number to 250, if we don't set it.
414     //
415     // Our default min thread number 2 is the same as that used by Jetty.
416     int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
417     int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
418     QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
419     threadPool.setMinThreads(minThreads);
420     httpServer.setThreadPool(threadPool);
421 
422     httpServer.setSendServerVersion(false);
423     httpServer.setSendDateHeader(false);
424     httpServer.setStopAtShutdown(true);
425 
426     LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
427   }
428 
429   /**
430    * Setting up the thrift TServer
431    */
432   private void setupServer() throws Exception {
433     // Construct correct ProtocolFactory
434     TProtocolFactory protocolFactory;
435     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
436       LOG.debug("Using compact protocol");
437       protocolFactory = new TCompactProtocol.Factory();
438     } else {
439       LOG.debug("Using binary protocol");
440       protocolFactory = new TBinaryProtocol.Factory();
441     }
442 
443     final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
444     ImplType implType = ImplType.getServerImpl(conf);
445     TProcessor processor = p;
446 
447     // Construct correct TransportFactory
448     TTransportFactory transportFactory;
449     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
450       if (qop != null) {
451         throw new RuntimeException("Thrift server authentication"
452           + " doesn't work with framed transport yet");
453       }
454       transportFactory = new TFramedTransport.Factory(
455           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
456       LOG.debug("Using framed transport");
457     } else if (qop == null) {
458       transportFactory = new TTransportFactory();
459     } else {
460       // Extract the name from the principal
461       String name = SecurityUtil.getUserFromPrincipal(
462         conf.get("hbase.thrift.kerberos.principal"));
463       Map<String, String> saslProperties = new HashMap<String, String>();
464       saslProperties.put(Sasl.QOP, qop);
465       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
466       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
467         new SaslGssCallbackHandler() {
468           @Override
469           public void handle(Callback[] callbacks)
470               throws UnsupportedCallbackException {
471             AuthorizeCallback ac = null;
472             for (Callback callback : callbacks) {
473               if (callback instanceof AuthorizeCallback) {
474                 ac = (AuthorizeCallback) callback;
475               } else {
476                 throw new UnsupportedCallbackException(callback,
477                     "Unrecognized SASL GSSAPI Callback");
478               }
479             }
480             if (ac != null) {
481               String authid = ac.getAuthenticationID();
482               String authzid = ac.getAuthorizationID();
483               if (!authid.equals(authzid)) {
484                 ac.setAuthorized(false);
485               } else {
486                 ac.setAuthorized(true);
487                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
488                 LOG.info("Effective user: " + userName);
489                 ac.setAuthorizedID(userName);
490               }
491             }
492           }
493         });
494       transportFactory = saslFactory;
495 
496       // Create a processor wrapper, to get the caller
497       processor = new TProcessor() {
498         @Override
499         public boolean process(TProtocol inProt,
500             TProtocol outProt) throws TException {
501           TSaslServerTransport saslServerTransport =
502             (TSaslServerTransport)inProt.getTransport();
503           SaslServer saslServer = saslServerTransport.getSaslServer();
504           String principal = saslServer.getAuthorizationID();
505           hbaseHandler.setEffectiveUser(principal);
506           return p.process(inProt, outProt);
507         }
508       };
509     }
510 
511     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
512       LOG.error("Server types " + Joiner.on(", ").join(
513           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
514           "address binding at the moment. See " +
515           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
516       throw new RuntimeException(
517           "-" + BIND_CONF_KEY + " not supported with " + implType);
518     }
519 
520     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
521         implType == ImplType.THREADED_SELECTOR) {
522 
523       InetAddress listenAddress = getBindAddress(conf);
524       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
525           new InetSocketAddress(listenAddress, listenPort));
526 
527       if (implType == ImplType.NONBLOCKING) {
528         TNonblockingServer.Args serverArgs =
529             new TNonblockingServer.Args(serverTransport);
530         serverArgs.processor(processor)
531                   .transportFactory(transportFactory)
532                   .protocolFactory(protocolFactory);
533         tserver = new TNonblockingServer(serverArgs);
534       } else if (implType == ImplType.HS_HA) {
535         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
536         CallQueue callQueue =
537             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
538         ExecutorService executorService = createExecutor(
539             callQueue, serverArgs.getWorkerThreads());
540         serverArgs.executorService(executorService)
541                   .processor(processor)
542                   .transportFactory(transportFactory)
543                   .protocolFactory(protocolFactory);
544         tserver = new THsHaServer(serverArgs);
545       } else { // THREADED_SELECTOR
546         TThreadedSelectorServer.Args serverArgs =
547             new HThreadedSelectorServerArgs(serverTransport, conf);
548         CallQueue callQueue =
549             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
550         ExecutorService executorService = createExecutor(
551             callQueue, serverArgs.getWorkerThreads());
552         serverArgs.executorService(executorService)
553                   .processor(processor)
554                   .transportFactory(transportFactory)
555                   .protocolFactory(protocolFactory);
556         tserver = new TThreadedSelectorServer(serverArgs);
557       }
558       LOG.info("starting HBase " + implType.simpleClassName() +
559           " server on " + Integer.toString(listenPort));
560     } else if (implType == ImplType.THREAD_POOL) {
561       // Thread pool server. Get the IP address to bind to.
562       InetAddress listenAddress = getBindAddress(conf);
563 
564       TServerTransport serverTransport = new TServerSocket(
565           new InetSocketAddress(listenAddress, listenPort));
566 
567       TBoundedThreadPoolServer.Args serverArgs =
568           new TBoundedThreadPoolServer.Args(serverTransport, conf);
569       serverArgs.processor(processor)
570                 .transportFactory(transportFactory)
571                 .protocolFactory(protocolFactory);
572       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
573           + listenAddress + ":" + Integer.toString(listenPort)
574           + "; " + serverArgs);
575       TBoundedThreadPoolServer tserver =
576           new TBoundedThreadPoolServer(serverArgs, metrics);
577       this.tserver = tserver;
578     } else {
579       throw new AssertionError("Unsupported Thrift server implementation: " +
580           implType.simpleClassName());
581     }
582 
583     // A sanity check that we instantiated the right type of server.
584     if (tserver.getClass() != implType.serverClass) {
585       throw new AssertionError("Expected to create Thrift server class " +
586           implType.serverClass.getName() + " but got " +
587           tserver.getClass().getName());
588     }
589 
590 
591 
592     registerFilters(conf);
593   }
594 
595   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
596                                  int workerThreads) {
597     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
598     tfb.setDaemon(true);
599     tfb.setNameFormat("thrift-worker-%d");
600     return new ThreadPoolExecutor(workerThreads, workerThreads,
601             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
602   }
603 
604   private InetAddress getBindAddress(Configuration conf)
605       throws UnknownHostException {
606     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
607     return InetAddress.getByName(bindAddressStr);
608   }
609 
610   protected static class ResultScannerWrapper {
611 
612     private final ResultScanner scanner;
613     private final boolean sortColumns;
614     public ResultScannerWrapper(ResultScanner resultScanner,
615                                 boolean sortResultColumns) {
616       scanner = resultScanner;
617       sortColumns = sortResultColumns;
618    }
619 
620     public ResultScanner getScanner() {
621       return scanner;
622     }
623 
624     public boolean isColumnSorted() {
625       return sortColumns;
626     }
627   }
628 
629   /**
630    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
631    * HBase client API primarily defined in the Admin and Table objects.
632    */
633   public static class HBaseHandler implements Hbase.Iface {
634     protected Configuration conf;
635     protected final Log LOG = LogFactory.getLog(this.getClass().getName());
636 
637     // nextScannerId and scannerMap are used to manage scanner state
638     protected int nextScannerId = 0;
639     protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
640     private ThriftMetrics metrics = null;
641 
642     private final ConnectionCache connectionCache;
643     IncrementCoalescer coalescer = null;
644 
645     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
646     static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
647 
648     /**
649      * Returns a list of all the column families for a given Table.
650      *
651      * @param table
652      * @throws IOException
653      */
654     byte[][] getAllColumns(Table table) throws IOException {
655       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
656       byte[][] columns = new byte[cds.length][];
657       for (int i = 0; i < cds.length; i++) {
658         columns[i] = Bytes.add(cds[i].getName(),
659             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
660       }
661       return columns;
662     }
663 
664     /**
665      * Creates and returns a Table instance from a given table name.
666      *
667      * @param tableName
668      *          name of table
669      * @return Table object
670      * @throws IOException
671      * @throws IOError
672      */
673     public Table getTable(final byte[] tableName) throws
674         IOException {
675       String table = Bytes.toString(tableName);
676       return connectionCache.getTable(table);
677     }
678 
679     public Table getTable(final ByteBuffer tableName) throws IOException {
680       return getTable(getBytes(tableName));
681     }
682 
683     /**
684      * Assigns a unique ID to the scanner and adds the mapping to an internal
685      * hash-map.
686      *
687      * @param scanner
688      * @return integer scanner id
689      */
690     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
691       int id = nextScannerId++;
692       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
693       scannerMap.put(id, resultScannerWrapper);
694       return id;
695     }
696 
697     /**
698      * Returns the scanner associated with the specified ID.
699      *
700      * @param id
701      * @return a Scanner, or null if ID was invalid.
702      */
703     protected synchronized ResultScannerWrapper getScanner(int id) {
704       return scannerMap.get(id);
705     }
706 
707     /**
708      * Removes the scanner associated with the specified ID from the internal
709      * id->scanner hash-map.
710      *
711      * @param id
712      * @return a Scanner, or null if ID was invalid.
713      */
714     protected synchronized ResultScannerWrapper removeScanner(int id) {
715       return scannerMap.remove(id);
716     }
717 
/**
 * Creates a handler with an empty scanner map, an increment coalescer and a
 * connection cache configured from {@code CLEANUP_INTERVAL} and
 * {@code MAX_IDLETIME}.
 *
 * @param c configuration kept by the handler
 * @param userProvider passed through to the connection cache
 * @throws IOException if the connection cache cannot be created
 */
protected HBaseHandler(final Configuration c,
    final UserProvider userProvider) throws IOException {
  this.conf = c;
  this.scannerMap = new HashMap<Integer, ResultScannerWrapper>();
  this.coalescer = new IncrementCoalescer(this);

  // Defaults are 10 * 1000 and 10 * 60 * 1000; presumably milliseconds,
  // confirm against ConnectionCache.
  this.connectionCache = new ConnectionCache(
    conf,
    userProvider,
    conf.getInt(CLEANUP_INTERVAL, 10 * 1000),
    conf.getInt(MAX_IDLETIME, 10 * 60 * 1000));
}
729 
730     /**
731      * Obtain HBaseAdmin. Creates the instance if it is not already created.
732      */
733     private Admin getAdmin() throws IOException {
734       return connectionCache.getAdmin();
735     }
736 
737     void setEffectiveUser(String effectiveUser) {
738       connectionCache.setEffectiveUser(effectiveUser);
739     }
740 
741     @Override
742     public void enableTable(ByteBuffer tableName) throws IOError {
743       try{
744         getAdmin().enableTable(getTableName(tableName));
745       } catch (IOException e) {
746         LOG.warn(e.getMessage(), e);
747         throw new IOError(Throwables.getStackTraceAsString(e));
748       }
749     }
750 
751     @Override
752     public void disableTable(ByteBuffer tableName) throws IOError{
753       try{
754         getAdmin().disableTable(getTableName(tableName));
755       } catch (IOException e) {
756         LOG.warn(e.getMessage(), e);
757         throw new IOError(Throwables.getStackTraceAsString(e));
758       }
759     }
760 
761     @Override
762     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
763       try {
764         return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
765       } catch (IOException e) {
766         LOG.warn(e.getMessage(), e);
767         throw new IOError(Throwables.getStackTraceAsString(e));
768       }
769     }
770 
771     @Override
772     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
773       try {
774         // TODO: HBaseAdmin.compact(byte[]) deprecated and not trivial to replace here.
775         // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
776         // table and region.
777         ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
778       } catch (IOException e) {
779         LOG.warn(e.getMessage(), e);
780         throw new IOError(Throwables.getStackTraceAsString(e));
781       }
782     }
783 
784     @Override
785     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
786       try {
787         // TODO: HBaseAdmin.majorCompact(byte[]) deprecated and not trivial to replace here.
788         // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
789         // to table and region.
790         ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
791       } catch (IOException e) {
792         LOG.warn(e.getMessage(), e);
793         throw new IOError(Throwables.getStackTraceAsString(e));
794       }
795     }
796 
797     @Override
798     public List<ByteBuffer> getTableNames() throws IOError {
799       try {
800         TableName[] tableNames = this.getAdmin().listTableNames();
801         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
802         for (int i = 0; i < tableNames.length; i++) {
803           list.add(ByteBuffer.wrap(tableNames[i].getName()));
804         }
805         return list;
806       } catch (IOException e) {
807         LOG.warn(e.getMessage(), e);
808         throw new IOError(Throwables.getStackTraceAsString(e));
809       }
810     }
811 
812     /**
813      * @return the list of regions in the given table, or an empty list if the table does not exist
814      */
815     @Override
816     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
817     throws IOError {
818       try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
819         List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
820         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
821         for (HRegionLocation regionLocation : regionLocations) {
822           HRegionInfo info = regionLocation.getRegionInfo();
823           ServerName serverName = regionLocation.getServerName();
824           TRegionInfo region = new TRegionInfo();
825           region.serverName = ByteBuffer.wrap(
826               Bytes.toBytes(serverName.getHostname()));
827           region.port = serverName.getPort();
828           region.startKey = ByteBuffer.wrap(info.getStartKey());
829           region.endKey = ByteBuffer.wrap(info.getEndKey());
830           region.id = info.getRegionId();
831           region.name = ByteBuffer.wrap(info.getRegionName());
832           region.version = info.getVersion();
833           results.add(region);
834         }
835         return results;
836       } catch (TableNotFoundException e) {
837         // Return empty list for non-existing table
838         return Collections.emptyList();
839       } catch (IOException e){
840         LOG.warn(e.getMessage(), e);
841         throw new IOError(Throwables.getStackTraceAsString(e));
842       }
843     }
844 
845     @Deprecated
846     @Override
847     public List<TCell> get(
848         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
849         Map<ByteBuffer, ByteBuffer> attributes)
850         throws IOError {
851       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
852       if (famAndQf.length == 1) {
853         return get(tableName, row, famAndQf[0], null, attributes);
854       }
855       if (famAndQf.length == 2) {
856         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
857       }
858       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
859     }
860 
861     /**
862      * Note: this internal interface is slightly different from public APIs in regard to handling
863      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
864      * we respect qual == null as a request for the entire column family. The caller (
865      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
866      * column is parse like normal.
867      */
868     protected List<TCell> get(ByteBuffer tableName,
869                               ByteBuffer row,
870                               byte[] family,
871                               byte[] qualifier,
872                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
873       Table table = null;
874       try {
875         table = getTable(tableName);
876         Get get = new Get(getBytes(row));
877         addAttributes(get, attributes);
878         if (qualifier == null) {
879           get.addFamily(family);
880         } else {
881           get.addColumn(family, qualifier);
882         }
883         Result result = table.get(get);
884         return ThriftUtilities.cellFromHBase(result.rawCells());
885       } catch (IOException e) {
886         LOG.warn(e.getMessage(), e);
887         throw new IOError(Throwables.getStackTraceAsString(e));
888       } finally {
889         closeTable(table);
890       }
891     }
892 
893     @Deprecated
894     @Override
895     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
896         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
897       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
898       if(famAndQf.length == 1) {
899         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
900       }
901       if (famAndQf.length == 2) {
902         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
903       }
904       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
905 
906     }
907 
908     /**
909      * Note: this public interface is slightly different from public Java APIs in regard to
910      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
911      * Rather, we respect qual == null as a request for the entire column family. If you want to
912      * access the entire column family, use
913      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
914      * that lacks a {@code ':'}.
915      */
916     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
917         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
918       
919       Table table = null;
920       try {
921         table = getTable(tableName);
922         Get get = new Get(getBytes(row));
923         addAttributes(get, attributes);
924         if (null == qualifier) {
925           get.addFamily(family);
926         } else {
927           get.addColumn(family, qualifier);
928         }
929         get.setMaxVersions(numVersions);
930         Result result = table.get(get);
931         return ThriftUtilities.cellFromHBase(result.rawCells());
932       } catch (IOException e) {
933         LOG.warn(e.getMessage(), e);
934         throw new IOError(Throwables.getStackTraceAsString(e));
935       } finally{
936         closeTable(table);
937       }
938     }
939 
940     @Deprecated
941     @Override
942     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
943         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
944       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
945       if (famAndQf.length == 1) {
946         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
947       }
948       if (famAndQf.length == 2) {
949         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
950           attributes);
951       }
952       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
953     }
954 
955     /**
956      * Note: this internal interface is slightly different from public APIs in regard to handling
957      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
958      * we respect qual == null as a request for the entire column family. The caller (
959      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
960      * consistent in that the column is parse like normal.
961      */
962     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
963         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
964         throws IOError {
965       
966       Table table = null;
967       try {
968         table = getTable(tableName);
969         Get get = new Get(getBytes(row));
970         addAttributes(get, attributes);
971         if (null == qualifier) {
972           get.addFamily(family);
973         } else {
974           get.addColumn(family, qualifier);
975         }
976         get.setTimeRange(0, timestamp);
977         get.setMaxVersions(numVersions);
978         Result result = table.get(get);
979         return ThriftUtilities.cellFromHBase(result.rawCells());
980       } catch (IOException e) {
981         LOG.warn(e.getMessage(), e);
982         throw new IOError(Throwables.getStackTraceAsString(e));
983       } finally{
984         closeTable(table);
985       }
986     }
987 
988     @Override
989     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
990         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
991       return getRowWithColumnsTs(tableName, row, null,
992                                  HConstants.LATEST_TIMESTAMP,
993                                  attributes);
994     }
995 
996     @Override
997     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
998                                               ByteBuffer row,
999         List<ByteBuffer> columns,
1000         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1001       return getRowWithColumnsTs(tableName, row, columns,
1002                                  HConstants.LATEST_TIMESTAMP,
1003                                  attributes);
1004     }
1005 
1006     @Override
1007     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1008         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1009       return getRowWithColumnsTs(tableName, row, null,
1010                                  timestamp, attributes);
1011     }
1012 
1013     @Override
1014     public List<TRowResult> getRowWithColumnsTs(
1015         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1016         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1017       
1018       Table table = null;
1019       try {
1020         table = getTable(tableName);
1021         if (columns == null) {
1022           Get get = new Get(getBytes(row));
1023           addAttributes(get, attributes);
1024           get.setTimeRange(0, timestamp);
1025           Result result = table.get(get);
1026           return ThriftUtilities.rowResultFromHBase(result);
1027         }
1028         Get get = new Get(getBytes(row));
1029         addAttributes(get, attributes);
1030         for(ByteBuffer column : columns) {
1031           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1032           if (famAndQf.length == 1) {
1033               get.addFamily(famAndQf[0]);
1034           } else {
1035               get.addColumn(famAndQf[0], famAndQf[1]);
1036           }
1037         }
1038         get.setTimeRange(0, timestamp);
1039         Result result = table.get(get);
1040         return ThriftUtilities.rowResultFromHBase(result);
1041       } catch (IOException e) {
1042         LOG.warn(e.getMessage(), e);
1043         throw new IOError(Throwables.getStackTraceAsString(e));
1044       } finally{
1045         closeTable(table);
1046       }
1047     }
1048 
1049     @Override
1050     public List<TRowResult> getRows(ByteBuffer tableName,
1051                                     List<ByteBuffer> rows,
1052         Map<ByteBuffer, ByteBuffer> attributes)
1053         throws IOError {
1054       return getRowsWithColumnsTs(tableName, rows, null,
1055                                   HConstants.LATEST_TIMESTAMP,
1056                                   attributes);
1057     }
1058 
1059     @Override
1060     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1061                                                List<ByteBuffer> rows,
1062         List<ByteBuffer> columns,
1063         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1064       return getRowsWithColumnsTs(tableName, rows, columns,
1065                                   HConstants.LATEST_TIMESTAMP,
1066                                   attributes);
1067     }
1068 
1069     @Override
1070     public List<TRowResult> getRowsTs(ByteBuffer tableName,
1071                                       List<ByteBuffer> rows,
1072         long timestamp,
1073         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1074       return getRowsWithColumnsTs(tableName, rows, null,
1075                                   timestamp, attributes);
1076     }
1077 
1078     @Override
1079     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1080                                                  List<ByteBuffer> rows,
1081         List<ByteBuffer> columns, long timestamp,
1082         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1083       
1084       Table table= null;
1085       try {
1086         List<Get> gets = new ArrayList<Get>(rows.size());
1087         table = getTable(tableName);
1088         if (metrics != null) {
1089           metrics.incNumRowKeysInBatchGet(rows.size());
1090         }
1091         for (ByteBuffer row : rows) {
1092           Get get = new Get(getBytes(row));
1093           addAttributes(get, attributes);
1094           if (columns != null) {
1095 
1096             for(ByteBuffer column : columns) {
1097               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1098               if (famAndQf.length == 1) {
1099                 get.addFamily(famAndQf[0]);
1100               } else {
1101                 get.addColumn(famAndQf[0], famAndQf[1]);
1102               }
1103             }
1104           }
1105           get.setTimeRange(0, timestamp);
1106           gets.add(get);
1107         }
1108         Result[] result = table.get(gets);
1109         return ThriftUtilities.rowResultFromHBase(result);
1110       } catch (IOException e) {
1111         LOG.warn(e.getMessage(), e);
1112         throw new IOError(Throwables.getStackTraceAsString(e));
1113       } finally{
1114         closeTable(table);
1115       }
1116     }
1117 
1118     @Override
1119     public void deleteAll(
1120         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1121         Map<ByteBuffer, ByteBuffer> attributes)
1122         throws IOError {
1123       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1124                   attributes);
1125     }
1126 
1127     @Override
1128     public void deleteAllTs(ByteBuffer tableName,
1129                             ByteBuffer row,
1130                             ByteBuffer column,
1131         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1132       Table table = null;
1133       try {
1134         table = getTable(tableName);
1135         Delete delete  = new Delete(getBytes(row));
1136         addAttributes(delete, attributes);
1137         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1138         if (famAndQf.length == 1) {
1139           delete.deleteFamily(famAndQf[0], timestamp);
1140         } else {
1141           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1142         }
1143         table.delete(delete);
1144 
1145       } catch (IOException e) {
1146         LOG.warn(e.getMessage(), e);
1147         throw new IOError(Throwables.getStackTraceAsString(e));
1148       } finally {
1149         closeTable(table);
1150       }
1151     }
1152 
1153     @Override
1154     public void deleteAllRow(
1155         ByteBuffer tableName, ByteBuffer row,
1156         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1157       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1158     }
1159 
1160     @Override
1161     public void deleteAllRowTs(
1162         ByteBuffer tableName, ByteBuffer row, long timestamp,
1163         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1164       Table table = null;
1165       try {
1166         table = getTable(tableName);
1167         Delete delete  = new Delete(getBytes(row), timestamp);
1168         addAttributes(delete, attributes);
1169         table.delete(delete);
1170       } catch (IOException e) {
1171         LOG.warn(e.getMessage(), e);
1172         throw new IOError(Throwables.getStackTraceAsString(e));
1173       } finally {
1174         closeTable(table);
1175       }
1176     }
1177 
1178     @Override
1179     public void createTable(ByteBuffer in_tableName,
1180         List<ColumnDescriptor> columnFamilies) throws IOError,
1181         IllegalArgument, AlreadyExists {
1182       TableName tableName = getTableName(in_tableName);
1183       try {
1184         if (getAdmin().tableExists(tableName)) {
1185           throw new AlreadyExists("table name already in use");
1186         }
1187         HTableDescriptor desc = new HTableDescriptor(tableName);
1188         for (ColumnDescriptor col : columnFamilies) {
1189           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1190           desc.addFamily(colDesc);
1191         }
1192         getAdmin().createTable(desc);
1193       } catch (IOException e) {
1194         LOG.warn(e.getMessage(), e);
1195         throw new IOError(Throwables.getStackTraceAsString(e));
1196       } catch (IllegalArgumentException e) {
1197         LOG.warn(e.getMessage(), e);
1198         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1199       }
1200     }
1201 
1202     private static TableName getTableName(ByteBuffer buffer) {
1203       return TableName.valueOf(getBytes(buffer));
1204     }
1205 
1206     @Override
1207     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1208       TableName tableName = getTableName(in_tableName);
1209       if (LOG.isDebugEnabled()) {
1210         LOG.debug("deleteTable: table=" + tableName);
1211       }
1212       try {
1213         if (!getAdmin().tableExists(tableName)) {
1214           throw new IOException("table does not exist");
1215         }
1216         getAdmin().deleteTable(tableName);
1217       } catch (IOException e) {
1218         LOG.warn(e.getMessage(), e);
1219         throw new IOError(Throwables.getStackTraceAsString(e));
1220       }
1221     }
1222 
1223     @Override
1224     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1225         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1226         throws IOError, IllegalArgument {
1227       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1228                   attributes);
1229     }
1230 
1231     @Override
1232     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1233         List<Mutation> mutations, long timestamp,
1234         Map<ByteBuffer, ByteBuffer> attributes)
1235         throws IOError, IllegalArgument {
1236       Table table = null;
1237       try {
1238         table = getTable(tableName);
1239         Put put = new Put(getBytes(row), timestamp);
1240         addAttributes(put, attributes);
1241 
1242         Delete delete = new Delete(getBytes(row));
1243         addAttributes(delete, attributes);
1244         if (metrics != null) {
1245           metrics.incNumRowKeysInBatchMutate(mutations.size());
1246         }
1247 
1248         // I apologize for all this mess :)
1249         for (Mutation m : mutations) {
1250           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1251           if (m.isDelete) {
1252             if (famAndQf.length == 1) {
1253               delete.deleteFamily(famAndQf[0], timestamp);
1254             } else {
1255               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1256             }
1257             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1258                 : Durability.SKIP_WAL);
1259           } else {
1260             if(famAndQf.length == 1) {
1261               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1262                   + "over the whole column family.");
1263             } else {
1264               put.addImmutable(famAndQf[0], famAndQf[1],
1265                   m.value != null ? getBytes(m.value)
1266                       : HConstants.EMPTY_BYTE_ARRAY);
1267             }
1268             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1269           }
1270         }
1271         if (!delete.isEmpty())
1272           table.delete(delete);
1273         if (!put.isEmpty())
1274           table.put(put);
1275       } catch (IOException e) {
1276         LOG.warn(e.getMessage(), e);
1277         throw new IOError(Throwables.getStackTraceAsString(e));
1278       } catch (IllegalArgumentException e) {
1279         LOG.warn(e.getMessage(), e);
1280         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1281       } finally{
1282         closeTable(table);
1283       }
1284     }
1285 
1286     @Override
1287     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1288         Map<ByteBuffer, ByteBuffer> attributes)
1289         throws IOError, IllegalArgument, TException {
1290       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1291     }
1292 
1293     @Override
1294     public void mutateRowsTs(
1295         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1296         Map<ByteBuffer, ByteBuffer> attributes)
1297         throws IOError, IllegalArgument, TException {
1298       List<Put> puts = new ArrayList<Put>();
1299       List<Delete> deletes = new ArrayList<Delete>();
1300 
1301       for (BatchMutation batch : rowBatches) {
1302         byte[] row = getBytes(batch.row);
1303         List<Mutation> mutations = batch.mutations;
1304         Delete delete = new Delete(row);
1305         addAttributes(delete, attributes);
1306         Put put = new Put(row, timestamp);
1307         addAttributes(put, attributes);
1308         for (Mutation m : mutations) {
1309           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1310           if (m.isDelete) {
1311             // no qualifier, family only.
1312             if (famAndQf.length == 1) {
1313               delete.deleteFamily(famAndQf[0], timestamp);
1314             } else {
1315               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1316             }
1317             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1318                 : Durability.SKIP_WAL);
1319           } else {
1320             if (famAndQf.length == 1) {
1321               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1322                   + "over the whole column family.");
1323             }
1324             if (famAndQf.length == 2) {
1325               put.addImmutable(famAndQf[0], famAndQf[1],
1326                   m.value != null ? getBytes(m.value)
1327                       : HConstants.EMPTY_BYTE_ARRAY);
1328             } else {
1329               throw new IllegalArgumentException("Invalid famAndQf provided.");
1330             }
1331             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1332           }
1333         }
1334         if (!delete.isEmpty())
1335           deletes.add(delete);
1336         if (!put.isEmpty())
1337           puts.add(put);
1338       }
1339 
1340       Table table = null;
1341       try {
1342         table = getTable(tableName);
1343         if (!puts.isEmpty())
1344           table.put(puts);
1345         if (!deletes.isEmpty())
1346           table.delete(deletes);
1347 
1348       } catch (IOException e) {
1349         LOG.warn(e.getMessage(), e);
1350         throw new IOError(Throwables.getStackTraceAsString(e));
1351       } catch (IllegalArgumentException e) {
1352         LOG.warn(e.getMessage(), e);
1353         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1354       } finally{
1355         closeTable(table);
1356       }
1357     }
1358 
1359     @Deprecated
1360     @Override
1361     public long atomicIncrement(
1362         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1363             throws IOError, IllegalArgument, TException {
1364       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1365       if(famAndQf.length == 1) {
1366         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1367       }
1368       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1369     }
1370 
1371     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1372         byte [] family, byte [] qualifier, long amount)
1373         throws IOError, IllegalArgument, TException {
1374       Table table = null;
1375       try {
1376         table = getTable(tableName);
1377         return table.incrementColumnValue(
1378             getBytes(row), family, qualifier, amount);
1379       } catch (IOException e) {
1380         LOG.warn(e.getMessage(), e);
1381         throw new IOError(Throwables.getStackTraceAsString(e));
1382       } finally {
1383         closeTable(table);
1384       }
1385     }
1386 
1387     @Override
1388     public void scannerClose(int id) throws IOError, IllegalArgument {
1389       LOG.debug("scannerClose: id=" + id);
1390       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1391       if (resultScannerWrapper == null) {
1392         String message = "scanner ID is invalid";
1393         LOG.warn(message);
1394         throw new IllegalArgument("scanner ID is invalid");
1395       }
1396       resultScannerWrapper.getScanner().close();
1397       removeScanner(id);
1398     }
1399 
1400     @Override
1401     public List<TRowResult> scannerGetList(int id,int nbRows)
1402         throws IllegalArgument, IOError {
1403       LOG.debug("scannerGetList: id=" + id);
1404       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1405       if (null == resultScannerWrapper) {
1406         String message = "scanner ID is invalid";
1407         LOG.warn(message);
1408         throw new IllegalArgument("scanner ID is invalid");
1409       }
1410 
1411       Result [] results = null;
1412       try {
1413         results = resultScannerWrapper.getScanner().next(nbRows);
1414         if (null == results) {
1415           return new ArrayList<TRowResult>();
1416         }
1417       } catch (IOException e) {
1418         LOG.warn(e.getMessage(), e);
1419         throw new IOError(Throwables.getStackTraceAsString(e));
1420       }
1421       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1422     }
1423 
1424     @Override
1425     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1426       return scannerGetList(id,1);
1427     }
1428 
1429     @Override
1430     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1431         Map<ByteBuffer, ByteBuffer> attributes)
1432         throws IOError {
1433       
1434       Table table = null;
1435       try {
1436         table = getTable(tableName);
1437         Scan scan = new Scan();
1438         addAttributes(scan, attributes);
1439         if (tScan.isSetStartRow()) {
1440           scan.setStartRow(tScan.getStartRow());
1441         }
1442         if (tScan.isSetStopRow()) {
1443           scan.setStopRow(tScan.getStopRow());
1444         }
1445         if (tScan.isSetTimestamp()) {
1446           scan.setTimeRange(0, tScan.getTimestamp());
1447         }
1448         if (tScan.isSetCaching()) {
1449           scan.setCaching(tScan.getCaching());
1450         }
1451         if (tScan.isSetBatchSize()) {
1452           scan.setBatch(tScan.getBatchSize());
1453         }
1454         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1455           for(ByteBuffer column : tScan.getColumns()) {
1456             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1457             if(famQf.length == 1) {
1458               scan.addFamily(famQf[0]);
1459             } else {
1460               scan.addColumn(famQf[0], famQf[1]);
1461             }
1462           }
1463         }
1464         if (tScan.isSetFilterString()) {
1465           ParseFilter parseFilter = new ParseFilter();
1466           scan.setFilter(
1467               parseFilter.parseFilterString(tScan.getFilterString()));
1468         }
1469         if (tScan.isSetReversed()) {
1470           scan.setReversed(tScan.isReversed());
1471         }
1472         return addScanner(table.getScanner(scan), tScan.sortColumns);
1473       } catch (IOException e) {
1474         LOG.warn(e.getMessage(), e);
1475         throw new IOError(Throwables.getStackTraceAsString(e));
1476       } finally{
1477         closeTable(table);
1478       }
1479     }
1480 
1481     @Override
1482     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1483         List<ByteBuffer> columns,
1484         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1485       
1486       Table table = null;
1487       try {
1488         table = getTable(tableName);
1489         Scan scan = new Scan(getBytes(startRow));
1490         addAttributes(scan, attributes);
1491         if(columns != null && columns.size() != 0) {
1492           for(ByteBuffer column : columns) {
1493             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1494             if(famQf.length == 1) {
1495               scan.addFamily(famQf[0]);
1496             } else {
1497               scan.addColumn(famQf[0], famQf[1]);
1498             }
1499           }
1500         }
1501         return addScanner(table.getScanner(scan), false);
1502       } catch (IOException e) {
1503         LOG.warn(e.getMessage(), e);
1504         throw new IOError(Throwables.getStackTraceAsString(e));
1505       } finally{
1506         closeTable(table);
1507       }
1508     }
1509 
1510     @Override
1511     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1512         ByteBuffer stopRow, List<ByteBuffer> columns,
1513         Map<ByteBuffer, ByteBuffer> attributes)
1514         throws IOError, TException {
1515       
1516       Table table = null;
1517       try {
1518         table = getTable(tableName);
1519         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1520         addAttributes(scan, attributes);
1521         if(columns != null && columns.size() != 0) {
1522           for(ByteBuffer column : columns) {
1523             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1524             if(famQf.length == 1) {
1525               scan.addFamily(famQf[0]);
1526             } else {
1527               scan.addColumn(famQf[0], famQf[1]);
1528             }
1529           }
1530         }
1531         return addScanner(table.getScanner(scan), false);
1532       } catch (IOException e) {
1533         LOG.warn(e.getMessage(), e);
1534         throw new IOError(Throwables.getStackTraceAsString(e));
1535       } finally{
1536         closeTable(table);
1537       }
1538     }
1539 
1540     @Override
1541     public int scannerOpenWithPrefix(ByteBuffer tableName,
1542                                      ByteBuffer startAndPrefix,
1543                                      List<ByteBuffer> columns,
1544         Map<ByteBuffer, ByteBuffer> attributes)
1545         throws IOError, TException {
1546       
1547       Table table = null;
1548       try {
1549         table = getTable(tableName);
1550         Scan scan = new Scan(getBytes(startAndPrefix));
1551         addAttributes(scan, attributes);
1552         Filter f = new WhileMatchFilter(
1553             new PrefixFilter(getBytes(startAndPrefix)));
1554         scan.setFilter(f);
1555         if (columns != null && columns.size() != 0) {
1556           for(ByteBuffer column : columns) {
1557             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1558             if(famQf.length == 1) {
1559               scan.addFamily(famQf[0]);
1560             } else {
1561               scan.addColumn(famQf[0], famQf[1]);
1562             }
1563           }
1564         }
1565         return addScanner(table.getScanner(scan), false);
1566       } catch (IOException e) {
1567         LOG.warn(e.getMessage(), e);
1568         throw new IOError(Throwables.getStackTraceAsString(e));
1569       } finally{
1570         closeTable(table);
1571       }
1572     }
1573 
1574     @Override
1575     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1576         List<ByteBuffer> columns, long timestamp,
1577         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1578       
1579       Table table = null;
1580       try {
1581         table = getTable(tableName);
1582         Scan scan = new Scan(getBytes(startRow));
1583         addAttributes(scan, attributes);
1584         scan.setTimeRange(0, timestamp);
1585         if (columns != null && columns.size() != 0) {
1586           for (ByteBuffer column : columns) {
1587             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1588             if(famQf.length == 1) {
1589               scan.addFamily(famQf[0]);
1590             } else {
1591               scan.addColumn(famQf[0], famQf[1]);
1592             }
1593           }
1594         }
1595         return addScanner(table.getScanner(scan), false);
1596       } catch (IOException e) {
1597         LOG.warn(e.getMessage(), e);
1598         throw new IOError(Throwables.getStackTraceAsString(e));
1599       } finally{
1600         closeTable(table);
1601       }
1602     }
1603 
1604     @Override
1605     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1606         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1607         Map<ByteBuffer, ByteBuffer> attributes)
1608         throws IOError, TException {
1609       
1610       Table table = null;
1611       try {
1612         table = getTable(tableName);
1613         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1614         addAttributes(scan, attributes);
1615         scan.setTimeRange(0, timestamp);
1616         if (columns != null && columns.size() != 0) {
1617           for (ByteBuffer column : columns) {
1618             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1619             if(famQf.length == 1) {
1620               scan.addFamily(famQf[0]);
1621             } else {
1622               scan.addColumn(famQf[0], famQf[1]);
1623             }
1624           }
1625         }
1626         scan.setTimeRange(0, timestamp);
1627         return addScanner(table.getScanner(scan), false);
1628       } catch (IOException e) {
1629         LOG.warn(e.getMessage(), e);
1630         throw new IOError(Throwables.getStackTraceAsString(e));
1631       } finally{
1632         closeTable(table);
1633       }
1634     }
1635 
1636     @Override
1637     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1638         ByteBuffer tableName) throws IOError, TException {
1639       
1640       Table table = null;
1641       try {
1642         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1643           new TreeMap<ByteBuffer, ColumnDescriptor>();
1644 
1645         table = getTable(tableName);
1646         HTableDescriptor desc = table.getTableDescriptor();
1647 
1648         for (HColumnDescriptor e : desc.getFamilies()) {
1649           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1650           columns.put(col.name, col);
1651         }
1652         return columns;
1653       } catch (IOException e) {
1654         LOG.warn(e.getMessage(), e);
1655         throw new IOError(Throwables.getStackTraceAsString(e));
1656       } finally {
1657         closeTable(table);
1658       }
1659     }
1660 
1661     @Deprecated
1662     @Override
1663     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1664         ByteBuffer family) throws IOError {
1665       try {
1666         Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1667         return ThriftUtilities.cellFromHBase(result.rawCells());
1668       } catch (IOException e) {
1669         LOG.warn(e.getMessage(), e);
1670         throw new IOError(Throwables.getStackTraceAsString(e));
1671       }
1672     }
1673 
1674     @Override
1675     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1676       try {
1677         byte[] row = getBytes(searchRow);
1678         Result startRowResult =
1679             getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1680 
1681         if (startRowResult == null) {
1682           throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1683                                 + Bytes.toStringBinary(row));
1684         }
1685 
1686         // find region start and end keys
1687         HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1688         if (regionInfo == null) {
1689           throw new IOException("HRegionInfo REGIONINFO was null or " +
1690                                 " empty in Meta for row="
1691                                 + Bytes.toStringBinary(row));
1692         }
1693         TRegionInfo region = new TRegionInfo();
1694         region.setStartKey(regionInfo.getStartKey());
1695         region.setEndKey(regionInfo.getEndKey());
1696         region.id = regionInfo.getRegionId();
1697         region.setName(regionInfo.getRegionName());
1698         region.version = regionInfo.getVersion();
1699 
1700         // find region assignment to server
1701         ServerName serverName = HRegionInfo.getServerName(startRowResult);
1702         if (serverName != null) {
1703           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1704           region.port = serverName.getPort();
1705         }
1706         return region;
1707       } catch (IOException e) {
1708         LOG.warn(e.getMessage(), e);
1709         throw new IOError(Throwables.getStackTraceAsString(e));
1710       }
1711     }
1712 
1713     private void closeTable(Table table) throws IOError
1714     {
1715       try{
1716         if(table != null){
1717           table.close();
1718         }
1719       } catch (IOException e){
1720         LOG.error(e.getMessage(), e);
1721         throw new IOError(Throwables.getStackTraceAsString(e));
1722       }
1723     }
1724     
1725     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1726       Scan scan = new Scan(row);
1727       scan.setReversed(true);
1728       scan.addFamily(family);
1729       scan.setStartRow(row);
1730       Table table = getTable(tableName);      
1731       try (ResultScanner scanner = table.getScanner(scan)) {
1732         return scanner.next();
1733       } finally{
1734         if(table != null){
1735           table.close();
1736         }
1737       }
1738     }
1739 
1740     private void initMetrics(ThriftMetrics metrics) {
1741       this.metrics = metrics;
1742     }
1743 
1744     @Override
1745     public void increment(TIncrement tincrement) throws IOError, TException {
1746 
1747       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1748         throw new TException("Must supply a table and a row key; can't increment");
1749       }
1750 
1751       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1752         this.coalescer.queueIncrement(tincrement);
1753         return;
1754       }
1755 
1756       Table table = null;
1757       try {
1758         table = getTable(tincrement.getTable());
1759         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1760         table.increment(inc);
1761       } catch (IOException e) {
1762         LOG.warn(e.getMessage(), e);
1763         throw new IOError(Throwables.getStackTraceAsString(e));
1764       } finally{
1765         closeTable(table);
1766       }
1767     }
1768 
1769     @Override
1770     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1771       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1772         this.coalescer.queueIncrements(tincrements);
1773         return;
1774       }
1775       for (TIncrement tinc : tincrements) {
1776         increment(tinc);
1777       }
1778     }
1779 
1780     @Override
1781     public List<TCell> append(TAppend tappend) throws IOError, TException {
1782       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1783         throw new TException("Must supply a table and a row key; can't append");
1784       }
1785 
1786       Table table = null;
1787       try {
1788         table = getTable(tappend.getTable());
1789         Append append = ThriftUtilities.appendFromThrift(tappend);
1790         Result result = table.append(append);
1791         return ThriftUtilities.cellFromHBase(result.rawCells());
1792       } catch (IOException e) {
1793         LOG.warn(e.getMessage(), e);
1794         throw new IOError(Throwables.getStackTraceAsString(e));
1795       } finally{
1796           closeTable(table);
1797       }
1798     }
1799 
1800     @Override
1801     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1802         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1803         IllegalArgument, TException {
1804       Put put;
1805       try {
1806         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1807         addAttributes(put, attributes);
1808 
1809         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1810 
1811         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1812             : HConstants.EMPTY_BYTE_ARRAY);
1813 
1814         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1815       } catch (IllegalArgumentException e) {
1816         LOG.warn(e.getMessage(), e);
1817         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1818       }
1819 
1820       Table table = null;
1821       try {
1822         table = getTable(tableName);
1823         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1824         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1825           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1826       } catch (IOException e) {
1827         LOG.warn(e.getMessage(), e);
1828         throw new IOError(Throwables.getStackTraceAsString(e));
1829       } catch (IllegalArgumentException e) {
1830         LOG.warn(e.getMessage(), e);
1831         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1832       } finally {
1833           closeTable(table);
1834       }
1835     }
1836   }
1837 
1838 
1839 
1840   /**
1841    * Adds all the attributes into the Operation object
1842    */
1843   private static void addAttributes(OperationWithAttributes op,
1844     Map<ByteBuffer, ByteBuffer> attributes) {
1845     if (attributes == null || attributes.size() == 0) {
1846       return;
1847     }
1848     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1849       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1850       byte[] value =  getBytes(entry.getValue());
1851       op.setAttribute(name, value);
1852     }
1853   }
1854 
1855   public static void registerFilters(Configuration conf) {
1856     String[] filters = conf.getStrings("hbase.thrift.filters");
1857     if(filters != null) {
1858       for(String filterClass: filters) {
1859         String[] filterPart = filterClass.split(":");
1860         if(filterPart.length != 2) {
1861           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1862         } else {
1863           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1864         }
1865       }
1866     }
1867   }
1868 }