
1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HRegionLocation;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.client.Admin;
66  import org.apache.hadoop.hbase.client.Append;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.Durability;
69  import org.apache.hadoop.hbase.client.Get;
70  import org.apache.hadoop.hbase.client.HBaseAdmin;
71  import org.apache.hadoop.hbase.client.Increment;
72  import org.apache.hadoop.hbase.client.OperationWithAttributes;
73  import org.apache.hadoop.hbase.client.Put;
74  import org.apache.hadoop.hbase.client.RegionLocator;
75  import org.apache.hadoop.hbase.client.Result;
76  import org.apache.hadoop.hbase.client.ResultScanner;
77  import org.apache.hadoop.hbase.client.Scan;
78  import org.apache.hadoop.hbase.client.Table;
79  import org.apache.hadoop.hbase.filter.Filter;
80  import org.apache.hadoop.hbase.filter.ParseFilter;
81  import org.apache.hadoop.hbase.filter.PrefixFilter;
82  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83  import org.apache.hadoop.hbase.security.SecurityUtil;
84  import org.apache.hadoop.hbase.security.UserProvider;
85  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
86  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
87  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
88  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
89  import org.apache.hadoop.hbase.thrift.generated.Hbase;
90  import org.apache.hadoop.hbase.thrift.generated.IOError;
91  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
92  import org.apache.hadoop.hbase.thrift.generated.Mutation;
93  import org.apache.hadoop.hbase.thrift.generated.TAppend;
94  import org.apache.hadoop.hbase.thrift.generated.TCell;
95  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
96  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
97  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
98  import org.apache.hadoop.hbase.thrift.generated.TScan;
99  import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.ConnectionCache;
101 import org.apache.hadoop.hbase.util.Strings;
102 import org.apache.hadoop.net.DNS;
103 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
104 import org.apache.hadoop.security.UserGroupInformation;
105 import org.apache.hadoop.security.authorize.ProxyUsers;
106 import org.apache.thrift.TException;
107 import org.apache.thrift.TProcessor;
108 import org.apache.thrift.protocol.TBinaryProtocol;
109 import org.apache.thrift.protocol.TCompactProtocol;
110 import org.apache.thrift.protocol.TProtocol;
111 import org.apache.thrift.protocol.TProtocolFactory;
112 import org.apache.thrift.server.THsHaServer;
113 import org.apache.thrift.server.TNonblockingServer;
114 import org.apache.thrift.server.TServer;
115 import org.apache.thrift.server.TServlet;
116 import org.apache.thrift.server.TThreadedSelectorServer;
117 import org.apache.thrift.transport.TFramedTransport;
118 import org.apache.thrift.transport.TNonblockingServerSocket;
119 import org.apache.thrift.transport.TNonblockingServerTransport;
120 import org.apache.thrift.transport.TSaslServerTransport;
121 import org.apache.thrift.transport.TServerSocket;
122 import org.apache.thrift.transport.TServerTransport;
123 import org.apache.thrift.transport.TTransportFactory;
124 import org.mortbay.jetty.Connector;
125 import org.mortbay.jetty.Server;
126 import org.mortbay.jetty.nio.SelectChannelConnector;
127 import org.mortbay.jetty.security.SslSelectChannelConnector;
128 import org.mortbay.jetty.servlet.Context;
129 import org.mortbay.jetty.servlet.ServletHolder;
130 import org.mortbay.thread.QueuedThreadPool;
131 
132 import com.google.common.base.Joiner;
133 import com.google.common.base.Throwables;
134 import com.google.common.util.concurrent.ThreadFactoryBuilder;
135 
136 /**
137  * ThriftServerRunner - this class starts up a Thrift server which implements
138  * the Hbase API specified in the Hbase.thrift IDL file.
139  */
140 @InterfaceAudience.Private
141 @SuppressWarnings("deprecation")
142 public class ThriftServerRunner implements Runnable {
143 
144   private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
145 
146   static final String SERVER_TYPE_CONF_KEY =
147       "hbase.regionserver.thrift.server.type";
148 
149   static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
150   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
151   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
152   static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
153   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
154   static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
155   static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
156   static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
157   static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
158 
159   static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
160   static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
161   static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
162   static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
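
  // Example (hypothetical paths and passwords): serving the Thrift HTTP transport
  // over TLS would typically be configured in hbase-site.xml along these lines:
  //
  //   hbase.thrift.ssl.enabled              = true
  //   hbase.thrift.ssl.keystore.store       = /path/to/keystore.jks
  //   hbase.thrift.ssl.keystore.password    = <store password>
  //   hbase.thrift.ssl.keystore.keypassword = <key password>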
163 
164 
165   /**
166    * Thrift quality of protection configuration key. Valid values can be:
167    * auth-conf: authentication, integrity and confidentiality checking
168    * auth-int: authentication and integrity checking
169    * auth: authentication only
170    *
171    * This is used to authenticate the callers and support impersonation.
172    * The thrift server and the HBase cluster must run in secure mode.
173    */
174   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
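
  // For example (hypothetical deployment), a secure cluster that wants integrity
  // checking in addition to Kerberos authentication would set, in hbase-site.xml:
  //
  //   <property>
  //     <name>hbase.thrift.security.qop</name>
  //     <value>auth-int</value>
  //   </property>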
175 
176   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
177   public static final int DEFAULT_LISTEN_PORT = 9090;
178   public static final int HREGION_VERSION = 1;
179   static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
180   private final int listenPort;
181 
182   private Configuration conf;
183   volatile TServer tserver;
184   volatile Server httpServer;
185   private final Hbase.Iface handler;
186   private final ThriftMetrics metrics;
187   private final HBaseHandler hbaseHandler;
188   private final UserGroupInformation realUser;
189 
190   private final String qop;
191   private String host;
192 
193   private final boolean securityEnabled;
194   private final boolean doAsEnabled;
195 
196   /** An enum of server implementation selections */
197   enum ImplType {
198     HS_HA("hsha", true, THsHaServer.class, true),
199     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
200     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
201     THREADED_SELECTOR(
202         "threadedselector", true, TThreadedSelectorServer.class, true);
203 
204     public static final ImplType DEFAULT = THREAD_POOL;
205 
206     final String option;
207     final boolean isAlwaysFramed;
208     final Class<? extends TServer> serverClass;
209     final boolean canSpecifyBindIP;
210 
211     ImplType(String option, boolean isAlwaysFramed,
212         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
213       this.option = option;
214       this.isAlwaysFramed = isAlwaysFramed;
215       this.serverClass = serverClass;
216       this.canSpecifyBindIP = canSpecifyBindIP;
217     }
218 
219     /**
220      * @return <code>-option</code> so we can get the list of options from
221      *         {@link #values()}
222      */
223     @Override
224     public String toString() {
225       return "-" + option;
226     }
227 
228     String getDescription() {
229       StringBuilder sb = new StringBuilder("Use the " +
230           serverClass.getSimpleName() + ".");
231       if (isAlwaysFramed) {
232         sb.append(" This implies the framed transport.");
233       }
234       if (this == DEFAULT) {
235         sb.append(" This is the default.");
236       }
237       return sb.toString();
238     }
239 
240     static OptionGroup createOptionGroup() {
241       OptionGroup group = new OptionGroup();
242       for (ImplType t : values()) {
243         group.addOption(new Option(t.option, t.getDescription()));
244       }
245       return group;
246     }
247 
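    // For example, starting the Thrift server with the "-nonblocking" option
    // (or with hbase.regionserver.thrift.server.type=nonblocking in the
    // configuration) selects the TNonblockingServer implementation; the
    // recognized values are the option strings defined above.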
248     static ImplType getServerImpl(Configuration conf) {
249       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
250       for (ImplType t : values()) {
251         if (confType.equals(t.option)) {
252           return t;
253         }
254       }
255       throw new AssertionError("Unknown server ImplType.option:" + confType);
256     }
257 
258     static void setServerImpl(CommandLine cmd, Configuration conf) {
259       ImplType chosenType = null;
260       int numChosen = 0;
261       for (ImplType t : values()) {
262         if (cmd.hasOption(t.option)) {
263           chosenType = t;
264           ++numChosen;
265         }
266       }
267       if (numChosen < 1) {
268         LOG.info("Using default thrift server type");
269         chosenType = DEFAULT;
270       } else if (numChosen > 1) {
271         throw new AssertionError("Exactly one option out of " +
272           Arrays.toString(values()) + " has to be specified");
273       }
274       LOG.info("Using thrift server type " + chosenType.option);
275       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
276     }
277 
278     public String simpleClassName() {
279       return serverClass.getSimpleName();
280     }
281 
282     public static List<String> serversThatCannotSpecifyBindIP() {
283       List<String> l = new ArrayList<String>();
284       for (ImplType t : values()) {
285         if (!t.canSpecifyBindIP) {
286           l.add(t.simpleClassName());
287         }
288       }
289       return l;
290     }
291 
292   }
293 
294   public ThriftServerRunner(Configuration conf) throws IOException {
295     UserProvider userProvider = UserProvider.instantiate(conf);
296     // login the server principal (if using secure Hadoop)
297     securityEnabled = userProvider.isHadoopSecurityEnabled()
298       && userProvider.isHBaseSecurityEnabled();
299     if (securityEnabled) {
300       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
301         conf.get("hbase.thrift.dns.interface", "default"),
302         conf.get("hbase.thrift.dns.nameserver", "default")));
303       userProvider.login("hbase.thrift.keytab.file",
304         "hbase.thrift.kerberos.principal", host);
305     }
306     this.conf = HBaseConfiguration.create(conf);
307     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
308     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
309     this.hbaseHandler = new HBaseHandler(conf, userProvider);
310     this.hbaseHandler.initMetrics(metrics);
311     this.handler = HbaseHandlerMetricsProxy.newInstance(
312       hbaseHandler, metrics, conf);
313     this.realUser = userProvider.getCurrent().getUGI();
314     qop = conf.get(THRIFT_QOP_KEY);
315     doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
316     if (qop != null) {
317       if (!qop.equals("auth") && !qop.equals("auth-int")
318           && !qop.equals("auth-conf")) {
319         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
320           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
321       }
322       if (!securityEnabled) {
323         throw new IOException("Thrift server must"
324           + " run in secure mode to support authentication");
325       }
326     }
327   }
328 
329   /*
330    * Runs the Thrift server
331    */
332   @Override
333   public void run() {
334     realUser.doAs(new PrivilegedAction<Object>() {
335       @Override
336       public Object run() {
337         try {
338           if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
339             setupHTTPServer();
340             httpServer.start();
341             httpServer.join();
342           } else {
343             setupServer();
344             tserver.serve();
345           }
346         } catch (Exception e) {
347           LOG.fatal("Cannot run ThriftServer", e);
348           // Crash the process if the ThriftServer is not running
349           System.exit(-1);
350         }
351         return null;
352       }
353     });
354 
355   }
356 
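  /**
   * Stops whichever server is currently running (the Thrift TServer or the
   * embedded HTTP server), if any.
   */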
357   public void shutdown() {
358     if (tserver != null) {
359       tserver.stop();
360       tserver = null;
361     }
362     if (httpServer != null) {
363       try {
364         httpServer.stop();
365         httpServer = null;
366       } catch (Exception e) {
367         LOG.error("Problem encountered in shutting down HTTP server", e);
368       }
369       httpServer = null;
370     }
371   }
372 
373   private void setupHTTPServer() throws IOException {
374     TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
375     TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
376     TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
377         conf, hbaseHandler, securityEnabled, doAsEnabled);
378 
379     httpServer = new Server();
380     // Context handler
381     Context context = new Context(httpServer, "/", Context.SESSIONS);
382     context.setContextPath("/");
383     String httpPath = "/*";
384     httpServer.setHandler(context);
385     context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
386 
387     // set up Jetty and run the embedded server
388     Connector connector = new SelectChannelConnector();
389     if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
390       SslSelectChannelConnector sslConnector = new SslSelectChannelConnector();
391       String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
392       String password = HBaseConfiguration.getPassword(conf,
393           THRIFT_SSL_KEYSTORE_PASSWORD, null);
394       String keyPassword = HBaseConfiguration.getPassword(conf,
395           THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
396       sslConnector.setKeystore(keystore);
397       sslConnector.setPassword(password);
398       sslConnector.setKeyPassword(keyPassword);
399       connector = sslConnector;
400     }
401     String host = getBindAddress(conf).getHostAddress();
402     connector.setPort(listenPort);
403     connector.setHost(host);
404     httpServer.addConnector(connector);
405 
406     if (doAsEnabled) {
407       ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
408     }
409 
410     // Set the default max thread number to 100 to limit
411     // the number of concurrent requests so that the Thrift HTTP server doesn't OOM easily.
412     // Jetty sets the default max thread number to 250 if we don't set it ourselves.
413     //
414     // Our default min thread number of 2 is the same as the one used by Jetty.
415     int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
416     int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
417     QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
418     threadPool.setMinThreads(minThreads);
419     httpServer.setThreadPool(threadPool);
420 
421     httpServer.setSendServerVersion(false);
422     httpServer.setSendDateHeader(false);
423     httpServer.setStopAtShutdown(true);
424 
425     LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
426   }
427 
428   /**
429    * Sets up the Thrift TServer.
430    */
431   private void setupServer() throws Exception {
432     // Construct correct ProtocolFactory
433     TProtocolFactory protocolFactory;
434     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
435       LOG.debug("Using compact protocol");
436       protocolFactory = new TCompactProtocol.Factory();
437     } else {
438       LOG.debug("Using binary protocol");
439       protocolFactory = new TBinaryProtocol.Factory();
440     }
441 
442     final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
443     ImplType implType = ImplType.getServerImpl(conf);
444     TProcessor processor = p;
445 
446     // Construct correct TransportFactory
447     TTransportFactory transportFactory;
448     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
449       if (qop != null) {
450         throw new RuntimeException("Thrift server authentication"
451           + " doesn't work with framed transport yet");
452       }
453       transportFactory = new TFramedTransport.Factory(
454           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
455       LOG.debug("Using framed transport");
456     } else if (qop == null) {
457       transportFactory = new TTransportFactory();
458     } else {
459       // Extract the name from the principal
460       String name = SecurityUtil.getUserFromPrincipal(
461         conf.get("hbase.thrift.kerberos.principal"));
462       Map<String, String> saslProperties = new HashMap<String, String>();
463       saslProperties.put(Sasl.QOP, qop);
464       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
465       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
466         new SaslGssCallbackHandler() {
467           @Override
468           public void handle(Callback[] callbacks)
469               throws UnsupportedCallbackException {
470             AuthorizeCallback ac = null;
471             for (Callback callback : callbacks) {
472               if (callback instanceof AuthorizeCallback) {
473                 ac = (AuthorizeCallback) callback;
474               } else {
475                 throw new UnsupportedCallbackException(callback,
476                     "Unrecognized SASL GSSAPI Callback");
477               }
478             }
479             if (ac != null) {
480               String authid = ac.getAuthenticationID();
481               String authzid = ac.getAuthorizationID();
482               if (!authid.equals(authzid)) {
483                 ac.setAuthorized(false);
484               } else {
485                 ac.setAuthorized(true);
486                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
487                 LOG.info("Effective user: " + userName);
488                 ac.setAuthorizedID(userName);
489               }
490             }
491           }
492         });
493       transportFactory = saslFactory;
494 
495       // Create a processor wrapper, to get the caller
496       processor = new TProcessor() {
497         @Override
498         public boolean process(TProtocol inProt,
499             TProtocol outProt) throws TException {
500           TSaslServerTransport saslServerTransport =
501             (TSaslServerTransport)inProt.getTransport();
502           SaslServer saslServer = saslServerTransport.getSaslServer();
503           String principal = saslServer.getAuthorizationID();
504           hbaseHandler.setEffectiveUser(principal);
505           return p.process(inProt, outProt);
506         }
507       };
508     }
509 
510     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
511       LOG.error("Server types " + Joiner.on(", ").join(
512           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
513           "address binding at the moment. See " +
514           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
515       throw new RuntimeException(
516           "-" + BIND_CONF_KEY + " not supported with " + implType);
517     }
518 
519     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
520         implType == ImplType.THREADED_SELECTOR) {
521 
522       InetAddress listenAddress = getBindAddress(conf);
523       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
524           new InetSocketAddress(listenAddress, listenPort));
525 
526       if (implType == ImplType.NONBLOCKING) {
527         TNonblockingServer.Args serverArgs =
528             new TNonblockingServer.Args(serverTransport);
529         serverArgs.processor(processor)
530                   .transportFactory(transportFactory)
531                   .protocolFactory(protocolFactory);
532         tserver = new TNonblockingServer(serverArgs);
533       } else if (implType == ImplType.HS_HA) {
534         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
535         CallQueue callQueue =
536             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
537         ExecutorService executorService = createExecutor(
538             callQueue, serverArgs.getWorkerThreads());
539         serverArgs.executorService(executorService)
540                   .processor(processor)
541                   .transportFactory(transportFactory)
542                   .protocolFactory(protocolFactory);
543         tserver = new THsHaServer(serverArgs);
544       } else { // THREADED_SELECTOR
545         TThreadedSelectorServer.Args serverArgs =
546             new HThreadedSelectorServerArgs(serverTransport, conf);
547         CallQueue callQueue =
548             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
549         ExecutorService executorService = createExecutor(
550             callQueue, serverArgs.getWorkerThreads());
551         serverArgs.executorService(executorService)
552                   .processor(processor)
553                   .transportFactory(transportFactory)
554                   .protocolFactory(protocolFactory);
555         tserver = new TThreadedSelectorServer(serverArgs);
556       }
557       LOG.info("starting HBase " + implType.simpleClassName() +
558           " server on " + Integer.toString(listenPort));
559     } else if (implType == ImplType.THREAD_POOL) {
560       // Thread pool server. Get the IP address to bind to.
561       InetAddress listenAddress = getBindAddress(conf);
562 
563       TServerTransport serverTransport = new TServerSocket(
564           new InetSocketAddress(listenAddress, listenPort));
565 
566       TBoundedThreadPoolServer.Args serverArgs =
567           new TBoundedThreadPoolServer.Args(serverTransport, conf);
568       serverArgs.processor(processor)
569                 .transportFactory(transportFactory)
570                 .protocolFactory(protocolFactory);
571       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
572           + listenAddress + ":" + Integer.toString(listenPort)
573           + "; " + serverArgs);
574       TBoundedThreadPoolServer tserver =
575           new TBoundedThreadPoolServer(serverArgs, metrics);
576       this.tserver = tserver;
577     } else {
578       throw new AssertionError("Unsupported Thrift server implementation: " +
579           implType.simpleClassName());
580     }
581 
582     // A sanity check that we instantiated the right type of server.
583     if (tserver.getClass() != implType.serverClass) {
584       throw new AssertionError("Expected to create Thrift server class " +
585           implType.serverClass.getName() + " but got " +
586           tserver.getClass().getName());
587     }
588 
589 
590 
591     registerFilters(conf);
592   }
593 
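  /**
   * Creates a fixed-size pool of daemon worker threads, named "thrift-worker-%d",
   * that services requests from the supplied call queue.
   */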
594   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
595                                  int workerThreads) {
596     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
597     tfb.setDaemon(true);
598     tfb.setNameFormat("thrift-worker-%d");
599     return new ThreadPoolExecutor(workerThreads, workerThreads,
600             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
601   }
602 
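  /**
   * Resolves the address to bind to from hbase.regionserver.thrift.ipaddress,
   * defaulting to 0.0.0.0.
   */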
603   private InetAddress getBindAddress(Configuration conf)
604       throws UnknownHostException {
605     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
606     return InetAddress.getByName(bindAddressStr);
607   }
608 
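  /**
   * Pairs an open ResultScanner with a flag indicating whether its rows should
   * be returned with their columns sorted.
   */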
609   protected static class ResultScannerWrapper {
610 
611     private final ResultScanner scanner;
612     private final boolean sortColumns;
613     public ResultScannerWrapper(ResultScanner resultScanner,
614                                 boolean sortResultColumns) {
615       scanner = resultScanner;
616       sortColumns = sortResultColumns;
617    }
618 
619     public ResultScanner getScanner() {
620       return scanner;
621     }
622 
623     public boolean isColumnSorted() {
624       return sortColumns;
625     }
626   }
627 
628   /**
629    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
630    * HBase client API primarily defined in the Admin and Table objects.
631    */
632   public static class HBaseHandler implements Hbase.Iface {
633     protected Configuration conf;
634     protected final Log LOG = LogFactory.getLog(this.getClass().getName());
635 
636     // nextScannerId and scannerMap are used to manage scanner state
637     protected int nextScannerId = 0;
638     protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
639     private ThriftMetrics metrics = null;
640 
641     private final ConnectionCache connectionCache;
642 
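    // Per-thread cache of Table instances, keyed by table name, so that each
    // handler thread reuses its own Table objects across calls.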
643     private static ThreadLocal<Map<String, Table>> threadLocalTables =
644         new ThreadLocal<Map<String, Table>>() {
645       @Override
646       protected Map<String, Table> initialValue() {
647         return new TreeMap<String, Table>();
648       }
649     };
650 
651     IncrementCoalescer coalescer = null;
652 
653     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
654     static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
655 
656     /**
657      * Returns the names of all the column families of the given table, each
658      * with the column family delimiter appended.
659      * @param table the table to inspect
660      * @throws IOException if the table descriptor cannot be retrieved
661      */
662     byte[][] getAllColumns(Table table) throws IOException {
663       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
664       byte[][] columns = new byte[cds.length][];
665       for (int i = 0; i < cds.length; i++) {
666         columns[i] = Bytes.add(cds[i].getName(),
667             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
668       }
669       return columns;
670     }
671 
672     /**
673      * Returns the Table instance for the given table name, creating and caching
674      * a per-thread instance if one does not exist yet.
675      *
676      * @param tableName
677      *          name of table
678      * @return Table object
679      * @throws IOException if the table cannot be obtained
680      */
681     public Table getTable(final byte[] tableName) throws
682         IOException {
683       String table = Bytes.toString(tableName);
684       Map<String, Table> tables = threadLocalTables.get();
685       if (!tables.containsKey(table)) {
686         tables.put(table, (Table)connectionCache.getTable(table));
687       }
688       return tables.get(table);
689     }
690 
691     public Table getTable(final ByteBuffer tableName) throws IOException {
692       return getTable(getBytes(tableName));
693     }
694 
695     /**
696      * Assigns a unique ID to the scanner and adds the mapping to an internal
697      * hash-map.
698      * @param scanner the scanner to register
699      * @param sortColumns whether results should be returned with sorted columns
700      * @return integer scanner id
701      */
702     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
703       int id = nextScannerId++;
704       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
705       scannerMap.put(id, resultScannerWrapper);
706       return id;
707     }
708 
709     /**
710      * Returns the scanner associated with the specified ID.
711      *
712      * @param id the scanner ID
713      * @return the associated ResultScannerWrapper, or null if the ID was invalid.
714      */
715     protected synchronized ResultScannerWrapper getScanner(int id) {
716       return scannerMap.get(id);
717     }
718 
719     /**
720      * Removes the scanner associated with the specified ID from the internal
721      * id->scanner hash-map.
722      *
723      * @param id the scanner ID
724      * @return the removed ResultScannerWrapper, or null if the ID was invalid.
725      */
726     protected synchronized ResultScannerWrapper removeScanner(int id) {
727       return scannerMap.remove(id);
728     }
729 
730     protected HBaseHandler(final Configuration c,
731         final UserProvider userProvider) throws IOException {
732       this.conf = c;
733       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
734       this.coalescer = new IncrementCoalescer(this);
735 
736       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
737       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
738       connectionCache = new ConnectionCache(
739         conf, userProvider, cleanInterval, maxIdleTime);
740     }
741 
742     /**
743      * Obtains an Admin instance, creating it if it has not been created yet.
744      */
745     private Admin getAdmin() throws IOException {
746       return connectionCache.getAdmin();
747     }
748 
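    /**
     * Sets the user on whose behalf subsequent requests through the cached
     * connection are performed.
     */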
749     void setEffectiveUser(String effectiveUser) {
750       connectionCache.setEffectiveUser(effectiveUser);
751     }
752 
753     @Override
754     public void enableTable(ByteBuffer tableName) throws IOError {
755       try{
756         getAdmin().enableTable(getTableName(tableName));
757       } catch (IOException e) {
758         LOG.warn(e.getMessage(), e);
759         throw new IOError(Throwables.getStackTraceAsString(e));
760       }
761     }
762 
763     @Override
764     public void disableTable(ByteBuffer tableName) throws IOError{
765       try{
766         getAdmin().disableTable(getTableName(tableName));
767       } catch (IOException e) {
768         LOG.warn(e.getMessage(), e);
769         throw new IOError(Throwables.getStackTraceAsString(e));
770       }
771     }
772 
773     @Override
774     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
775       try {
776         return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
777       } catch (IOException e) {
778         LOG.warn(e.getMessage(), e);
779         throw new IOError(Throwables.getStackTraceAsString(e));
780       }
781     }
782 
783     @Override
784     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
785       try {
786         // TODO: HBaseAdmin.compact(byte[]) deprecated and not trivial to replace here.
787         // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
788         // table and region.
789         ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
790       } catch (IOException e) {
791         LOG.warn(e.getMessage(), e);
792         throw new IOError(Throwables.getStackTraceAsString(e));
793       }
794     }
795 
796     @Override
797     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
798       try {
799         // TODO: HBaseAdmin.majorCompact(byte[]) deprecated and not trivial to replace here.
800         // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
801         // to table and region.
802         ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
803       } catch (IOException e) {
804         LOG.warn(e.getMessage(), e);
805         throw new IOError(Throwables.getStackTraceAsString(e));
806       }
807     }
808 
809     @Override
810     public List<ByteBuffer> getTableNames() throws IOError {
811       try {
812         TableName[] tableNames = this.getAdmin().listTableNames();
813         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
814         for (int i = 0; i < tableNames.length; i++) {
815           list.add(ByteBuffer.wrap(tableNames[i].getName()));
816         }
817         return list;
818       } catch (IOException e) {
819         LOG.warn(e.getMessage(), e);
820         throw new IOError(Throwables.getStackTraceAsString(e));
821       }
822     }
823 
824     /**
825      * @return the list of regions in the given table, or an empty list if the table does not exist
826      */
827     @Override
828     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
829     throws IOError {
830       try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
831         List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
832         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
833         for (HRegionLocation regionLocation : regionLocations) {
834           HRegionInfo info = regionLocation.getRegionInfo();
835           ServerName serverName = regionLocation.getServerName();
836           TRegionInfo region = new TRegionInfo();
837           region.serverName = ByteBuffer.wrap(
838               Bytes.toBytes(serverName.getHostname()));
839           region.port = serverName.getPort();
840           region.startKey = ByteBuffer.wrap(info.getStartKey());
841           region.endKey = ByteBuffer.wrap(info.getEndKey());
842           region.id = info.getRegionId();
843           region.name = ByteBuffer.wrap(info.getRegionName());
844           region.version = HREGION_VERSION; // HRegion is no longer versioned; PB encoding is used
845           results.add(region);
846         }
847         return results;
848       } catch (TableNotFoundException e) {
849         // Return empty list for non-existing table
850         return Collections.emptyList();
851       } catch (IOException e){
852         LOG.warn(e.getMessage(), e);
853         throw new IOError(Throwables.getStackTraceAsString(e));
854       }
855     }
856 
857     @Deprecated
858     @Override
859     public List<TCell> get(
860         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
861         Map<ByteBuffer, ByteBuffer> attributes)
862         throws IOError {
863       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
864       if (famAndQf.length == 1) {
865         return get(tableName, row, famAndQf[0], null, attributes);
866       }
867       if (famAndQf.length == 2) {
868         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
869       }
870       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
871     }
872 
873     /**
874      * Note: this internal interface is slightly different from public APIs in regard to handling
875      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
876      * we respect qual == null as a request for the entire column family. The caller (
877      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
878      * column is parsed as usual.
879      */
880     protected List<TCell> get(ByteBuffer tableName,
881                               ByteBuffer row,
882                               byte[] family,
883                               byte[] qualifier,
884                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
885       try {
886         Table table = getTable(tableName);
887         Get get = new Get(getBytes(row));
888         addAttributes(get, attributes);
889         if (qualifier == null) {
890           get.addFamily(family);
891         } else {
892           get.addColumn(family, qualifier);
893         }
894         Result result = table.get(get);
895         return ThriftUtilities.cellFromHBase(result.rawCells());
896       } catch (IOException e) {
897         LOG.warn(e.getMessage(), e);
898         throw new IOError(Throwables.getStackTraceAsString(e));
899       }
900     }
901 
902     @Deprecated
903     @Override
904     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
905         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
906       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
907       if(famAndQf.length == 1) {
908         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
909       }
910       if (famAndQf.length == 2) {
911         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
912       }
913       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
914 
915     }
916 
917     /**
918      * Note: this public interface is slightly different from public Java APIs in regard to
919      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
920      * Rather, we respect qual == null as a request for the entire column family. If you want to
921      * access the entire column family, use
922      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
923      * that lacks a {@code ':'}.
924      */
925     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
926         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
927       try {
928         Table table = getTable(tableName);
929         Get get = new Get(getBytes(row));
930         addAttributes(get, attributes);
931         if (null == qualifier) {
932           get.addFamily(family);
933         } else {
934           get.addColumn(family, qualifier);
935         }
936         get.setMaxVersions(numVersions);
937         Result result = table.get(get);
938         return ThriftUtilities.cellFromHBase(result.rawCells());
939       } catch (IOException e) {
940         LOG.warn(e.getMessage(), e);
941         throw new IOError(Throwables.getStackTraceAsString(e));
942       }
943     }
944 
945     @Deprecated
946     @Override
947     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
948         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
949       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
950       if (famAndQf.length == 1) {
951         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
952       }
953       if (famAndQf.length == 2) {
954         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
955           attributes);
956       }
957       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
958     }
959 
960     /**
961      * Note: this internal interface is slightly different from public APIs in regard to handling
962      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
963      * we respect qual == null as a request for the entire column family. The caller (
964      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
965      * consistent in that the column is parsed as usual.
966      */
967     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
968         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
969         throws IOError {
970       try {
971         Table table = getTable(tableName);
972         Get get = new Get(getBytes(row));
973         addAttributes(get, attributes);
974         if (null == qualifier) {
975           get.addFamily(family);
976         } else {
977           get.addColumn(family, qualifier);
978         }
979         get.setTimeRange(0, timestamp);
980         get.setMaxVersions(numVersions);
981         Result result = table.get(get);
982         return ThriftUtilities.cellFromHBase(result.rawCells());
983       } catch (IOException e) {
984         LOG.warn(e.getMessage(), e);
985         throw new IOError(Throwables.getStackTraceAsString(e));
986       }
987     }
988 
989     @Override
990     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
991         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
992       return getRowWithColumnsTs(tableName, row, null,
993                                  HConstants.LATEST_TIMESTAMP,
994                                  attributes);
995     }
996 
997     @Override
998     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
999                                               ByteBuffer row,
1000         List<ByteBuffer> columns,
1001         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1002       return getRowWithColumnsTs(tableName, row, columns,
1003                                  HConstants.LATEST_TIMESTAMP,
1004                                  attributes);
1005     }
1006 
1007     @Override
1008     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1009         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1010       return getRowWithColumnsTs(tableName, row, null,
1011                                  timestamp, attributes);
1012     }
1013 
1014     @Override
1015     public List<TRowResult> getRowWithColumnsTs(
1016         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1017         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1018       try {
1019         Table table = getTable(tableName);
1020         if (columns == null) {
1021           Get get = new Get(getBytes(row));
1022           addAttributes(get, attributes);
1023           get.setTimeRange(0, timestamp);
1024           Result result = table.get(get);
1025           return ThriftUtilities.rowResultFromHBase(result);
1026         }
1027         Get get = new Get(getBytes(row));
1028         addAttributes(get, attributes);
1029         for(ByteBuffer column : columns) {
1030           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1031           if (famAndQf.length == 1) {
1032               get.addFamily(famAndQf[0]);
1033           } else {
1034               get.addColumn(famAndQf[0], famAndQf[1]);
1035           }
1036         }
1037         get.setTimeRange(0, timestamp);
1038         Result result = table.get(get);
1039         return ThriftUtilities.rowResultFromHBase(result);
1040       } catch (IOException e) {
1041         LOG.warn(e.getMessage(), e);
1042         throw new IOError(Throwables.getStackTraceAsString(e));
1043       }
1044     }
1045 
1046     @Override
1047     public List<TRowResult> getRows(ByteBuffer tableName,
1048                                     List<ByteBuffer> rows,
1049         Map<ByteBuffer, ByteBuffer> attributes)
1050         throws IOError {
1051       return getRowsWithColumnsTs(tableName, rows, null,
1052                                   HConstants.LATEST_TIMESTAMP,
1053                                   attributes);
1054     }
1055 
1056     @Override
1057     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1058                                                List<ByteBuffer> rows,
1059         List<ByteBuffer> columns,
1060         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1061       return getRowsWithColumnsTs(tableName, rows, columns,
1062                                   HConstants.LATEST_TIMESTAMP,
1063                                   attributes);
1064     }
1065 
1066     @Override
1067     public List<TRowResult> getRowsTs(ByteBuffer tableName,
1068                                       List<ByteBuffer> rows,
1069         long timestamp,
1070         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1071       return getRowsWithColumnsTs(tableName, rows, null,
1072                                   timestamp, attributes);
1073     }
1074 
1075     @Override
1076     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1077                                                  List<ByteBuffer> rows,
1078         List<ByteBuffer> columns, long timestamp,
1079         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1080       try {
1081         List<Get> gets = new ArrayList<Get>(rows.size());
1082         Table table = getTable(tableName);
1083         if (metrics != null) {
1084           metrics.incNumRowKeysInBatchGet(rows.size());
1085         }
1086         for (ByteBuffer row : rows) {
1087           Get get = new Get(getBytes(row));
1088           addAttributes(get, attributes);
1089           if (columns != null) {
1090 
1091             for(ByteBuffer column : columns) {
1092               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1093               if (famAndQf.length == 1) {
1094                 get.addFamily(famAndQf[0]);
1095               } else {
1096                 get.addColumn(famAndQf[0], famAndQf[1]);
1097               }
1098             }
1099           }
1100           get.setTimeRange(0, timestamp);
1101           gets.add(get);
1102         }
1103         Result[] result = table.get(gets);
1104         return ThriftUtilities.rowResultFromHBase(result);
1105       } catch (IOException e) {
1106         LOG.warn(e.getMessage(), e);
1107         throw new IOError(Throwables.getStackTraceAsString(e));
1108       }
1109     }
1110 
1111     @Override
1112     public void deleteAll(
1113         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1114         Map<ByteBuffer, ByteBuffer> attributes)
1115         throws IOError {
1116       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1117                   attributes);
1118     }
1119 
1120     @Override
1121     public void deleteAllTs(ByteBuffer tableName,
1122                             ByteBuffer row,
1123                             ByteBuffer column,
1124         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1125       try {
1126         Table table = getTable(tableName);
1127         Delete delete  = new Delete(getBytes(row));
1128         addAttributes(delete, attributes);
1129         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1130         if (famAndQf.length == 1) {
1131           delete.deleteFamily(famAndQf[0], timestamp);
1132         } else {
1133           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1134         }
1135         table.delete(delete);
1136 
1137       } catch (IOException e) {
1138         LOG.warn(e.getMessage(), e);
1139         throw new IOError(Throwables.getStackTraceAsString(e));
1140       }
1141     }
1142 
1143     @Override
1144     public void deleteAllRow(
1145         ByteBuffer tableName, ByteBuffer row,
1146         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1147       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1148     }
1149 
1150     @Override
1151     public void deleteAllRowTs(
1152         ByteBuffer tableName, ByteBuffer row, long timestamp,
1153         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1154       try {
1155         Table table = getTable(tableName);
1156         Delete delete  = new Delete(getBytes(row), timestamp);
1157         addAttributes(delete, attributes);
1158         table.delete(delete);
1159       } catch (IOException e) {
1160         LOG.warn(e.getMessage(), e);
1161         throw new IOError(Throwables.getStackTraceAsString(e));
1162       }
1163     }
1164 
1165     @Override
1166     public void createTable(ByteBuffer in_tableName,
1167         List<ColumnDescriptor> columnFamilies) throws IOError,
1168         IllegalArgument, AlreadyExists {
1169       TableName tableName = getTableName(in_tableName);
1170       try {
1171         if (getAdmin().tableExists(tableName)) {
1172           throw new AlreadyExists("table name already in use");
1173         }
1174         HTableDescriptor desc = new HTableDescriptor(tableName);
1175         for (ColumnDescriptor col : columnFamilies) {
1176           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1177           desc.addFamily(colDesc);
1178         }
1179         getAdmin().createTable(desc);
1180       } catch (IOException e) {
1181         LOG.warn(e.getMessage(), e);
1182         throw new IOError(Throwables.getStackTraceAsString(e));
1183       } catch (IllegalArgumentException e) {
1184         LOG.warn(e.getMessage(), e);
1185         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1186       }
1187     }
1188 
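    /** Converts the Thrift-supplied table name bytes into a TableName. */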
1189     private static TableName getTableName(ByteBuffer buffer) {
1190       return TableName.valueOf(getBytes(buffer));
1191     }
1192 
1193     @Override
1194     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1195       TableName tableName = getTableName(in_tableName);
1196       if (LOG.isDebugEnabled()) {
1197         LOG.debug("deleteTable: table=" + tableName);
1198       }
1199       try {
1200         if (!getAdmin().tableExists(tableName)) {
1201           throw new IOException("table does not exist");
1202         }
1203         getAdmin().deleteTable(tableName);
1204       } catch (IOException e) {
1205         LOG.warn(e.getMessage(), e);
1206         throw new IOError(Throwables.getStackTraceAsString(e));
1207       }
1208     }
1209 
1210     @Override
1211     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1212         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1213         throws IOError, IllegalArgument {
1214       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1215                   attributes);
1216     }
1217 
1218     @Override
1219     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1220         List<Mutation> mutations, long timestamp,
1221         Map<ByteBuffer, ByteBuffer> attributes)
1222         throws IOError, IllegalArgument {
1223       Table table = null;
1224       try {
1225         table = getTable(tableName);
1226         Put put = new Put(getBytes(row), timestamp);
1227         addAttributes(put, attributes);
1228 
1229         Delete delete = new Delete(getBytes(row));
1230         addAttributes(delete, attributes);
1231         if (metrics != null) {
1232           metrics.incNumRowKeysInBatchMutate(mutations.size());
1233         }
1234 
1235         // I apologize for all this mess :)
1236         for (Mutation m : mutations) {
1237           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1238           if (m.isDelete) {
1239             if (famAndQf.length == 1) {
1240               delete.deleteFamily(famAndQf[0], timestamp);
1241             } else {
1242               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1243             }
1244             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1245                 : Durability.SKIP_WAL);
1246           } else {
1247             if(famAndQf.length == 1) {
1248               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1249                   + "over the whole column family.");
1250             } else {
1251               put.addImmutable(famAndQf[0], famAndQf[1],
1252                   m.value != null ? getBytes(m.value)
1253                       : HConstants.EMPTY_BYTE_ARRAY);
1254             }
1255             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1256           }
1257         }
1258         if (!delete.isEmpty())
1259           table.delete(delete);
1260         if (!put.isEmpty())
1261           table.put(put);
1262       } catch (IOException e) {
1263         LOG.warn(e.getMessage(), e);
1264         throw new IOError(Throwables.getStackTraceAsString(e));
1265       } catch (IllegalArgumentException e) {
1266         LOG.warn(e.getMessage(), e);
1267         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1268       }
1269     }
1270 
1271     @Override
1272     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1273         Map<ByteBuffer, ByteBuffer> attributes)
1274         throws IOError, IllegalArgument, TException {
1275       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1276     }
1277 
1278     @Override
1279     public void mutateRowsTs(
1280         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1281         Map<ByteBuffer, ByteBuffer> attributes)
1282         throws IOError, IllegalArgument, TException {
1283       List<Put> puts = new ArrayList<Put>();
1284       List<Delete> deletes = new ArrayList<Delete>();
1285 
1286       for (BatchMutation batch : rowBatches) {
1287         byte[] row = getBytes(batch.row);
1288         List<Mutation> mutations = batch.mutations;
1289         Delete delete = new Delete(row);
1290         addAttributes(delete, attributes);
1291         Put put = new Put(row, timestamp);
1292         addAttributes(put, attributes);
1293         for (Mutation m : mutations) {
1294           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1295           if (m.isDelete) {
1296             // no qualifier, family only.
1297             if (famAndQf.length == 1) {
1298               delete.deleteFamily(famAndQf[0], timestamp);
1299             } else {
1300               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1301             }
1302             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1303                 : Durability.SKIP_WAL);
1304           } else {
1305             if (famAndQf.length == 1) {
1306               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1307                   + "over the whole column family.");
1308             }
1309             if (famAndQf.length == 2) {
1310               put.addImmutable(famAndQf[0], famAndQf[1],
1311                   m.value != null ? getBytes(m.value)
1312                       : HConstants.EMPTY_BYTE_ARRAY);
1313             } else {
1314               throw new IllegalArgumentException("Invalid famAndQf provided.");
1315             }
1316             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1317           }
1318         }
1319         if (!delete.isEmpty())
1320           deletes.add(delete);
1321         if (!put.isEmpty())
1322           puts.add(put);
1323       }
1324 
1325       Table table = null;
1326       try {
1327         table = getTable(tableName);
1328         if (!puts.isEmpty())
1329           table.put(puts);
1330         if (!deletes.isEmpty())
1331           table.delete(deletes);
1332 
1333       } catch (IOException e) {
1334         LOG.warn(e.getMessage(), e);
1335         throw new IOError(Throwables.getStackTraceAsString(e));
1336       } catch (IllegalArgumentException e) {
1337         LOG.warn(e.getMessage(), e);
1338         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1339       }
1340     }
1341 
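    /**
     * Parses the column into family and optional qualifier, then delegates to the protected
     * overload; a missing qualifier is passed along as an empty qualifier.
     */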
1342     @Deprecated
1343     @Override
1344     public long atomicIncrement(
1345         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1346             throws IOError, IllegalArgument, TException {
1347       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1348       if(famAndQf.length == 1) {
1349         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1350       }
1351       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1352     }
1353 
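    /**
     * Increments a single column value by the given amount via Table#incrementColumnValue.
     */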
1354     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1355         byte [] family, byte [] qualifier, long amount)
1356         throws IOError, IllegalArgument, TException {
1357       Table table;
1358       try {
1359         table = getTable(tableName);
1360         return table.incrementColumnValue(
1361             getBytes(row), family, qualifier, amount);
1362       } catch (IOException e) {
1363         LOG.warn(e.getMessage(), e);
1364         throw new IOError(Throwables.getStackTraceAsString(e));
1365       }
1366     }
1367 
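    /**
     * Closes the scanner registered under the given id and removes it from the scanner map.
     */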
1368     @Override
1369     public void scannerClose(int id) throws IOError, IllegalArgument {
1370       LOG.debug("scannerClose: id=" + id);
1371       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1372       if (resultScannerWrapper == null) {
1373         String message = "scanner ID is invalid";
1374         LOG.warn(message);
1375         throw new IllegalArgument(message);
1376       }
1377       resultScannerWrapper.getScanner().close();
1378       removeScanner(id);
1379     }
1380 
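    /**
     * Returns up to nbRows rows from the scanner registered under the given id, converted to
     * TRowResult; an exhausted scanner yields an empty list.
     */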
1381     @Override
1382     public List<TRowResult> scannerGetList(int id,int nbRows)
1383         throws IllegalArgument, IOError {
1384       LOG.debug("scannerGetList: id=" + id);
1385       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1386       if (null == resultScannerWrapper) {
1387         String message = "scanner ID is invalid";
1388         LOG.warn(message);
1389         throw new IllegalArgument(message);
1390       }
1391 
1392       Result [] results = null;
1393       try {
1394         results = resultScannerWrapper.getScanner().next(nbRows);
1395         if (null == results) {
1396           return new ArrayList<TRowResult>();
1397         }
1398       } catch (IOException e) {
1399         LOG.warn(e.getMessage(), e);
1400         throw new IOError(Throwables.getStackTraceAsString(e));
1401       }
1402       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1403     }
1404 
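    /**
     * Convenience wrapper around scannerGetList that fetches a single row.
     */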
1405     @Override
1406     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1407       return scannerGetList(id,1);
1408     }
1409 
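    /**
     * Translates the Thrift TScan (start/stop rows, timestamp upper bound, caching, batch size,
     * columns, filter string and reversed flag) into a client-side Scan and registers the
     * resulting scanner.
     */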
1410     @Override
1411     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1412         Map<ByteBuffer, ByteBuffer> attributes)
1413         throws IOError {
1414       try {
1415         Table table = getTable(tableName);
1416         Scan scan = new Scan();
1417         addAttributes(scan, attributes);
1418         if (tScan.isSetStartRow()) {
1419           scan.setStartRow(tScan.getStartRow());
1420         }
1421         if (tScan.isSetStopRow()) {
1422           scan.setStopRow(tScan.getStopRow());
1423         }
1424         if (tScan.isSetTimestamp()) {
1425           scan.setTimeRange(0, tScan.getTimestamp());
1426         }
1427         if (tScan.isSetCaching()) {
1428           scan.setCaching(tScan.getCaching());
1429         }
1430         if (tScan.isSetBatchSize()) {
1431           scan.setBatch(tScan.getBatchSize());
1432         }
1433         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1434           for(ByteBuffer column : tScan.getColumns()) {
1435             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1436             if(famQf.length == 1) {
1437               scan.addFamily(famQf[0]);
1438             } else {
1439               scan.addColumn(famQf[0], famQf[1]);
1440             }
1441           }
1442         }
1443         if (tScan.isSetFilterString()) {
1444           ParseFilter parseFilter = new ParseFilter();
1445           scan.setFilter(
1446               parseFilter.parseFilterString(tScan.getFilterString()));
1447         }
1448         if (tScan.isSetReversed()) {
1449           scan.setReversed(tScan.isReversed());
1450         }
1451         return addScanner(table.getScanner(scan), tScan.sortColumns);
1452       } catch (IOException e) {
1453         LOG.warn(e.getMessage(), e);
1454         throw new IOError(Throwables.getStackTraceAsString(e));
1455       }
1456     }
1457 
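    /**
     * Opens a scanner starting at startRow over the given columns; a column with no qualifier
     * selects the whole family.
     */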
1458     @Override
1459     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1460         List<ByteBuffer> columns,
1461         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1462       try {
1463         Table table = getTable(tableName);
1464         Scan scan = new Scan(getBytes(startRow));
1465         addAttributes(scan, attributes);
1466         if(columns != null && columns.size() != 0) {
1467           for(ByteBuffer column : columns) {
1468             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1469             if(famQf.length == 1) {
1470               scan.addFamily(famQf[0]);
1471             } else {
1472               scan.addColumn(famQf[0], famQf[1]);
1473             }
1474           }
1475         }
1476         return addScanner(table.getScanner(scan), false);
1477       } catch (IOException e) {
1478         LOG.warn(e.getMessage(), e);
1479         throw new IOError(Throwables.getStackTraceAsString(e));
1480       }
1481     }
1482 
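    /**
     * Opens a scanner over the row range [startRow, stopRow), restricted to the given columns.
     */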
1483     @Override
1484     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1485         ByteBuffer stopRow, List<ByteBuffer> columns,
1486         Map<ByteBuffer, ByteBuffer> attributes)
1487         throws IOError, TException {
1488       try {
1489         Table table = getTable(tableName);
1490         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1491         addAttributes(scan, attributes);
1492         if(columns != null && columns.size() != 0) {
1493           for(ByteBuffer column : columns) {
1494             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1495             if(famQf.length == 1) {
1496               scan.addFamily(famQf[0]);
1497             } else {
1498               scan.addColumn(famQf[0], famQf[1]);
1499             }
1500           }
1501         }
1502         return addScanner(table.getScanner(scan), false);
1503       } catch (IOException e) {
1504         LOG.warn(e.getMessage(), e);
1505         throw new IOError(Throwables.getStackTraceAsString(e));
1506       }
1507     }
1508 
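    /**
     * Opens a scanner that starts at startAndPrefix and wraps a PrefixFilter in a
     * WhileMatchFilter so scanning stops once rows no longer match the prefix.
     */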
1509     @Override
1510     public int scannerOpenWithPrefix(ByteBuffer tableName,
1511                                      ByteBuffer startAndPrefix,
1512                                      List<ByteBuffer> columns,
1513         Map<ByteBuffer, ByteBuffer> attributes)
1514         throws IOError, TException {
1515       try {
1516         Table table = getTable(tableName);
1517         Scan scan = new Scan(getBytes(startAndPrefix));
1518         addAttributes(scan, attributes);
1519         Filter f = new WhileMatchFilter(
1520             new PrefixFilter(getBytes(startAndPrefix)));
1521         scan.setFilter(f);
1522         if (columns != null && columns.size() != 0) {
1523           for(ByteBuffer column : columns) {
1524             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1525             if(famQf.length == 1) {
1526               scan.addFamily(famQf[0]);
1527             } else {
1528               scan.addColumn(famQf[0], famQf[1]);
1529             }
1530           }
1531         }
1532         return addScanner(table.getScanner(scan), false);
1533       } catch (IOException e) {
1534         LOG.warn(e.getMessage(), e);
1535         throw new IOError(Throwables.getStackTraceAsString(e));
1536       }
1537     }
1538 
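    /**
     * Like scannerOpen, but restricts the scan to cells with timestamps strictly less than
     * the given timestamp.
     */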
1539     @Override
1540     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1541         List<ByteBuffer> columns, long timestamp,
1542         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1543       try {
1544         Table table = getTable(tableName);
1545         Scan scan = new Scan(getBytes(startRow));
1546         addAttributes(scan, attributes);
1547         scan.setTimeRange(0, timestamp);
1548         if (columns != null && columns.size() != 0) {
1549           for (ByteBuffer column : columns) {
1550             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1551             if(famQf.length == 1) {
1552               scan.addFamily(famQf[0]);
1553             } else {
1554               scan.addColumn(famQf[0], famQf[1]);
1555             }
1556           }
1557         }
1558         return addScanner(table.getScanner(scan), false);
1559       } catch (IOException e) {
1560         LOG.warn(e.getMessage(), e);
1561         throw new IOError(Throwables.getStackTraceAsString(e));
1562       }
1563     }
1564 
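    /**
     * Combines scannerOpenWithStop and scannerOpenTs: a bounded row range plus an upper bound
     * on cell timestamps.
     */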
1565     @Override
1566     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1567         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1568         Map<ByteBuffer, ByteBuffer> attributes)
1569         throws IOError, TException {
1570       try {
1571         Table table = getTable(tableName);
1572         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1573         addAttributes(scan, attributes);
1574         scan.setTimeRange(0, timestamp);
1575         if (columns != null && columns.size() != 0) {
1576           for (ByteBuffer column : columns) {
1577             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1578             if(famQf.length == 1) {
1579               scan.addFamily(famQf[0]);
1580             } else {
1581               scan.addColumn(famQf[0], famQf[1]);
1582             }
1583           }
1584         }
1586         return addScanner(table.getScanner(scan), false);
1587       } catch (IOException e) {
1588         LOG.warn(e.getMessage(), e);
1589         throw new IOError(Throwables.getStackTraceAsString(e));
1590       }
1591     }
1592 
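    /**
     * Reads the table descriptor and converts each column family into a Thrift
     * ColumnDescriptor, keyed by family name.
     */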
1593     @Override
1594     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1595         ByteBuffer tableName) throws IOError, TException {
1596       try {
1597         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1598           new TreeMap<ByteBuffer, ColumnDescriptor>();
1599 
1600         Table table = getTable(tableName);
1601         HTableDescriptor desc = table.getTableDescriptor();
1602 
1603         for (HColumnDescriptor e : desc.getFamilies()) {
1604           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1605           columns.put(col.name, col);
1606         }
1607         return columns;
1608       } catch (IOException e) {
1609         LOG.warn(e.getMessage(), e);
1610         throw new IOError(Throwables.getStackTraceAsString(e));
1611       }
1612     }
1613 
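    /**
     * Returns the cells of the row at or immediately before the given row in the given family.
     */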
1614     @Deprecated
1615     @Override
1616     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1617         ByteBuffer family) throws IOError {
1618       try {
1619         Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1620         return ThriftUtilities.cellFromHBase(result.rawCells());
1621       } catch (IOException e) {
1622         LOG.warn(e.getMessage(), e);
1623         throw new IOError(Throwables.getStackTraceAsString(e));
1624       }
1625     }
1626 
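    /**
     * Locates the hbase:meta entry at or before searchRow and builds a TRegionInfo from the
     * stored HRegionInfo, including the hosting server name and port when the region is
     * assigned.
     */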
1627     @Override
1628     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1629       try {
1630         byte[] row = getBytes(searchRow);
1631         Result startRowResult =
1632             getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1633 
1634         if (startRowResult == null) {
1635           throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1636                                 + Bytes.toStringBinary(row));
1637         }
1638 
1639         // find region start and end keys
1640         HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1641         if (regionInfo == null) {
1642           throw new IOException("HRegionInfo REGIONINFO was null or " +
1643                                 "empty in Meta for row="
1644                                 + Bytes.toStringBinary(row));
1645         }
1646         TRegionInfo region = new TRegionInfo();
1647         region.setStartKey(regionInfo.getStartKey());
1648         region.setEndKey(regionInfo.getEndKey());
1649         region.id = regionInfo.getRegionId();
1650         region.setName(regionInfo.getRegionName());
1651         region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1652 
1653         // find region assignment to server
1654         ServerName serverName = HRegionInfo.getServerName(startRowResult);
1655         if (serverName != null) {
1656           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1657           region.port = serverName.getPort();
1658         }
1659         return region;
1660       } catch (IOException e) {
1661         LOG.warn(e.getMessage(), e);
1662         throw new IOError(Throwables.getStackTraceAsString(e));
1663       }
1664     }
1665 
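    /**
     * Finds the row at or immediately before the given row by running a reversed,
     * single-family scan starting at that row and returning its first result.
     */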
1666     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1667       Scan scan = new Scan(row);
1668       scan.setReversed(true);
1669       scan.addFamily(family);
1670       scan.setStartRow(row);
1671 
1672       Table table = getTable(tableName);
1673       try (ResultScanner scanner = table.getScanner(scan)) {
1674         return scanner.next();
1675       }
1676     }
1677 
1678     private void initMetrics(ThriftMetrics metrics) {
1679       this.metrics = metrics;
1680     }
1681 
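    /**
     * Applies a single TIncrement. Table and row must be non-empty; when COALESCE_INC_KEY is
     * enabled the increment is queued on the coalescer instead of being applied immediately.
     */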
1682     @Override
1683     public void increment(TIncrement tincrement) throws IOError, TException {
1684 
1685       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1686         throw new TException("Must supply a table and a row key; can't increment");
1687       }
1688 
1689       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1690         this.coalescer.queueIncrement(tincrement);
1691         return;
1692       }
1693 
1694       try {
1695         Table table = getTable(tincrement.getTable());
1696         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1697         table.increment(inc);
1698       } catch (IOException e) {
1699         LOG.warn(e.getMessage(), e);
1700         throw new IOError(Throwables.getStackTraceAsString(e));
1701       }
1702     }
1703 
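    /**
     * Applies a list of TIncrements, either queueing them on the coalescer or incrementing
     * them one at a time.
     */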
1704     @Override
1705     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1706       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1707         this.coalescer.queueIncrements(tincrements);
1708         return;
1709       }
1710       for (TIncrement tinc : tincrements) {
1711         increment(tinc);
1712       }
1713     }
1714 
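    /**
     * Performs an Append built from the Thrift TAppend and returns the resulting cells.
     */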
1715     @Override
1716     public List<TCell> append(TAppend tappend) throws IOError, TException {
1717       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1718         throw new TException("Must supply a table and a row key; can't append");
1719       }
1720 
1721       try {
1722         Table table = getTable(tappend.getTable());
1723         Append append = ThriftUtilities.appendFromThrift(tappend);
1724         Result result = table.append(append);
1725         return ThriftUtilities.cellFromHBase(result.rawCells());
1726       } catch (IOException e) {
1727         LOG.warn(e.getMessage(), e);
1728         throw new IOError(Throwables.getStackTraceAsString(e));
1729       }
1730     }
1731 
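    /**
     * Builds a Put from the supplied Mutation and applies it atomically only if the current
     * value of the check column equals the expected value (an empty byte array when value is
     * null).
     */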
1732     @Override
1733     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1734         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1735         IllegalArgument, TException {
1736       Put put;
1737       try {
1738         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1739         addAttributes(put, attributes);
1740 
1741         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1742 
1743         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1744             : HConstants.EMPTY_BYTE_ARRAY);
1745 
1746         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1747       } catch (IllegalArgumentException e) {
1748         LOG.warn(e.getMessage(), e);
1749         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1750       }
1751 
1752       Table table = null;
1753       try {
1754         table = getTable(tableName);
1755         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1756         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1757           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1758       } catch (IOException e) {
1759         LOG.warn(e.getMessage(), e);
1760         throw new IOError(Throwables.getStackTraceAsString(e));
1761       } catch (IllegalArgumentException e) {
1762         LOG.warn(e.getMessage(), e);
1763         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1764       }
1765     }
1766   }
1767 
1768 
1769 
1770   /**
1771    * Adds all the given attributes to the Operation object.
1772    */
1773   private static void addAttributes(OperationWithAttributes op,
1774     Map<ByteBuffer, ByteBuffer> attributes) {
1775     if (attributes == null || attributes.size() == 0) {
1776       return;
1777     }
1778     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1779       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1780       byte[] value =  getBytes(entry.getValue());
1781       op.setAttribute(name, value);
1782     }
1783   }
1784 
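  /**
   * Registers the custom filters named in the hbase.thrift.filters configuration property with
   * the filter parser. Each entry is a filterName:filterClass pair (for example, a hypothetical
   * "MyFilter:com.example.MyFilter"); malformed entries are skipped with a warning.
   */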
1785   public static void registerFilters(Configuration conf) {
1786     String[] filters = conf.getStrings("hbase.thrift.filters");
1787     if(filters != null) {
1788       for(String filterClass: filters) {
1789         String[] filterPart = filterClass.split(":");
1790         if(filterPart.length != 2) {
1791           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1792         } else {
1793           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1794         }
1795       }
1796     }
1797   }
1798 }