View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.KeyValue;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.TableName;
62  import org.apache.hadoop.hbase.TableNotFoundException;
63  import org.apache.hadoop.hbase.classification.InterfaceAudience;
64  import org.apache.hadoop.hbase.client.Append;
65  import org.apache.hadoop.hbase.client.Delete;
66  import org.apache.hadoop.hbase.client.Durability;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.HBaseAdmin;
69  import org.apache.hadoop.hbase.client.HTable;
70  import org.apache.hadoop.hbase.client.Increment;
71  import org.apache.hadoop.hbase.client.OperationWithAttributes;
72  import org.apache.hadoop.hbase.client.Put;
73  import org.apache.hadoop.hbase.client.Result;
74  import org.apache.hadoop.hbase.client.ResultScanner;
75  import org.apache.hadoop.hbase.client.Scan;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.ParseFilter;
78  import org.apache.hadoop.hbase.filter.PrefixFilter;
79  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
80  import org.apache.hadoop.hbase.security.SecurityUtil;
81  import org.apache.hadoop.hbase.security.UserProvider;
82  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
83  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
84  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
85  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
86  import org.apache.hadoop.hbase.thrift.generated.Hbase;
87  import org.apache.hadoop.hbase.thrift.generated.IOError;
88  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
89  import org.apache.hadoop.hbase.thrift.generated.Mutation;
90  import org.apache.hadoop.hbase.thrift.generated.TAppend;
91  import org.apache.hadoop.hbase.thrift.generated.TCell;
92  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
93  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
94  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
95  import org.apache.hadoop.hbase.thrift.generated.TScan;
96  import org.apache.hadoop.hbase.util.Bytes;
97  import org.apache.hadoop.hbase.util.ConnectionCache;
98  import org.apache.hadoop.hbase.util.Strings;
99  import org.apache.hadoop.net.DNS;
100 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
101 import org.apache.hadoop.security.UserGroupInformation;
102 import org.apache.hadoop.security.authorize.ProxyUsers;
103 import org.apache.thrift.TException;
104 import org.apache.thrift.TProcessor;
105 import org.apache.thrift.protocol.TBinaryProtocol;
106 import org.apache.thrift.protocol.TCompactProtocol;
107 import org.apache.thrift.protocol.TProtocol;
108 import org.apache.thrift.protocol.TProtocolFactory;
109 import org.apache.thrift.server.THsHaServer;
110 import org.apache.thrift.server.TNonblockingServer;
111 import org.apache.thrift.server.TServer;
112 import org.apache.thrift.server.TServlet;
113 import org.apache.thrift.server.TThreadedSelectorServer;
114 import org.apache.thrift.transport.TFramedTransport;
115 import org.apache.thrift.transport.TNonblockingServerSocket;
116 import org.apache.thrift.transport.TNonblockingServerTransport;
117 import org.apache.thrift.transport.TSaslServerTransport;
118 import org.apache.thrift.transport.TServerSocket;
119 import org.apache.thrift.transport.TServerTransport;
120 import org.apache.thrift.transport.TTransportFactory;
121 import org.mortbay.jetty.Connector;
122 import org.mortbay.jetty.Server;
123 import org.mortbay.jetty.nio.SelectChannelConnector;
124 import org.mortbay.jetty.security.SslSelectChannelConnector;
125 import org.mortbay.jetty.servlet.Context;
126 import org.mortbay.jetty.servlet.ServletHolder;
127 import org.mortbay.thread.QueuedThreadPool;
128 
129 import com.google.common.base.Joiner;
130 import com.google.common.util.concurrent.ThreadFactoryBuilder;
131 
132 /**
133  * ThriftServerRunner - this class starts up a Thrift server which implements
134  * the Hbase API specified in the Hbase.thrift IDL file.
135  */
136 @InterfaceAudience.Private
137 @SuppressWarnings("deprecation")
138 public class ThriftServerRunner implements Runnable {
139 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  /** Selects which Thrift server implementation to run; see {@link ImplType}. */
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  // Core transport/protocol configuration keys for the Thrift service.
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
  static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
  // Jetty thread-pool sizing; only consulted by the HTTP transport.
  static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
  static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";

  // SSL settings; within this file they are read only by setupHTTPServer().
  static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
  static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
  static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
  static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";


  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * auth-conf: authentication, integrity and confidentiality checking
   * auth-int: authentication and integrity checking
   * auth: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  public static final int HREGION_VERSION = 1;
  static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
  private final int listenPort;

  private Configuration conf;
  // The two server variants; run() starts exactly one of them.
  volatile TServer tserver;
  volatile Server httpServer;
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
  private final HBaseHandler hbaseHandler;
  // Login identity of the server process; run() wraps all work in its doAs().
  private final UserGroupInformation realUser;

  // SASL quality of protection from THRIFT_QOP_KEY, or null when unset.
  private final String qop;
  // Hostname resolved for the Kerberos principal (set only in secure mode).
  private String host;

  private final boolean securityEnabled;
  private final boolean doAsEnabled;
191 
192   /** An enum of server implementation selections */
193   enum ImplType {
194     HS_HA("hsha", true, THsHaServer.class, true),
195     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
196     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
197     THREADED_SELECTOR(
198         "threadedselector", true, TThreadedSelectorServer.class, true);
199 
200     public static final ImplType DEFAULT = THREAD_POOL;
201 
202     final String option;
203     final boolean isAlwaysFramed;
204     final Class<? extends TServer> serverClass;
205     final boolean canSpecifyBindIP;
206 
207     ImplType(String option, boolean isAlwaysFramed,
208         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
209       this.option = option;
210       this.isAlwaysFramed = isAlwaysFramed;
211       this.serverClass = serverClass;
212       this.canSpecifyBindIP = canSpecifyBindIP;
213     }
214 
215     /**
216      * @return <code>-option</code> so we can get the list of options from
217      *         {@link #values()}
218      */
219     @Override
220     public String toString() {
221       return "-" + option;
222     }
223 
224     String getDescription() {
225       StringBuilder sb = new StringBuilder("Use the " +
226           serverClass.getSimpleName());
227       if (isAlwaysFramed) {
228         sb.append(" This implies the framed transport.");
229       }
230       if (this == DEFAULT) {
231         sb.append("This is the default.");
232       }
233       return sb.toString();
234     }
235 
236     static OptionGroup createOptionGroup() {
237       OptionGroup group = new OptionGroup();
238       for (ImplType t : values()) {
239         group.addOption(new Option(t.option, t.getDescription()));
240       }
241       return group;
242     }
243 
244     static ImplType getServerImpl(Configuration conf) {
245       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
246       for (ImplType t : values()) {
247         if (confType.equals(t.option)) {
248           return t;
249         }
250       }
251       throw new AssertionError("Unknown server ImplType.option:" + confType);
252     }
253 
254     static void setServerImpl(CommandLine cmd, Configuration conf) {
255       ImplType chosenType = null;
256       int numChosen = 0;
257       for (ImplType t : values()) {
258         if (cmd.hasOption(t.option)) {
259           chosenType = t;
260           ++numChosen;
261         }
262       }
263       if (numChosen < 1) {
264         LOG.info("Using default thrift server type");
265         chosenType = DEFAULT;
266       } else if (numChosen > 1) {
267         throw new AssertionError("Exactly one option out of " +
268           Arrays.toString(values()) + " has to be specified");
269       }
270       LOG.info("Using thrift server type " + chosenType.option);
271       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
272     }
273 
274     public String simpleClassName() {
275       return serverClass.getSimpleName();
276     }
277 
278     public static List<String> serversThatCannotSpecifyBindIP() {
279       List<String> l = new ArrayList<String>();
280       for (ImplType t : values()) {
281         if (!t.canSpecifyBindIP) {
282           l.add(t.simpleClassName());
283         }
284       }
285       return l;
286     }
287 
288   }
289 
  /**
   * Reads the Thrift server configuration, performs the Kerberos login when
   * both Hadoop and HBase security are enabled, and builds the
   * handler/metrics stack.
   *
   * @param conf server configuration; a private copy is kept internally
   * @throws IOException on login failure, handler construction failure, or an
   *           invalid {@link #THRIFT_QOP_KEY} setting
   */
  public ThriftServerRunner(Configuration conf) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(conf);
    // login the server principal (if using secure Hadoop)
    securityEnabled = userProvider.isHadoopSecurityEnabled()
      && userProvider.isHBaseSecurityEnabled();
    if (securityEnabled) {
      // Resolve our own hostname first: it is substituted into the Kerberos
      // principal for the keytab login below, and reused for SASL later.
      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase.thrift.dns.interface", "default"),
        conf.get("hbase.thrift.dns.nameserver", "default")));
      userProvider.login("hbase.thrift.keytab.file",
        "hbase.thrift.kerberos.principal", host);
    }
    this.conf = HBaseConfiguration.create(conf);
    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
    this.hbaseHandler = new HBaseHandler(conf, userProvider);
    this.hbaseHandler.initMetrics(metrics);
    // All handler calls are routed through a proxy that records metrics.
    this.handler = HbaseHandlerMetricsProxy.newInstance(
      hbaseHandler, metrics, conf);
    this.realUser = userProvider.getCurrent().getUGI();
    qop = conf.get(THRIFT_QOP_KEY);
    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
    if (qop != null) {
      // A QOP implies SASL authentication, which requires secure mode.
      if (!qop.equals("auth") && !qop.equals("auth-int")
          && !qop.equals("auth-conf")) {
        throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
          + ", it must be 'auth', 'auth-int', or 'auth-conf'");
      }
      if (!securityEnabled) {
        throw new IOException("Thrift server must"
          + " run in secure mode to support authentication");
      }
    }
  }
324 
  /*
   * Runs the Thrift server. Blocks until the chosen server (HTTP or native
   * Thrift) stops serving; on any startup/serve failure the whole process is
   * terminated.
   */
  @Override
  public void run() {
    // Execute as the logged-in server user so RPCs carry its credentials.
    realUser.doAs(new PrivilegedAction<Object>() {
      @Override
      public Object run() {
        try {
          if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
            setupHTTPServer();
            httpServer.start();
            httpServer.join();
          } else {
            setupServer();
            tserver.serve();
          }
        } catch (Exception e) {
          LOG.fatal("Cannot run ThriftServer", e);
          // Crash the process if the ThriftServer is not running
          System.exit(-1);
        }
        return null;
      }
    });

  }
352 
353   public void shutdown() {
354     if (tserver != null) {
355       tserver.stop();
356       tserver = null;
357     }
358     if (httpServer != null) {
359       try {
360         httpServer.stop();
361         httpServer = null;
362       } catch (Exception e) {
363         LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
364       }
365       httpServer = null;
366     }
367   }
368 
369   private void setupHTTPServer() throws IOException {
370     TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
371     TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
372     TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
373         conf, hbaseHandler, securityEnabled, doAsEnabled);
374 
375     httpServer = new Server();
376     // Context handler
377     Context context = new Context(httpServer, "/", Context.SESSIONS);
378     context.setContextPath("/");
379     String httpPath = "/*";
380     httpServer.setHandler(context);
381     context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
382 
383     // set up Jetty and run the embedded server
384     Connector connector = new SelectChannelConnector();
385     if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
386       SslSelectChannelConnector sslConnector = new SslSelectChannelConnector();
387       String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
388       String password = HBaseConfiguration.getPassword(conf,
389           THRIFT_SSL_KEYSTORE_PASSWORD, null);
390       String keyPassword = HBaseConfiguration.getPassword(conf,
391           THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
392       sslConnector.setKeystore(keystore);
393       sslConnector.setPassword(password);
394       sslConnector.setKeyPassword(keyPassword);
395       connector = sslConnector;
396     }
397     String host = getBindAddress(conf).getHostAddress();
398     connector.setPort(listenPort);
399     connector.setHost(host);
400     httpServer.addConnector(connector);
401 
402     if (doAsEnabled) {
403       ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
404     }
405 
406     // Set the default max thread number to 100 to limit
407     // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
408     // Jetty set the default max thread number to 250, if we don't set it.
409     //
410     // Our default min thread number 2 is the same as that used by Jetty.
411     int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
412     int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
413     QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
414     threadPool.setMinThreads(minThreads);
415     httpServer.setThreadPool(threadPool);
416 
417     httpServer.setSendServerVersion(false);
418     httpServer.setSendDateHeader(false);
419     httpServer.setStopAtShutdown(true);
420 
421     LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
422   }
423 
  /**
   * Setting up the thrift TServer: chooses the protocol (binary/compact),
   * the transport (plain, framed, or SASL/GSSAPI), and the server
   * implementation (nonblocking, hs-ha, threaded-selector, or thread-pool),
   * then assigns the constructed server to {@link #tserver}.
   */
  private void setupServer() throws Exception {
    // Construct correct ProtocolFactory
    TProtocolFactory protocolFactory;
    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
      LOG.debug("Using compact protocol");
      protocolFactory = new TCompactProtocol.Factory();
    } else {
      LOG.debug("Using binary protocol");
      protocolFactory = new TBinaryProtocol.Factory();
    }

    final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
    ImplType implType = ImplType.getServerImpl(conf);
    TProcessor processor = p;

    // Construct correct TransportFactory
    TTransportFactory transportFactory;
    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
      // SASL authentication (qop) and framed transport are mutually
      // exclusive here.
      if (qop != null) {
        throw new RuntimeException("Thrift server authentication"
          + " doesn't work with framed transport yet");
      }
      // Max frame size is configured in MB (default 2 MB).
      transportFactory = new TFramedTransport.Factory(
          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
      LOG.debug("Using framed transport");
    } else if (qop == null) {
      transportFactory = new TTransportFactory();
    } else {
      // SASL/GSSAPI transport: authenticate callers via Kerberos.
      // Extract the name from the principal
      String name = SecurityUtil.getUserFromPrincipal(
        conf.get("hbase.thrift.kerberos.principal"));
      Map<String, String> saslProperties = new HashMap<String, String>();
      saslProperties.put(Sasl.QOP, qop);
      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
        new SaslGssCallbackHandler() {
          @Override
          public void handle(Callback[] callbacks)
              throws UnsupportedCallbackException {
            AuthorizeCallback ac = null;
            for (Callback callback : callbacks) {
              if (callback instanceof AuthorizeCallback) {
                ac = (AuthorizeCallback) callback;
              } else {
                throw new UnsupportedCallbackException(callback,
                    "Unrecognized SASL GSSAPI Callback");
              }
            }
            if (ac != null) {
              // Authorize only when the authenticated identity matches the
              // identity the caller wants to act as.
              String authid = ac.getAuthenticationID();
              String authzid = ac.getAuthorizationID();
              if (!authid.equals(authzid)) {
                ac.setAuthorized(false);
              } else {
                ac.setAuthorized(true);
                String userName = SecurityUtil.getUserFromPrincipal(authzid);
                LOG.info("Effective user: " + userName);
                ac.setAuthorizedID(userName);
              }
            }
          }
        });
      transportFactory = saslFactory;

      // Create a processor wrapper, to get the caller
      processor = new TProcessor() {
        @Override
        public boolean process(TProtocol inProt,
            TProtocol outProt) throws TException {
          // Propagate the SASL-authenticated principal to the handler so
          // this request runs as that effective user.
          TSaslServerTransport saslServerTransport =
            (TSaslServerTransport)inProt.getTransport();
          SaslServer saslServer = saslServerTransport.getSaslServer();
          String principal = saslServer.getAuthorizationID();
          hbaseHandler.setEffectiveUser(principal);
          return p.process(inProt, outProt);
        }
      };
    }

    // Reject an explicit bind address for server types that ignore it.
    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
      LOG.error("Server types " + Joiner.on(", ").join(
          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
          "address binding at the moment. See " +
          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
      throw new RuntimeException(
          "-" + BIND_CONF_KEY + " not supported with " + implType);
    }

    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
        implType == ImplType.THREADED_SELECTOR) {

      InetAddress listenAddress = getBindAddress(conf);
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      if (implType == ImplType.NONBLOCKING) {
        TNonblockingServer.Args serverArgs =
            new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TNonblockingServer(serverArgs);
      } else if (implType == ImplType.HS_HA) {
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        // Worker pool fed through a metrics-instrumented call queue.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new THsHaServer(serverArgs);
      } else { // THREADED_SELECTOR
        TThreadedSelectorServer.Args serverArgs =
            new HThreadedSelectorServerArgs(serverTransport, conf);
        // Worker pool fed through a metrics-instrumented call queue.
        CallQueue callQueue =
            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
        ExecutorService executorService = createExecutor(
            callQueue, serverArgs.getWorkerThreads());
        serverArgs.executorService(executorService)
                  .processor(processor)
                  .transportFactory(transportFactory)
                  .protocolFactory(protocolFactory);
        tserver = new TThreadedSelectorServer(serverArgs);
      }
      LOG.info("starting HBase " + implType.simpleClassName() +
          " server on " + Integer.toString(listenPort));
    } else if (implType == ImplType.THREAD_POOL) {
      // Thread pool server. Get the IP address to bind to.
      InetAddress listenAddress = getBindAddress(conf);

      TServerTransport serverTransport = new TServerSocket(
          new InetSocketAddress(listenAddress, listenPort));

      TBoundedThreadPoolServer.Args serverArgs =
          new TBoundedThreadPoolServer.Args(serverTransport, conf);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
          + listenAddress + ":" + Integer.toString(listenPort)
          + "; " + serverArgs);
      TBoundedThreadPoolServer tserver =
          new TBoundedThreadPoolServer(serverArgs, metrics);
      this.tserver = tserver;
    } else {
      throw new AssertionError("Unsupported Thrift server implementation: " +
          implType.simpleClassName());
    }

    // A sanity check that we instantiated the right type of server.
    if (tserver.getClass() != implType.serverClass) {
      throw new AssertionError("Expected to create Thrift server class " +
          implType.serverClass.getName() + " but got " +
          tserver.getClass().getName());
    }



    registerFilters(conf);
  }
589 
590   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
591                                  int workerThreads) {
592     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
593     tfb.setDaemon(true);
594     tfb.setNameFormat("thrift-worker-%d");
595     return new ThreadPoolExecutor(workerThreads, workerThreads,
596             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
597   }
598 
599   private InetAddress getBindAddress(Configuration conf)
600       throws UnknownHostException {
601     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
602     return InetAddress.getByName(bindAddressStr);
603   }
604 
605   protected static class ResultScannerWrapper {
606 
607     private final ResultScanner scanner;
608     private final boolean sortColumns;
609     public ResultScannerWrapper(ResultScanner resultScanner,
610                                 boolean sortResultColumns) {
611       scanner = resultScanner;
612       sortColumns = sortResultColumns;
613    }
614 
615     public ResultScanner getScanner() {
616       return scanner;
617     }
618 
619     public boolean isColumnSorted() {
620       return sortColumns;
621     }
622   }
623 
  /**
   * The HBaseHandler is a glue object that connects Thrift RPC calls to the
   * HBase client API primarily defined in the HBaseAdmin and HTable objects.
   */
  public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state;
    // access goes through the synchronized add/get/removeScanner methods.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    private ThriftMetrics metrics = null;

    // Shared HBase connections, switched per effective user (see
    // setEffectiveUser) and scrubbed on the intervals configured below.
    private final ConnectionCache connectionCache;

    // Per-thread cache of HTable handles, keyed by table name.
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // Increment coalescer; presumably driven by COALESCE_INC_KEY — its use
    // is outside this chunk, verify against the increment code paths.
    IncrementCoalescer coalescer = null;

    // Connection cache maintenance keys (ctor defaults suggest milliseconds).
    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
651 
652     /**
653      * Returns a list of all the column families for a given htable.
654      *
655      * @param table
656      * @throws IOException
657      */
658     byte[][] getAllColumns(HTable table) throws IOException {
659       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
660       byte[][] columns = new byte[cds.length][];
661       for (int i = 0; i < cds.length; i++) {
662         columns[i] = Bytes.add(cds[i].getName(),
663             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
664       }
665       return columns;
666     }
667 
668     /**
669      * Creates and returns an HTable instance from a given table name.
670      *
671      * @param tableName
672      *          name of table
673      * @return HTable object
674      * @throws IOException
675      * @throws IOError
676      */
677     public HTable getTable(final byte[] tableName) throws
678         IOException {
679       String table = Bytes.toString(tableName);
680       Map<String, HTable> tables = threadLocalTables.get();
681       if (!tables.containsKey(table)) {
682         tables.put(table, (HTable)connectionCache.getTable(table));
683       }
684       return tables.get(table);
685     }
686 
687     public HTable getTable(final ByteBuffer tableName) throws IOException {
688       return getTable(getBytes(tableName));
689     }
690 
691     /**
692      * Assigns a unique ID to the scanner and adds the mapping to an internal
693      * hash-map.
694      *
695      * @param scanner
696      * @return integer scanner id
697      */
698     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
699       int id = nextScannerId++;
700       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
701       scannerMap.put(id, resultScannerWrapper);
702       return id;
703     }
704 
    /**
     * Returns the scanner associated with the specified ID.
     *
     * @param id scanner id previously returned by {@code addScanner}
     * @return the scanner wrapper, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper getScanner(int id) {
      return scannerMap.get(id);
    }
714 
    /**
     * Removes the scanner associated with the specified ID from the internal
     * id->scanner hash-map.
     *
     * @param id scanner id previously returned by {@code addScanner}
     * @return the removed scanner wrapper, or null if ID was invalid.
     */
    protected synchronized ResultScannerWrapper removeScanner(int id) {
      return scannerMap.remove(id);
    }
725 
726     protected HBaseHandler(final Configuration c,
727         final UserProvider userProvider) throws IOException {
728       this.conf = c;
729       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
730       this.coalescer = new IncrementCoalescer(this);
731 
732       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
733       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
734       connectionCache = new ConnectionCache(
735         conf, userProvider, cleanInterval, maxIdleTime);
736     }
737 
    /**
     * Obtain HBaseAdmin. Creates the instance if it is not already created.
     * The instance is served from the shared connection cache.
     */
    private HBaseAdmin getHBaseAdmin() throws IOException {
      return connectionCache.getAdmin();
    }
744 
    /**
     * Sets the user subsequent connection-cache lookups should act as; called
     * by the SASL processor wrapper to support impersonation.
     */
    void setEffectiveUser(String effectiveUser) {
      connectionCache.setEffectiveUser(effectiveUser);
    }
748 
749     @Override
750     public void enableTable(ByteBuffer tableName) throws IOError {
751       try{
752         getHBaseAdmin().enableTable(getBytes(tableName));
753       } catch (IOException e) {
754         LOG.warn(e.getMessage(), e);
755         throw new IOError(e.getMessage());
756       }
757     }
758 
759     @Override
760     public void disableTable(ByteBuffer tableName) throws IOError{
761       try{
762         getHBaseAdmin().disableTable(getBytes(tableName));
763       } catch (IOException e) {
764         LOG.warn(e.getMessage(), e);
765         throw new IOError(e.getMessage());
766       }
767     }
768 
    /**
     * Returns whether the named table is currently enabled.
     *
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
      try {
        // Uses the static HTable helper rather than an admin instance.
        return HTable.isTableEnabled(this.conf, getBytes(tableName));
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
778 
779     @Override
780     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
781       try{
782         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
783       } catch (IOException e) {
784         LOG.warn(e.getMessage(), e);
785         throw new IOError(e.getMessage());
786       }
787     }
788 
789     @Override
790     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
791       try{
792         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
793       } catch (IOException e) {
794         LOG.warn(e.getMessage(), e);
795         throw new IOError(e.getMessage());
796       }
797     }
798 
799     @Override
800     public List<ByteBuffer> getTableNames() throws IOError {
801       try {
802         TableName[] tableNames = this.getHBaseAdmin().listTableNames();
803         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
804         for (int i = 0; i < tableNames.length; i++) {
805           list.add(ByteBuffer.wrap(tableNames[i].getName()));
806         }
807         return list;
808       } catch (IOException e) {
809         LOG.warn(e.getMessage(), e);
810         throw new IOError(e.getMessage());
811       }
812     }
813 
814     /**
815      * @return the list of regions in the given table, or an empty list if the table does not exist
816      */
817     @Override
818     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
819     throws IOError {
820       try {
821         HTable table;
822         try {
823           table = getTable(tableName);
824         } catch (TableNotFoundException ex) {
825           return new ArrayList<TRegionInfo>();
826         }
827         Map<HRegionInfo, ServerName> regionLocations =
828             table.getRegionLocations();
829         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
830         for (Map.Entry<HRegionInfo, ServerName> entry :
831             regionLocations.entrySet()) {
832           HRegionInfo info = entry.getKey();
833           ServerName serverName = entry.getValue();
834           TRegionInfo region = new TRegionInfo();
835           region.serverName = ByteBuffer.wrap(
836               Bytes.toBytes(serverName.getHostname()));
837           region.port = serverName.getPort();
838           region.startKey = ByteBuffer.wrap(info.getStartKey());
839           region.endKey = ByteBuffer.wrap(info.getEndKey());
840           region.id = info.getRegionId();
841           region.name = ByteBuffer.wrap(info.getRegionName());
842           region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
843           results.add(region);
844         }
845         return results;
846       } catch (TableNotFoundException e) {
847         // Return empty list for non-existing table
848         return Collections.emptyList();
849       } catch (IOException e){
850         LOG.warn(e.getMessage(), e);
851         throw new IOError(e.getMessage());
852       }
853     }
854 
855     @Deprecated
856     @Override
857     public List<TCell> get(
858         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
859         Map<ByteBuffer, ByteBuffer> attributes)
860         throws IOError {
861       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
862       if (famAndQf.length == 1) {
863         return get(tableName, row, famAndQf[0], null, attributes);
864       }
865       if (famAndQf.length == 2) {
866         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
867       }
868       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
869     }
870 
871     /**
872      * Note: this internal interface is slightly different from public APIs in regard to handling
873      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
874      * we respect qual == null as a request for the entire column family. The caller (
875      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
876      * column is parse like normal.
877      */
878     protected List<TCell> get(ByteBuffer tableName,
879                               ByteBuffer row,
880                               byte[] family,
881                               byte[] qualifier,
882                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
883       try {
884         HTable table = getTable(tableName);
885         Get get = new Get(getBytes(row));
886         addAttributes(get, attributes);
887         if (qualifier == null) {
888           get.addFamily(family);
889         } else {
890           get.addColumn(family, qualifier);
891         }
892         Result result = table.get(get);
893         return ThriftUtilities.cellFromHBase(result.rawCells());
894       } catch (IOException e) {
895         LOG.warn(e.getMessage(), e);
896         throw new IOError(e.getMessage());
897       }
898     }
899 
900     @Deprecated
901     @Override
902     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
903         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
904       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
905       if(famAndQf.length == 1) {
906         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
907       }
908       if (famAndQf.length == 2) {
909         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
910       }
911       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
912 
913     }
914 
915     /**
916      * Note: this public interface is slightly different from public Java APIs in regard to
917      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
918      * Rather, we respect qual == null as a request for the entire column family. If you want to
919      * access the entire column family, use
920      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
921      * that lacks a {@code ':'}.
922      */
923     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
924         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
925       try {
926         HTable table = getTable(tableName);
927         Get get = new Get(getBytes(row));
928         addAttributes(get, attributes);
929         if (null == qualifier) {
930           get.addFamily(family);
931         } else {
932           get.addColumn(family, qualifier);
933         }
934         get.setMaxVersions(numVersions);
935         Result result = table.get(get);
936         return ThriftUtilities.cellFromHBase(result.rawCells());
937       } catch (IOException e) {
938         LOG.warn(e.getMessage(), e);
939         throw new IOError(e.getMessage());
940       }
941     }
942 
943     @Deprecated
944     @Override
945     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
946         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
947       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
948       if (famAndQf.length == 1) {
949         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
950       }
951       if (famAndQf.length == 2) {
952         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
953           attributes);
954       }
955       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
956     }
957 
958     /**
959      * Note: this internal interface is slightly different from public APIs in regard to handling
960      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
961      * we respect qual == null as a request for the entire column family. The caller (
962      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
963      * consistent in that the column is parse like normal.
964      */
965     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
966         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
967         throws IOError {
968       try {
969         HTable table = getTable(tableName);
970         Get get = new Get(getBytes(row));
971         addAttributes(get, attributes);
972         if (null == qualifier) {
973           get.addFamily(family);
974         } else {
975           get.addColumn(family, qualifier);
976         }
977         get.setTimeRange(0, timestamp);
978         get.setMaxVersions(numVersions);
979         Result result = table.get(get);
980         return ThriftUtilities.cellFromHBase(result.rawCells());
981       } catch (IOException e) {
982         LOG.warn(e.getMessage(), e);
983         throw new IOError(e.getMessage());
984       }
985     }
986 
    /**
     * Gets all columns of the given row at the latest timestamp; delegates to
     * {@link #getRowWithColumnsTs} with a null column list.
     */
    @Override
    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      return getRowWithColumnsTs(tableName, row, null,
                                 HConstants.LATEST_TIMESTAMP,
                                 attributes);
    }
994 
    /**
     * Gets the requested columns of the given row at the latest timestamp;
     * delegates to {@link #getRowWithColumnsTs}.
     */
    @Override
    public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
                                              ByteBuffer row,
        List<ByteBuffer> columns,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      return getRowWithColumnsTs(tableName, row, columns,
                                 HConstants.LATEST_TIMESTAMP,
                                 attributes);
    }
1004 
    /**
     * Gets all columns of the given row with timestamps up to {@code timestamp};
     * delegates to {@link #getRowWithColumnsTs} with a null column list.
     */
    @Override
    public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      return getRowWithColumnsTs(tableName, row, null,
                                 timestamp, attributes);
    }
1011 
1012     @Override
1013     public List<TRowResult> getRowWithColumnsTs(
1014         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1015         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1016       try {
1017         HTable table = getTable(tableName);
1018         if (columns == null) {
1019           Get get = new Get(getBytes(row));
1020           addAttributes(get, attributes);
1021           get.setTimeRange(0, timestamp);
1022           Result result = table.get(get);
1023           return ThriftUtilities.rowResultFromHBase(result);
1024         }
1025         Get get = new Get(getBytes(row));
1026         addAttributes(get, attributes);
1027         for(ByteBuffer column : columns) {
1028           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1029           if (famAndQf.length == 1) {
1030               get.addFamily(famAndQf[0]);
1031           } else {
1032               get.addColumn(famAndQf[0], famAndQf[1]);
1033           }
1034         }
1035         get.setTimeRange(0, timestamp);
1036         Result result = table.get(get);
1037         return ThriftUtilities.rowResultFromHBase(result);
1038       } catch (IOException e) {
1039         LOG.warn(e.getMessage(), e);
1040         throw new IOError(e.getMessage());
1041       }
1042     }
1043 
    /**
     * Gets all columns for a batch of rows at the latest timestamp; delegates
     * to {@link #getRowsWithColumnsTs} with a null column list.
     */
    @Override
    public List<TRowResult> getRows(ByteBuffer tableName,
                                    List<ByteBuffer> rows,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError {
      return getRowsWithColumnsTs(tableName, rows, null,
                                  HConstants.LATEST_TIMESTAMP,
                                  attributes);
    }
1053 
    /**
     * Gets the requested columns for a batch of rows at the latest timestamp;
     * delegates to {@link #getRowsWithColumnsTs}.
     */
    @Override
    public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
                                               List<ByteBuffer> rows,
        List<ByteBuffer> columns,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      return getRowsWithColumnsTs(tableName, rows, columns,
                                  HConstants.LATEST_TIMESTAMP,
                                  attributes);
    }
1063 
    /**
     * Gets all columns for a batch of rows with timestamps up to
     * {@code timestamp}; delegates to {@link #getRowsWithColumnsTs} with a
     * null column list.
     */
    @Override
    public List<TRowResult> getRowsTs(ByteBuffer tableName,
                                      List<ByteBuffer> rows,
        long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      return getRowsWithColumnsTs(tableName, rows, null,
                                  timestamp, attributes);
    }
1072 
    /**
     * Gets a batch of rows in one multi-get, optionally restricted to the
     * given columns and to timestamps below {@code timestamp}.
     *
     * @param columns "family" or "family:qualifier" entries; null means all
     * @return one TRowResult set per matching row
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
                                                 List<ByteBuffer> rows,
        List<ByteBuffer> columns, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      try {
        List<Get> gets = new ArrayList<Get>(rows.size());
        HTable table = getTable(tableName);
        if (metrics != null) {
          // Track batch-get sizes for monitoring.
          metrics.incNumRowKeysInBatchGet(rows.size());
        }
        for (ByteBuffer row : rows) {
          Get get = new Get(getBytes(row));
          addAttributes(get, attributes);
          if (columns != null) {

            for(ByteBuffer column : columns) {
              byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
              if (famAndQf.length == 1) {
                get.addFamily(famAndQf[0]);
              } else {
                get.addColumn(famAndQf[0], famAndQf[1]);
              }
            }
          }
          get.setTimeRange(0, timestamp);
          gets.add(get);
        }
        // Single round trip for the whole batch.
        Result[] result = table.get(gets);
        return ThriftUtilities.rowResultFromHBase(result);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1108 
    /**
     * Deletes all versions of the given column in the given row at the latest
     * timestamp; delegates to {@link #deleteAllTs}.
     */
    @Override
    public void deleteAll(
        ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError {
      deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
                  attributes);
    }
1117 
    /**
     * Deletes cells in the given row for a "family" or "family:qualifier"
     * column, up to and including {@code timestamp}. A family-only column
     * deletes the whole family.
     *
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public void deleteAllTs(ByteBuffer tableName,
                            ByteBuffer row,
                            ByteBuffer column,
        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      try {
        HTable table = getTable(tableName);
        Delete delete  = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
        if (famAndQf.length == 1) {
          // No qualifier: drop the entire column family.
          delete.deleteFamily(famAndQf[0], timestamp);
        } else {
          delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
        }
        table.delete(delete);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1140 
    /**
     * Deletes an entire row at the latest timestamp; delegates to
     * {@link #deleteAllRowTs}.
     */
    @Override
    public void deleteAllRow(
        ByteBuffer tableName, ByteBuffer row,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
    }
1147 
    /**
     * Deletes an entire row, removing cells with timestamps up to and
     * including {@code timestamp}.
     *
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public void deleteAllRowTs(
        ByteBuffer tableName, ByteBuffer row, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
      try {
        HTable table = getTable(tableName);
        Delete delete  = new Delete(getBytes(row), timestamp);
        addAttributes(delete, attributes);
        table.delete(delete);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1162 
    /**
     * Creates a table with the given column families.
     *
     * NOTE(review): the tableExists/createTable pair is not atomic; a
     * concurrent creator can still cause createTable to fail, which is then
     * reported as IOError rather than AlreadyExists.
     *
     * @throws AlreadyExists if the table name is already in use
     * @throws IllegalArgument if a column descriptor is invalid
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public void createTable(ByteBuffer in_tableName,
        List<ColumnDescriptor> columnFamilies) throws IOError,
        IllegalArgument, AlreadyExists {
      byte [] tableName = getBytes(in_tableName);
      try {
        if (getHBaseAdmin().tableExists(tableName)) {
          throw new AlreadyExists("table name already in use");
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
        for (ColumnDescriptor col : columnFamilies) {
          // Convert each Thrift descriptor into its HBase counterpart.
          HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
          desc.addFamily(colDesc);
        }
        getHBaseAdmin().createTable(desc);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1186 
    /**
     * Deletes the named table.
     *
     * @throws IOError if the table does not exist or the delete fails
     */
    @Override
    public void deleteTable(ByteBuffer in_tableName) throws IOError {
      byte [] tableName = getBytes(in_tableName);
      if (LOG.isDebugEnabled()) {
        LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
      }
      try {
        if (!getHBaseAdmin().tableExists(tableName)) {
          // Surfaced to the client as IOError via the catch below.
          throw new IOException("table does not exist");
        }
        getHBaseAdmin().deleteTable(tableName);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1203 
    /**
     * Applies the given mutations to a single row at the latest timestamp;
     * delegates to {@link #mutateRowTs}.
     */
    @Override
    public void mutateRow(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
                  attributes);
    }
1211 
    /**
     * Applies the given mutations to a single row at the supplied timestamp.
     * Deletes and puts are accumulated separately and issued as (at most) one
     * Delete followed by one Put against the table.
     *
     * NOTE(review): the Delete is executed before the Put, so batches that
     * delete and re-add the same cell depend on that ordering.
     */
    @Override
    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
        List<Mutation> mutations, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument {
      HTable table = null;
      try {
        table = getTable(tableName);
        Put put = new Put(getBytes(row), timestamp);
        addAttributes(put, attributes);

        Delete delete = new Delete(getBytes(row));
        addAttributes(delete, attributes);
        if (metrics != null) {
          metrics.incNumRowKeysInBatchMutate(mutations.size());
        }

        // I apologize for all this mess :)
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            if (famAndQf.length == 1) {
              // Family only: delete the whole family at this timestamp.
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if(famAndQf.length == 1) {
              // Puts need an explicit qualifier; a family-only column is
              // skipped here with only this warning emitted.
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            } else {
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        if (!delete.isEmpty())
          table.delete(delete);
        if (!put.isEmpty())
          table.put(put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1264 
    /**
     * Applies per-row mutation batches at the latest timestamp; delegates to
     * {@link #mutateRowsTs}.
     */
    @Override
    public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
    }
1271 
    /**
     * Applies batches of mutations, one batch per row, all at the supplied
     * timestamp. Puts and deletes are collected across every batch first and
     * then issued as bulk table.put / table.delete calls.
     *
     * NOTE(review): unlike {@link #mutateRowTs}, a non-delete mutation whose
     * column lacks a qualifier throws IllegalArgumentException here (after
     * logging a warning) instead of being silently skipped.
     */
    @Override
    public void mutateRowsTs(
        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, IllegalArgument, TException {
      List<Put> puts = new ArrayList<Put>();
      List<Delete> deletes = new ArrayList<Delete>();

      for (BatchMutation batch : rowBatches) {
        byte[] row = getBytes(batch.row);
        List<Mutation> mutations = batch.mutations;
        Delete delete = new Delete(row);
        addAttributes(delete, attributes);
        Put put = new Put(row, timestamp);
        addAttributes(put, attributes);
        for (Mutation m : mutations) {
          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
          if (m.isDelete) {
            // no qualifier, family only.
            if (famAndQf.length == 1) {
              delete.deleteFamily(famAndQf[0], timestamp);
            } else {
              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
            }
            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
                : Durability.SKIP_WAL);
          } else {
            if (famAndQf.length == 1) {
              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
                  + "over the whole column family.");
            }
            if (famAndQf.length == 2) {
              put.addImmutable(famAndQf[0], famAndQf[1],
                  m.value != null ? getBytes(m.value)
                      : HConstants.EMPTY_BYTE_ARRAY);
            } else {
              // A family-only put falls through to here after the warning
              // above and aborts the whole request.
              throw new IllegalArgumentException("Invalid famAndQf provided.");
            }
            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
          }
        }
        if (!delete.isEmpty())
          deletes.add(delete);
        if (!put.isEmpty())
          puts.add(put);
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        if (!puts.isEmpty())
          table.put(puts);
        if (!deletes.isEmpty())
          table.delete(deletes);

      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1335 
1336     @Deprecated
1337     @Override
1338     public long atomicIncrement(
1339         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1340             throws IOError, IllegalArgument, TException {
1341       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1342       if(famAndQf.length == 1) {
1343         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1344       }
1345       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1346     }
1347 
    /**
     * Atomically increments the given cell by {@code amount} and returns the
     * resulting value.
     *
     * @throws IOError wrapping any underlying IOException
     */
    protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
        byte [] family, byte [] qualifier, long amount)
        throws IOError, IllegalArgument, TException {
      HTable table;
      try {
        table = getTable(tableName);
        return table.incrementColumnValue(
            getBytes(row), family, qualifier, amount);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1361 
1362     @Override
1363     public void scannerClose(int id) throws IOError, IllegalArgument {
1364       LOG.debug("scannerClose: id=" + id);
1365       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1366       if (resultScannerWrapper == null) {
1367         String message = "scanner ID is invalid";
1368         LOG.warn(message);
1369         throw new IllegalArgument("scanner ID is invalid");
1370       }
1371       resultScannerWrapper.getScanner().close();
1372       removeScanner(id);
1373     }
1374 
1375     @Override
1376     public List<TRowResult> scannerGetList(int id,int nbRows)
1377         throws IllegalArgument, IOError {
1378       LOG.debug("scannerGetList: id=" + id);
1379       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1380       if (null == resultScannerWrapper) {
1381         String message = "scanner ID is invalid";
1382         LOG.warn(message);
1383         throw new IllegalArgument("scanner ID is invalid");
1384       }
1385 
1386       Result [] results = null;
1387       try {
1388         results = resultScannerWrapper.getScanner().next(nbRows);
1389         if (null == results) {
1390           return new ArrayList<TRowResult>();
1391         }
1392       } catch (IOException e) {
1393         LOG.warn(e.getMessage(), e);
1394         throw new IOError(e.getMessage());
1395       }
1396       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1397     }
1398 
    /**
     * Fetches the next single row from the scanner; delegates to
     * {@link #scannerGetList(int, int)} with nbRows == 1.
     */
    @Override
    public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
      return scannerGetList(id,1);
    }
1403 
    /**
     * Opens a scanner configured from a Thrift TScan: only the fields the
     * client actually set are applied to the HBase Scan.
     *
     * @return the ID under which the new scanner is registered
     * @throws IOError wrapping any underlying IOException
     */
    @Override
    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError {
      try {
        HTable table = getTable(tableName);
        Scan scan = new Scan();
        addAttributes(scan, attributes);
        if (tScan.isSetStartRow()) {
          scan.setStartRow(tScan.getStartRow());
        }
        if (tScan.isSetStopRow()) {
          scan.setStopRow(tScan.getStopRow());
        }
        if (tScan.isSetTimestamp()) {
          // Upper-bounds the cell timestamps; lower bound is always 0.
          scan.setTimeRange(0, tScan.getTimestamp());
        }
        if (tScan.isSetCaching()) {
          scan.setCaching(tScan.getCaching());
        }
        if (tScan.isSetBatchSize()) {
          scan.setBatch(tScan.getBatchSize());
        }
        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
          for(ByteBuffer column : tScan.getColumns()) {
            // Columns are "family" or "family:qualifier" strings.
            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
            if(famQf.length == 1) {
              scan.addFamily(famQf[0]);
            } else {
              scan.addColumn(famQf[0], famQf[1]);
            }
          }
        }
        if (tScan.isSetFilterString()) {
          // Parse the client-supplied filter expression into a Filter tree.
          ParseFilter parseFilter = new ParseFilter();
          scan.setFilter(
              parseFilter.parseFilterString(tScan.getFilterString()));
        }
        if (tScan.isSetReversed()) {
          scan.setReversed(tScan.isReversed());
        }
        return addScanner(table.getScanner(scan), tScan.sortColumns);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1451 
1452     @Override
1453     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1454         List<ByteBuffer> columns,
1455         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1456       try {
1457         HTable table = getTable(tableName);
1458         Scan scan = new Scan(getBytes(startRow));
1459         addAttributes(scan, attributes);
1460         if(columns != null && columns.size() != 0) {
1461           for(ByteBuffer column : columns) {
1462             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1463             if(famQf.length == 1) {
1464               scan.addFamily(famQf[0]);
1465             } else {
1466               scan.addColumn(famQf[0], famQf[1]);
1467             }
1468           }
1469         }
1470         return addScanner(table.getScanner(scan), false);
1471       } catch (IOException e) {
1472         LOG.warn(e.getMessage(), e);
1473         throw new IOError(e.getMessage());
1474       }
1475     }
1476 
1477     @Override
1478     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1479         ByteBuffer stopRow, List<ByteBuffer> columns,
1480         Map<ByteBuffer, ByteBuffer> attributes)
1481         throws IOError, TException {
1482       try {
1483         HTable table = getTable(tableName);
1484         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1485         addAttributes(scan, attributes);
1486         if(columns != null && columns.size() != 0) {
1487           for(ByteBuffer column : columns) {
1488             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1489             if(famQf.length == 1) {
1490               scan.addFamily(famQf[0]);
1491             } else {
1492               scan.addColumn(famQf[0], famQf[1]);
1493             }
1494           }
1495         }
1496         return addScanner(table.getScanner(scan), false);
1497       } catch (IOException e) {
1498         LOG.warn(e.getMessage(), e);
1499         throw new IOError(e.getMessage());
1500       }
1501     }
1502 
    /**
     * Opens a scanner over all rows sharing the prefix {@code startAndPrefix}
     * (the value is used both as the start row and as a PrefixFilter), over
     * the given columns; a null or empty column list scans every column.
     *
     * @return the ID under which the new scanner is registered
     */
    @Override
    public int scannerOpenWithPrefix(ByteBuffer tableName,
                                     ByteBuffer startAndPrefix,
                                     List<ByteBuffer> columns,
        Map<ByteBuffer, ByteBuffer> attributes)
        throws IOError, TException {
      try {
        HTable table = getTable(tableName);
        Scan scan = new Scan(getBytes(startAndPrefix));
        addAttributes(scan, attributes);
        // WhileMatchFilter stops the scan at the first non-matching row.
        Filter f = new WhileMatchFilter(
            new PrefixFilter(getBytes(startAndPrefix)));
        scan.setFilter(f);
        if (columns != null && columns.size() != 0) {
          for(ByteBuffer column : columns) {
            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
            if(famQf.length == 1) {
              scan.addFamily(famQf[0]);
            } else {
              scan.addColumn(famQf[0], famQf[1]);
            }
          }
        }
        return addScanner(table.getScanner(scan), false);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1532 
1533     @Override
1534     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1535         List<ByteBuffer> columns, long timestamp,
1536         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1537       try {
1538         HTable table = getTable(tableName);
1539         Scan scan = new Scan(getBytes(startRow));
1540         addAttributes(scan, attributes);
1541         scan.setTimeRange(0, timestamp);
1542         if (columns != null && columns.size() != 0) {
1543           for (ByteBuffer column : columns) {
1544             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1545             if(famQf.length == 1) {
1546               scan.addFamily(famQf[0]);
1547             } else {
1548               scan.addColumn(famQf[0], famQf[1]);
1549             }
1550           }
1551         }
1552         return addScanner(table.getScanner(scan), false);
1553       } catch (IOException e) {
1554         LOG.warn(e.getMessage(), e);
1555         throw new IOError(e.getMessage());
1556       }
1557     }
1558 
1559     @Override
1560     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1561         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1562         Map<ByteBuffer, ByteBuffer> attributes)
1563         throws IOError, TException {
1564       try {
1565         HTable table = getTable(tableName);
1566         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1567         addAttributes(scan, attributes);
1568         scan.setTimeRange(0, timestamp);
1569         if (columns != null && columns.size() != 0) {
1570           for (ByteBuffer column : columns) {
1571             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1572             if(famQf.length == 1) {
1573               scan.addFamily(famQf[0]);
1574             } else {
1575               scan.addColumn(famQf[0], famQf[1]);
1576             }
1577           }
1578         }
1579         scan.setTimeRange(0, timestamp);
1580         return addScanner(table.getScanner(scan), false);
1581       } catch (IOException e) {
1582         LOG.warn(e.getMessage(), e);
1583         throw new IOError(e.getMessage());
1584       }
1585     }
1586 
1587     @Override
1588     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1589         ByteBuffer tableName) throws IOError, TException {
1590       try {
1591         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1592           new TreeMap<ByteBuffer, ColumnDescriptor>();
1593 
1594         HTable table = getTable(tableName);
1595         HTableDescriptor desc = table.getTableDescriptor();
1596 
1597         for (HColumnDescriptor e : desc.getFamilies()) {
1598           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1599           columns.put(col.name, col);
1600         }
1601         return columns;
1602       } catch (IOException e) {
1603         LOG.warn(e.getMessage(), e);
1604         throw new IOError(e.getMessage());
1605       }
1606     }
1607 
1608     @Deprecated
1609     @Override
1610     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1611         ByteBuffer family) throws IOError {
1612       try {
1613         HTable table = getTable(getBytes(tableName));
1614         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1615         return ThriftUtilities.cellFromHBase(result.rawCells());
1616       } catch (IOException e) {
1617         LOG.warn(e.getMessage(), e);
1618         throw new IOError(e.getMessage());
1619       }
1620     }
1621 
    /**
     * Resolves the region that contains {@code searchRow} by consulting the
     * meta table, returning its boundaries, id, name, and (when assigned) the
     * hosting server.
     *
     * @param searchRow row key to locate
     * @return a populated {@link TRegionInfo} for the containing region
     * @throws IOError if the meta lookup fails or no region entry is found
     */
    @Override
    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
      try {
        // Query the meta table for the closest entry at or before searchRow.
        HTable table = getTable(TableName.META_TABLE_NAME.getName());
        byte[] row = getBytes(searchRow);
        Result startRowResult = table.getRowOrBefore(
          row, HConstants.CATALOG_FAMILY);

        if (startRowResult == null) {
          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
                                + Bytes.toStringBinary(row));
        }

        // find region start and end keys
        HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
        if (regionInfo == null) {
          throw new IOException("HRegionInfo REGIONINFO was null or " +
                                " empty in Meta for row="
                                + Bytes.toStringBinary(row));
        }
        TRegionInfo region = new TRegionInfo();
        region.setStartKey(regionInfo.getStartKey());
        region.setEndKey(regionInfo.getEndKey());
        region.id = regionInfo.getRegionId();
        region.setName(regionInfo.getRegionName());
        region.version = HREGION_VERSION; // version not used anymore, PB encoding used.

        // find region assignment to server
        ServerName serverName = HRegionInfo.getServerName(startRowResult);
        if (serverName != null) {
          // Server info is optional: an unassigned region leaves these unset.
          region.setServerName(Bytes.toBytes(serverName.getHostname()));
          region.port = serverName.getPort();
        }
        return region;
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      }
    }
1661 
    /** Stores the metrics sink this handler reports per-call statistics to. */
    private void initMetrics(ThriftMetrics metrics) {
      this.metrics = metrics;
    }
1665 
1666     @Override
1667     public void increment(TIncrement tincrement) throws IOError, TException {
1668 
1669       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1670         throw new TException("Must supply a table and a row key; can't increment");
1671       }
1672 
1673       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1674         this.coalescer.queueIncrement(tincrement);
1675         return;
1676       }
1677 
1678       try {
1679         HTable table = getTable(tincrement.getTable());
1680         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1681         table.increment(inc);
1682       } catch (IOException e) {
1683         LOG.warn(e.getMessage(), e);
1684         throw new IOError(e.getMessage());
1685       }
1686     }
1687 
1688     @Override
1689     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1690       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1691         this.coalescer.queueIncrements(tincrements);
1692         return;
1693       }
1694       for (TIncrement tinc : tincrements) {
1695         increment(tinc);
1696       }
1697     }
1698 
1699     @Override
1700     public List<TCell> append(TAppend tappend) throws IOError, TException {
1701       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1702         throw new TException("Must supply a table and a row key; can't append");
1703       }
1704 
1705       try {
1706         HTable table = getTable(tappend.getTable());
1707         Append append = ThriftUtilities.appendFromThrift(tappend);
1708         Result result = table.append(append);
1709         return ThriftUtilities.cellFromHBase(result.rawCells());
1710       } catch (IOException e) {
1711         LOG.warn(e.getMessage(), e);
1712         throw new IOError(e.getMessage());
1713       }
1714     }
1715 
    /**
     * Atomically applies {@code mput} to {@code row} only if the cell at
     * {@code column} currently equals {@code value}.
     *
     * @param tableName  table to operate on
     * @param row        row key the check and the put both target
     * @param column     family:qualifier whose current value is compared
     * @param value      expected current value; null/empty means "cell empty"
     * @param mput       the single mutation to apply when the check passes
     * @param attributes optional operation attributes copied onto the put
     * @return true if the put was applied, false if the check failed
     * @throws IOError          wrapping any underlying IOException
     * @throws IllegalArgument  if the row/column inputs cannot be parsed
     */
    @Override
    public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
        ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
        IllegalArgument, TException {
      Put put;
      try {
        put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
        addAttributes(put, attributes);

        // NOTE(review): if mput.column has no qualifier, parseColumn returns a
        // single-element array and famAndQf[1] below throws
        // ArrayIndexOutOfBoundsException, which is NOT caught here — confirm
        // whether family-only mutations should be supported as in the scanner
        // methods.
        byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));

        // A null mutation value is stored as an empty byte array.
        put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
            : HConstants.EMPTY_BYTE_ARRAY);

        put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }

      HTable table = null;
      try {
        table = getTable(tableName);
        byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
        // A null expected value means the check passes only when the cell
        // does not exist / is empty.
        return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
          value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
      } catch (IOException e) {
        LOG.warn(e.getMessage(), e);
        throw new IOError(e.getMessage());
      } catch (IllegalArgumentException e) {
        LOG.warn(e.getMessage(), e);
        throw new IllegalArgument(e.getMessage());
      }
    }
1750   }
1751 
1752 
1753 
1754   /**
1755    * Adds all the attributes into the Operation object
1756    */
1757   private static void addAttributes(OperationWithAttributes op,
1758     Map<ByteBuffer, ByteBuffer> attributes) {
1759     if (attributes == null || attributes.size() == 0) {
1760       return;
1761     }
1762     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1763       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1764       byte[] value =  getBytes(entry.getValue());
1765       op.setAttribute(name, value);
1766     }
1767   }
1768 
1769   public static void registerFilters(Configuration conf) {
1770     String[] filters = conf.getStrings("hbase.thrift.filters");
1771     if(filters != null) {
1772       for(String filterClass: filters) {
1773         String[] filterPart = filterClass.split(":");
1774         if(filterPart.length != 2) {
1775           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1776         } else {
1777           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1778         }
1779       }
1780     }
1781   }
1782 }