001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import static org.apache.hadoop.hbase.util.Bytes.getBytes;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.net.UnknownHostException;
027import java.nio.ByteBuffer;
028import java.security.PrivilegedAction;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.TreeMap;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041
042import javax.security.auth.callback.Callback;
043import javax.security.auth.callback.UnsupportedCallbackException;
044import javax.security.sasl.AuthorizeCallback;
045import javax.security.sasl.SaslServer;
046
047import org.apache.commons.lang3.ArrayUtils;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.Cell.Type;
050import org.apache.hadoop.hbase.CellBuilder;
051import org.apache.hadoop.hbase.CellBuilderFactory;
052import org.apache.hadoop.hbase.CellBuilderType;
053import org.apache.hadoop.hbase.CellUtil;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HColumnDescriptor;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.HTableDescriptor;
059import org.apache.hadoop.hbase.KeyValue;
060import org.apache.hadoop.hbase.MetaTableAccessor;
061import org.apache.hadoop.hbase.ServerName;
062import org.apache.hadoop.hbase.TableName;
063import org.apache.hadoop.hbase.TableNotFoundException;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.Append;
066import org.apache.hadoop.hbase.client.Delete;
067import org.apache.hadoop.hbase.client.Durability;
068import org.apache.hadoop.hbase.client.Get;
069import org.apache.hadoop.hbase.client.Increment;
070import org.apache.hadoop.hbase.client.OperationWithAttributes;
071import org.apache.hadoop.hbase.client.Put;
072import org.apache.hadoop.hbase.client.RegionInfo;
073import org.apache.hadoop.hbase.client.RegionLocator;
074import org.apache.hadoop.hbase.client.Result;
075import org.apache.hadoop.hbase.client.ResultScanner;
076import org.apache.hadoop.hbase.client.Scan;
077import org.apache.hadoop.hbase.client.Table;
078import org.apache.hadoop.hbase.filter.Filter;
079import org.apache.hadoop.hbase.filter.ParseFilter;
080import org.apache.hadoop.hbase.filter.PrefixFilter;
081import org.apache.hadoop.hbase.filter.WhileMatchFilter;
082import org.apache.hadoop.hbase.http.HttpServerUtil;
083import org.apache.hadoop.hbase.log.HBaseMarkers;
084import org.apache.hadoop.hbase.security.SaslUtil;
085import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
086import org.apache.hadoop.hbase.security.SecurityUtil;
087import org.apache.hadoop.hbase.security.UserProvider;
088import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
089import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
090import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
091import org.apache.hadoop.hbase.thrift.generated.Hbase;
092import org.apache.hadoop.hbase.thrift.generated.IOError;
093import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
094import org.apache.hadoop.hbase.thrift.generated.Mutation;
095import org.apache.hadoop.hbase.thrift.generated.TAppend;
096import org.apache.hadoop.hbase.thrift.generated.TCell;
097import org.apache.hadoop.hbase.thrift.generated.TIncrement;
098import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
099import org.apache.hadoop.hbase.thrift.generated.TRowResult;
100import org.apache.hadoop.hbase.thrift.generated.TScan;
101import org.apache.hadoop.hbase.util.Bytes;
102import org.apache.hadoop.hbase.util.ConnectionCache;
103import org.apache.hadoop.hbase.util.DNS;
104import org.apache.hadoop.hbase.util.JvmPauseMonitor;
105import org.apache.hadoop.hbase.util.Strings;
106import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
107import org.apache.hadoop.security.UserGroupInformation;
108import org.apache.hadoop.security.authorize.ProxyUsers;
109import org.apache.thrift.TException;
110import org.apache.thrift.TProcessor;
111import org.apache.thrift.protocol.TBinaryProtocol;
112import org.apache.thrift.protocol.TCompactProtocol;
113import org.apache.thrift.protocol.TProtocolFactory;
114import org.apache.thrift.server.THsHaServer;
115import org.apache.thrift.server.TNonblockingServer;
116import org.apache.thrift.server.TServer;
117import org.apache.thrift.server.TServlet;
118import org.apache.thrift.server.TThreadedSelectorServer;
119import org.apache.thrift.transport.TFramedTransport;
120import org.apache.thrift.transport.TNonblockingServerSocket;
121import org.apache.thrift.transport.TNonblockingServerTransport;
122import org.apache.thrift.transport.TSaslServerTransport;
123import org.apache.thrift.transport.TServerSocket;
124import org.apache.thrift.transport.TServerTransport;
125import org.apache.thrift.transport.TTransportFactory;
126import org.apache.yetus.audience.InterfaceAudience;
127import org.eclipse.jetty.http.HttpVersion;
128import org.eclipse.jetty.server.HttpConfiguration;
129import org.eclipse.jetty.server.HttpConnectionFactory;
130import org.eclipse.jetty.server.SecureRequestCustomizer;
131import org.eclipse.jetty.server.Server;
132import org.eclipse.jetty.server.ServerConnector;
133import org.eclipse.jetty.server.SslConnectionFactory;
134import org.eclipse.jetty.servlet.ServletContextHandler;
135import org.eclipse.jetty.servlet.ServletHolder;
136import org.eclipse.jetty.util.ssl.SslContextFactory;
137import org.eclipse.jetty.util.thread.QueuedThreadPool;
138import org.slf4j.Logger;
139import org.slf4j.LoggerFactory;
140import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
141import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
142import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
143import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
144import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
145import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
146import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
147
148/**
149 * ThriftServerRunner - this class starts up a Thrift server which implements
150 * the Hbase API specified in the Hbase.thrift IDL file.
151 */
152@InterfaceAudience.Private
153public class ThriftServerRunner implements Runnable {
154
155  private static final Logger LOG = LoggerFactory.getLogger(ThriftServerRunner.class);
156
157  private static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k
158
159  static final String SERVER_TYPE_CONF_KEY =
160      "hbase.regionserver.thrift.server.type";
161
162  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
163  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
164  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
165  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
166  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
167  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
168  static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
169  static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min";
170  static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max";
171
172  static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled";
173  static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store";
174  static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY = "hbase.thrift.ssl.keystore.password";
175  static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY = "hbase.thrift.ssl.keystore.keypassword";
176  static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY =
177      "hbase.thrift.ssl.exclude.cipher.suites";
178  static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY =
179      "hbase.thrift.ssl.include.cipher.suites";
180  static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.exclude.protocols";
181  static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.include.protocols";
182
183  static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser";
184
185  static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface";
186  static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver";
187  static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal";
188  static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file";
189  static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal";
190  static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file";
191
192  /**
193   * Amount of time in milliseconds before a server thread will timeout
194   * waiting for client to send data on a connected socket. Currently,
195   * applies only to TBoundedThreadPoolServer
196   */
197  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
198    "hbase.thrift.server.socket.read.timeout";
199  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
200
201
202  /**
203   * Thrift quality of protection configuration key. Valid values can be:
204   * auth-conf: authentication, integrity and confidentiality checking
205   * auth-int: authentication and integrity checking
206   * auth: authentication only
207   *
208   * This is used to authenticate the callers and support impersonation.
209   * The thrift server and the HBase cluster must run in secure mode.
210   */
211  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
212  static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
213
214  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
215  public static final int DEFAULT_LISTEN_PORT = 9090;
216  public static final int HREGION_VERSION = 1;
217
218  private final int listenPort;
219
220  private Configuration conf;
221  volatile TServer tserver;
222  volatile Server httpServer;
223  private final Hbase.Iface handler;
224  private final ThriftMetrics metrics;
225  private final HBaseHandler hbaseHandler;
226  private final UserGroupInformation serviceUGI;
227  private UserGroupInformation httpUGI;
228
229  private SaslUtil.QualityOfProtection qop;
230  private String host;
231
232  private final boolean securityEnabled;
233  private final boolean doAsEnabled;
234
235  private final JvmPauseMonitor pauseMonitor;
236
237  static String THRIFT_HTTP_ALLOW_OPTIONS_METHOD = "hbase.thrift.http.allow.options.method";
238  private static boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false;
239
240  /** An enum of server implementation selections */
241  public enum ImplType {
242    HS_HA("hsha", true, THsHaServer.class, true),
243    NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
244    THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
245    THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true);
246
247    public static final ImplType DEFAULT = THREAD_POOL;
248
249    final String option;
250    final boolean isAlwaysFramed;
251    final Class<? extends TServer> serverClass;
252    final boolean canSpecifyBindIP;
253
254    private ImplType(String option, boolean isAlwaysFramed,
255        Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
256      this.option = option;
257      this.isAlwaysFramed = isAlwaysFramed;
258      this.serverClass = serverClass;
259      this.canSpecifyBindIP = canSpecifyBindIP;
260    }
261
262    /**
263     * @return <code>-option</code>
264     */
265    @Override
266    public String toString() {
267      return "-" + option;
268    }
269
270    public String getOption() {
271      return option;
272    }
273
274    public boolean isAlwaysFramed() {
275      return isAlwaysFramed;
276    }
277
278    public String getDescription() {
279      StringBuilder sb = new StringBuilder("Use the " +
280          serverClass.getSimpleName());
281      if (isAlwaysFramed) {
282        sb.append(" This implies the framed transport.");
283      }
284      if (this == DEFAULT) {
285        sb.append("This is the default.");
286      }
287      return sb.toString();
288    }
289
290    static OptionGroup createOptionGroup() {
291      OptionGroup group = new OptionGroup();
292      for (ImplType t : values()) {
293        group.addOption(new Option(t.option, t.getDescription()));
294      }
295      return group;
296    }
297
298    public static ImplType getServerImpl(Configuration conf) {
299      String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
300      for (ImplType t : values()) {
301        if (confType.equals(t.option)) {
302          return t;
303        }
304      }
305      throw new AssertionError("Unknown server ImplType.option:" + confType);
306    }
307
308    static void setServerImpl(CommandLine cmd, Configuration conf) {
309      ImplType chosenType = null;
310      int numChosen = 0;
311      for (ImplType t : values()) {
312        if (cmd.hasOption(t.option)) {
313          chosenType = t;
314          ++numChosen;
315        }
316      }
317      if (numChosen < 1) {
318        LOG.info("Using default thrift server type");
319        chosenType = DEFAULT;
320      } else if (numChosen > 1) {
321        throw new AssertionError("Exactly one option out of " +
322          Arrays.toString(values()) + " has to be specified");
323      }
324      LOG.info("Using thrift server type " + chosenType.option);
325      conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
326    }
327
328    public String simpleClassName() {
329      return serverClass.getSimpleName();
330    }
331
332    public static List<String> serversThatCannotSpecifyBindIP() {
333      List<String> l = new ArrayList<>();
334      for (ImplType t : values()) {
335        if (!t.canSpecifyBindIP) {
336          l.add(t.simpleClassName());
337        }
338      }
339      return l;
340    }
341  }
342
343  public ThriftServerRunner(Configuration conf) throws IOException {
344    // login the server principal (if using secure Hadoop)
345    UserProvider userProvider = UserProvider.instantiate(conf);
346    securityEnabled = userProvider.isHadoopSecurityEnabled()
347        && userProvider.isHBaseSecurityEnabled();
348    if (securityEnabled) {
349      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
350        conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
351        conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
352      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
353
354      // Setup the SPNEGO user for HTTP if configured
355      String spnegoPrincipal = getSpengoPrincipal(conf, host);
356      String spnegoKeytab = getSpnegoKeytab(conf);
357      UserGroupInformation.setConfiguration(conf);
358      // login the SPNEGO principal using UGI to avoid polluting the login user
359      this.httpUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal,
360        spnegoKeytab);
361    }
362    this.serviceUGI = userProvider.getCurrent().getUGI();
363    if (httpUGI == null) {
364      this.httpUGI = serviceUGI;
365    }
366
367    this.conf = HBaseConfiguration.create(conf);
368    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
369    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
370    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
371    this.hbaseHandler = new HBaseHandler(conf, userProvider);
372    this.hbaseHandler.initMetrics(metrics);
373    this.handler = HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, conf);
374
375    boolean httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
376    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
377    if (doAsEnabled && !httpEnabled) {
378      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
379    }
380
381    String strQop = conf.get(THRIFT_QOP_KEY);
382    if (strQop != null) {
383      this.qop = SaslUtil.getQop(strQop);
384    }
385    if (qop != null) {
386      if (qop != QualityOfProtection.AUTHENTICATION &&
387          qop != QualityOfProtection.INTEGRITY &&
388          qop != QualityOfProtection.PRIVACY) {
389        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
390                              THRIFT_QOP_KEY,
391                              QualityOfProtection.AUTHENTICATION.name(),
392                              QualityOfProtection.INTEGRITY.name(),
393                              QualityOfProtection.PRIVACY.name()));
394      }
395      checkHttpSecurity(qop, conf);
396      if (!securityEnabled) {
397        throw new IOException("Thrift server must run in secure mode to support authentication");
398      }
399    }
400  }
401
402
403  private String getSpengoPrincipal(Configuration conf, String host) throws IOException {
404    String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY);
405    if (principal == null) {
406      // We cannot use the Hadoop configuration deprecation handling here since
407      // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos
408      // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY
409      // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend
410      // Kerberos principal and SPNEGO principal.
411      LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.",
412        THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY);
413      principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
414    }
415    // Handle _HOST in principal value
416    return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host);
417  }
418
419  private String getSpnegoKeytab(Configuration conf) {
420    String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY);
421    if (keytab == null) {
422      // We cannot use the Hadoop configuration deprecation handling here since
423      // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos
424      // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY
425      // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend
426      // Kerberos keytab and SPNEGO keytab.
427      LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.",
428        THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY);
429      keytab = conf.get(THRIFT_KEYTAB_FILE_KEY);
430    }
431    return keytab;
432  }
433
434  private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
435    if (qop == QualityOfProtection.PRIVACY &&
436        conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
437        !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
438      throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
439          THRIFT_SSL_ENABLED_KEY + " is false");
440    }
441  }
442
443  /*
444   * Runs the Thrift server
445   */
446  @Override
447  public void run() {
448    serviceUGI.doAs(new PrivilegedAction<Object>() {
449      @Override
450      public Object run() {
451        try {
452          pauseMonitor.start();
453          if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
454            setupHTTPServer();
455            httpServer.start();
456            httpServer.join();
457          } else {
458            setupServer();
459            tserver.serve();
460          }
461        } catch (Exception e) {
462          LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e);
463          // Crash the process if the ThriftServer is not running
464          System.exit(-1);
465        }
466        return null;
467      }
468    });
469
470  }
471
472  public void shutdown() {
473    if (pauseMonitor != null) {
474      pauseMonitor.stop();
475    }
476    if (tserver != null) {
477      tserver.stop();
478      tserver = null;
479    }
480    if (httpServer != null) {
481      try {
482        httpServer.stop();
483        httpServer = null;
484      } catch (Exception e) {
485        LOG.error("Problem encountered in shutting down HTTP server", e);
486      }
487      httpServer = null;
488    }
489  }
490
491  private void setupHTTPServer() throws IOException {
492    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
493    TProcessor processor = new Hbase.Processor<>(handler);
494    TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
495        httpUGI, hbaseHandler, securityEnabled, doAsEnabled);
496
497    // Set the default max thread number to 100 to limit
498    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
499    // Jetty set the default max thread number to 250, if we don't set it.
500    //
501    // Our default min thread number 2 is the same as that used by Jetty.
502    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, 2);
503    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, 100);
504    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
505    threadPool.setMinThreads(minThreads);
506    httpServer = new Server(threadPool);
507
508    // Context handler
509    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS);
510    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
511    HttpServerUtil.constrainHttpMethods(ctxHandler,
512      conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
513
514    // set up Jetty and run the embedded server
515    HttpConfiguration httpConfig = new HttpConfiguration();
516    httpConfig.setSecureScheme("https");
517    httpConfig.setSecurePort(listenPort);
518    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
519    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
520    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
521    httpConfig.setSendServerVersion(false);
522    httpConfig.setSendDateHeader(false);
523
524    ServerConnector serverConnector;
525    if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
526      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
527      httpsConfig.addCustomizer(new SecureRequestCustomizer());
528
529      SslContextFactory sslCtxFactory = new SslContextFactory();
530      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
531      String password = HBaseConfiguration.getPassword(conf,
532          THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
533      String keyPassword = HBaseConfiguration.getPassword(conf,
534          THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
535      sslCtxFactory.setKeyStorePath(keystore);
536      sslCtxFactory.setKeyStorePassword(password);
537      sslCtxFactory.setKeyManagerPassword(keyPassword);
538
539      String[] excludeCiphers = conf.getStrings(
540          THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
541      if (excludeCiphers.length != 0) {
542        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
543      }
544      String[] includeCiphers = conf.getStrings(
545          THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
546      if (includeCiphers.length != 0) {
547        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
548      }
549
550      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
551      String[] excludeProtocols = conf.getStrings(
552          THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
553      if (excludeProtocols.length != 0) {
554        sslCtxFactory.setExcludeProtocols(excludeProtocols);
555      }
556      String[] includeProtocols = conf.getStrings(
557          THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
558      if (includeProtocols.length != 0) {
559        sslCtxFactory.setIncludeProtocols(includeProtocols);
560      }
561
562      serverConnector = new ServerConnector(httpServer,
563          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
564          new HttpConnectionFactory(httpsConfig));
565    } else {
566      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
567    }
568    serverConnector.setPort(listenPort);
569    serverConnector.setHost(getBindAddress(conf).getHostAddress());
570    httpServer.addConnector(serverConnector);
571    httpServer.setStopAtShutdown(true);
572
573    if (doAsEnabled) {
574      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
575    }
576
577    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
578  }
579
580  /**
581   * Setting up the thrift TServer
582   */
583  private void setupServer() throws Exception {
584    // Construct correct ProtocolFactory
585    TProtocolFactory protocolFactory;
586    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
587      LOG.debug("Using compact protocol");
588      protocolFactory = new TCompactProtocol.Factory();
589    } else {
590      LOG.debug("Using binary protocol");
591      protocolFactory = new TBinaryProtocol.Factory();
592    }
593
594    final TProcessor p = new Hbase.Processor<>(handler);
595    ImplType implType = ImplType.getServerImpl(conf);
596    TProcessor processor = p;
597
598    // Construct correct TransportFactory
599    TTransportFactory transportFactory;
600    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
601      if (qop != null) {
602        throw new RuntimeException("Thrift server authentication"
603          + " doesn't work with framed transport yet");
604      }
605      transportFactory = new TFramedTransport.Factory(
606          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
607      LOG.debug("Using framed transport");
608    } else if (qop == null) {
609      transportFactory = new TTransportFactory();
610    } else {
611      // Extract the name from the principal
612      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
613      if (thriftKerberosPrincipal == null) {
614        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
615      }
616      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
617      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
618      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
619      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
620        new SaslGssCallbackHandler() {
621          @Override
622          public void handle(Callback[] callbacks)
623              throws UnsupportedCallbackException {
624            AuthorizeCallback ac = null;
625            for (Callback callback : callbacks) {
626              if (callback instanceof AuthorizeCallback) {
627                ac = (AuthorizeCallback) callback;
628              } else {
629                throw new UnsupportedCallbackException(callback,
630                    "Unrecognized SASL GSSAPI Callback");
631              }
632            }
633            if (ac != null) {
634              String authid = ac.getAuthenticationID();
635              String authzid = ac.getAuthorizationID();
636              if (!authid.equals(authzid)) {
637                ac.setAuthorized(false);
638              } else {
639                ac.setAuthorized(true);
640                String userName = SecurityUtil.getUserFromPrincipal(authzid);
641                LOG.info("Effective user: {}", userName);
642                ac.setAuthorizedID(userName);
643              }
644            }
645          }
646        });
647      transportFactory = saslFactory;
648
649      // Create a processor wrapper, to get the caller
650      processor = (inProt, outProt) -> {
651        TSaslServerTransport saslServerTransport =
652          (TSaslServerTransport)inProt.getTransport();
653        SaslServer saslServer = saslServerTransport.getSaslServer();
654        String principal = saslServer.getAuthorizationID();
655        hbaseHandler.setEffectiveUser(principal);
656        return p.process(inProt, outProt);
657      };
658    }
659
660    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
661      LOG.error("Server types {} don't support IP address binding at the moment. See " +
662          "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
663          Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
664      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
665    }
666
667    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
668    int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
669
670    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
671        implType == ImplType.THREADED_SELECTOR) {
672      InetAddress listenAddress = getBindAddress(conf);
673      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
674          new InetSocketAddress(listenAddress, listenPort));
675
676      if (implType == ImplType.NONBLOCKING) {
677        TNonblockingServer.Args serverArgs =
678            new TNonblockingServer.Args(serverTransport);
679        serverArgs.processor(processor)
680                  .transportFactory(transportFactory)
681                  .protocolFactory(protocolFactory);
682        tserver = new TNonblockingServer(serverArgs);
683      } else if (implType == ImplType.HS_HA) {
684        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
685        CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
686        ExecutorService executorService = createExecutor(
687            callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads());
688        serverArgs.executorService(executorService)
689                  .processor(processor)
690                  .transportFactory(transportFactory)
691                  .protocolFactory(protocolFactory);
692        tserver = new THsHaServer(serverArgs);
693      } else { // THREADED_SELECTOR
694        TThreadedSelectorServer.Args serverArgs =
695            new HThreadedSelectorServerArgs(serverTransport, conf);
696        CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
697        ExecutorService executorService = createExecutor(
698            callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
699        serverArgs.executorService(executorService)
700                  .processor(processor)
701                  .transportFactory(transportFactory)
702                  .protocolFactory(protocolFactory);
703        tserver = new TThreadedSelectorServer(serverArgs);
704      }
705      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
706          Integer.toString(listenPort));
707    } else if (implType == ImplType.THREAD_POOL) {
708      // Thread pool server. Get the IP address to bind to.
709      InetAddress listenAddress = getBindAddress(conf);
710      int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
711          THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
712      TServerTransport serverTransport = new TServerSocket(
713          new TServerSocket.ServerSocketTransportArgs().
714              bindAddr(new InetSocketAddress(listenAddress, listenPort)).
715              backlog(backlog).
716              clientTimeout(readTimeout));
717
718      TBoundedThreadPoolServer.Args serverArgs =
719          new TBoundedThreadPoolServer.Args(serverTransport, conf);
720      serverArgs.processor(processor)
721                .transportFactory(transportFactory)
722                .protocolFactory(protocolFactory);
723      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
724          + listenAddress + ":" + Integer.toString(listenPort)
725          + " with readTimeout " + readTimeout + "ms; " + serverArgs);
726      TBoundedThreadPoolServer tserver =
727          new TBoundedThreadPoolServer(serverArgs, metrics);
728      this.tserver = tserver;
729    } else {
730      throw new AssertionError("Unsupported Thrift server implementation: " +
731          implType.simpleClassName());
732    }
733
734    // A sanity check that we instantiated the right type of server.
735    if (tserver.getClass() != implType.serverClass) {
736      throw new AssertionError("Expected to create Thrift server class " +
737          implType.serverClass.getName() + " but got " +
738          tserver.getClass().getName());
739    }
740
741
742
743    registerFilters(conf);
744  }
745
746  ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
747                                 int minWorkers, int maxWorkers) {
748    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
749    tfb.setDaemon(true);
750    tfb.setNameFormat("thrift-worker-%d");
751    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
752            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
753    threadPool.allowCoreThreadTimeOut(true);
754    return threadPool;
755  }
756
757  private InetAddress getBindAddress(Configuration conf)
758      throws UnknownHostException {
759    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
760    return InetAddress.getByName(bindAddressStr);
761  }
762
763  protected static class ResultScannerWrapper {
764
765    private final ResultScanner scanner;
766    private final boolean sortColumns;
767    public ResultScannerWrapper(ResultScanner resultScanner,
768                                boolean sortResultColumns) {
769      scanner = resultScanner;
770      sortColumns = sortResultColumns;
771   }
772
773    public ResultScanner getScanner() {
774      return scanner;
775    }
776
777    public boolean isColumnSorted() {
778      return sortColumns;
779    }
780  }
781
782  /**
783   * The HBaseHandler is a glue object that connects Thrift RPC calls to the
784   * HBase client API primarily defined in the Admin and Table objects.
785   */
786  public static class HBaseHandler implements Hbase.Iface {
787    protected Configuration conf;
788    protected static final Logger LOG = LoggerFactory.getLogger(HBaseHandler.class);
789
790    // nextScannerId and scannerMap are used to manage scanner state
791    protected int nextScannerId = 0;
792    protected HashMap<Integer, ResultScannerWrapper> scannerMap;
793    private ThriftMetrics metrics = null;
794
795    private final ConnectionCache connectionCache;
796    IncrementCoalescer coalescer;
797
798    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
799    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
800
801    /**
802     * Returns a list of all the column families for a given Table.
803     *
804     * @param table
805     * @throws IOException
806     */
807    byte[][] getAllColumns(Table table) throws IOException {
808      HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
809      byte[][] columns = new byte[cds.length][];
810      for (int i = 0; i < cds.length; i++) {
811        columns[i] = Bytes.add(cds[i].getName(),
812            KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
813      }
814      return columns;
815    }
816
817    /**
818     * Creates and returns a Table instance from a given table name.
819     *
820     * @param tableName
821     *          name of table
822     * @return Table object
823     * @throws IOException
824     */
825    public Table getTable(final byte[] tableName) throws
826        IOException {
827      String table = Bytes.toString(tableName);
828      return connectionCache.getTable(table);
829    }
830
831    public Table getTable(final ByteBuffer tableName) throws IOException {
832      return getTable(getBytes(tableName));
833    }
834
835    /**
836     * Assigns a unique ID to the scanner and adds the mapping to an internal
837     * hash-map.
838     *
839     * @param scanner
840     * @return integer scanner id
841     */
842    protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
843      int id = nextScannerId++;
844      ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
845      scannerMap.put(id, resultScannerWrapper);
846      return id;
847    }
848
849    /**
850     * Returns the scanner associated with the specified ID.
851     *
852     * @param id
853     * @return a Scanner, or null if ID was invalid.
854     */
855    protected synchronized ResultScannerWrapper getScanner(int id) {
856      return scannerMap.get(id);
857    }
858
859    /**
860     * Removes the scanner associated with the specified ID from the internal
861     * id-&gt;scanner hash-map.
862     *
863     * @param id
864     * @return a Scanner, or null if ID was invalid.
865     */
866    protected synchronized ResultScannerWrapper removeScanner(int id) {
867      return scannerMap.remove(id);
868    }
869
870    protected HBaseHandler(final Configuration c,
871        final UserProvider userProvider) throws IOException {
872      this.conf = c;
873      scannerMap = new HashMap<>();
874      this.coalescer = new IncrementCoalescer(this);
875
876      int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
877      int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
878      connectionCache = new ConnectionCache(
879        conf, userProvider, cleanInterval, maxIdleTime);
880    }
881
882    /**
883     * Obtain HBaseAdmin. Creates the instance if it is not already created.
884     */
885    private Admin getAdmin() throws IOException {
886      return connectionCache.getAdmin();
887    }
888
889    void setEffectiveUser(String effectiveUser) {
890      connectionCache.setEffectiveUser(effectiveUser);
891    }
892
893    @Override
894    public void enableTable(ByteBuffer tableName) throws IOError {
895      try{
896        getAdmin().enableTable(getTableName(tableName));
897      } catch (IOException e) {
898        LOG.warn(e.getMessage(), e);
899        throw getIOError(e);
900      }
901    }
902
903    @Override
904    public void disableTable(ByteBuffer tableName) throws IOError{
905      try{
906        getAdmin().disableTable(getTableName(tableName));
907      } catch (IOException e) {
908        LOG.warn(e.getMessage(), e);
909        throw getIOError(e);
910      }
911    }
912
913    @Override
914    public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
915      try {
916        return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
917      } catch (IOException e) {
918        LOG.warn(e.getMessage(), e);
919        throw getIOError(e);
920      }
921    }
922
923    // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
924    // table and region.
925    @Override
926    public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
927      try {
928        try {
929          getAdmin().compactRegion(getBytes(tableNameOrRegionName));
930        } catch (IllegalArgumentException e) {
931          // Invalid region, try table
932          getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
933        }
934      } catch (IOException e) {
935        LOG.warn(e.getMessage(), e);
936        throw getIOError(e);
937      }
938    }
939
940    // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
941    // to table and region.
942    @Override
943    public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
944      try {
945        try {
946          getAdmin().compactRegion(getBytes(tableNameOrRegionName));
947        } catch (IllegalArgumentException e) {
948          // Invalid region, try table
949          getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
950        }
951      } catch (IOException e) {
952        LOG.warn(e.getMessage(), e);
953        throw getIOError(e);
954      }
955    }
956
957    @Override
958    public List<ByteBuffer> getTableNames() throws IOError {
959      try {
960        TableName[] tableNames = this.getAdmin().listTableNames();
961        ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
962        for (TableName tableName : tableNames) {
963          list.add(ByteBuffer.wrap(tableName.getName()));
964        }
965        return list;
966      } catch (IOException e) {
967        LOG.warn(e.getMessage(), e);
968        throw getIOError(e);
969      }
970    }
971
972    /**
973     * @return the list of regions in the given table, or an empty list if the table does not exist
974     */
975    @Override
976    public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
977      try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
978        List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
979        List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
980        for (HRegionLocation regionLocation : regionLocations) {
981          RegionInfo info = regionLocation.getRegionInfo();
982          ServerName serverName = regionLocation.getServerName();
983          TRegionInfo region = new TRegionInfo();
984          region.serverName = ByteBuffer.wrap(
985              Bytes.toBytes(serverName.getHostname()));
986          region.port = serverName.getPort();
987          region.startKey = ByteBuffer.wrap(info.getStartKey());
988          region.endKey = ByteBuffer.wrap(info.getEndKey());
989          region.id = info.getRegionId();
990          region.name = ByteBuffer.wrap(info.getRegionName());
991          region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
992          results.add(region);
993        }
994        return results;
995      } catch (TableNotFoundException e) {
996        // Return empty list for non-existing table
997        return Collections.emptyList();
998      } catch (IOException e){
999        LOG.warn(e.getMessage(), e);
1000        throw getIOError(e);
1001      }
1002    }
1003
1004    @Override
1005    public List<TCell> get(
1006        ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1007        Map<ByteBuffer, ByteBuffer> attributes)
1008        throws IOError {
1009      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1010      if (famAndQf.length == 1) {
1011        return get(tableName, row, famAndQf[0], null, attributes);
1012      }
1013      if (famAndQf.length == 2) {
1014        return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
1015      }
1016      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
1017    }
1018
1019    /**
1020     * Note: this internal interface is slightly different from public APIs in regard to handling
1021     * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
1022     * we respect qual == null as a request for the entire column family. The caller (
1023     * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
1024     * column is parse like normal.
1025     */
1026    protected List<TCell> get(ByteBuffer tableName,
1027                              ByteBuffer row,
1028                              byte[] family,
1029                              byte[] qualifier,
1030                              Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1031      Table table = null;
1032      try {
1033        table = getTable(tableName);
1034        Get get = new Get(getBytes(row));
1035        addAttributes(get, attributes);
1036        if (qualifier == null) {
1037          get.addFamily(family);
1038        } else {
1039          get.addColumn(family, qualifier);
1040        }
1041        Result result = table.get(get);
1042        return ThriftUtilities.cellFromHBase(result.rawCells());
1043      } catch (IOException e) {
1044        LOG.warn(e.getMessage(), e);
1045        throw getIOError(e);
1046      } finally {
1047        closeTable(table);
1048      }
1049    }
1050
1051    @Override
1052    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1053        int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1054      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1055      if(famAndQf.length == 1) {
1056        return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
1057      }
1058      if (famAndQf.length == 2) {
1059        return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
1060      }
1061      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
1062
1063    }
1064
1065    /**
1066     * Note: this public interface is slightly different from public Java APIs in regard to
1067     * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
1068     * Rather, we respect qual == null as a request for the entire column family. If you want to
1069     * access the entire column family, use
1070     * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
1071     * that lacks a {@code ':'}.
1072     */
1073    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
1074        byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1075
1076      Table table = null;
1077      try {
1078        table = getTable(tableName);
1079        Get get = new Get(getBytes(row));
1080        addAttributes(get, attributes);
1081        if (null == qualifier) {
1082          get.addFamily(family);
1083        } else {
1084          get.addColumn(family, qualifier);
1085        }
1086        get.setMaxVersions(numVersions);
1087        Result result = table.get(get);
1088        return ThriftUtilities.cellFromHBase(result.rawCells());
1089      } catch (IOException e) {
1090        LOG.warn(e.getMessage(), e);
1091        throw getIOError(e);
1092      } finally{
1093        closeTable(table);
1094      }
1095    }
1096
1097    @Override
1098    public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1099        long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1100      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1101      if (famAndQf.length == 1) {
1102        return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
1103      }
1104      if (famAndQf.length == 2) {
1105        return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
1106          attributes);
1107      }
1108      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
1109    }
1110
1111    /**
1112     * Note: this internal interface is slightly different from public APIs in regard to handling
1113     * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
1114     * we respect qual == null as a request for the entire column family. The caller (
1115     * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
1116     * consistent in that the column is parse like normal.
1117     */
1118    protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
1119        byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
1120        throws IOError {
1121
1122      Table table = null;
1123      try {
1124        table = getTable(tableName);
1125        Get get = new Get(getBytes(row));
1126        addAttributes(get, attributes);
1127        if (null == qualifier) {
1128          get.addFamily(family);
1129        } else {
1130          get.addColumn(family, qualifier);
1131        }
1132        get.setTimeRange(0, timestamp);
1133        get.setMaxVersions(numVersions);
1134        Result result = table.get(get);
1135        return ThriftUtilities.cellFromHBase(result.rawCells());
1136      } catch (IOException e) {
1137        LOG.warn(e.getMessage(), e);
1138        throw getIOError(e);
1139      } finally{
1140        closeTable(table);
1141      }
1142    }
1143
1144    @Override
1145    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1146        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1147      return getRowWithColumnsTs(tableName, row, null,
1148                                 HConstants.LATEST_TIMESTAMP,
1149                                 attributes);
1150    }
1151
1152    @Override
1153    public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1154                                              ByteBuffer row,
1155        List<ByteBuffer> columns,
1156        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1157      return getRowWithColumnsTs(tableName, row, columns,
1158                                 HConstants.LATEST_TIMESTAMP,
1159                                 attributes);
1160    }
1161
1162    @Override
1163    public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1164        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1165      return getRowWithColumnsTs(tableName, row, null,
1166                                 timestamp, attributes);
1167    }
1168
1169    @Override
1170    public List<TRowResult> getRowWithColumnsTs(
1171        ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1172        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1173
1174      Table table = null;
1175      try {
1176        table = getTable(tableName);
1177        if (columns == null) {
1178          Get get = new Get(getBytes(row));
1179          addAttributes(get, attributes);
1180          get.setTimeRange(0, timestamp);
1181          Result result = table.get(get);
1182          return ThriftUtilities.rowResultFromHBase(result);
1183        }
1184        Get get = new Get(getBytes(row));
1185        addAttributes(get, attributes);
1186        for(ByteBuffer column : columns) {
1187          byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1188          if (famAndQf.length == 1) {
1189              get.addFamily(famAndQf[0]);
1190          } else {
1191              get.addColumn(famAndQf[0], famAndQf[1]);
1192          }
1193        }
1194        get.setTimeRange(0, timestamp);
1195        Result result = table.get(get);
1196        return ThriftUtilities.rowResultFromHBase(result);
1197      } catch (IOException e) {
1198        LOG.warn(e.getMessage(), e);
1199        throw getIOError(e);
1200      } finally{
1201        closeTable(table);
1202      }
1203    }
1204
1205    @Override
1206    public List<TRowResult> getRows(ByteBuffer tableName,
1207                                    List<ByteBuffer> rows,
1208        Map<ByteBuffer, ByteBuffer> attributes)
1209        throws IOError {
1210      return getRowsWithColumnsTs(tableName, rows, null,
1211                                  HConstants.LATEST_TIMESTAMP,
1212                                  attributes);
1213    }
1214
1215    @Override
1216    public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1217                                               List<ByteBuffer> rows,
1218        List<ByteBuffer> columns,
1219        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1220      return getRowsWithColumnsTs(tableName, rows, columns,
1221                                  HConstants.LATEST_TIMESTAMP,
1222                                  attributes);
1223    }
1224
1225    @Override
1226    public List<TRowResult> getRowsTs(ByteBuffer tableName,
1227                                      List<ByteBuffer> rows,
1228        long timestamp,
1229        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1230      return getRowsWithColumnsTs(tableName, rows, null,
1231                                  timestamp, attributes);
1232    }
1233
1234    @Override
1235    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1236                                                 List<ByteBuffer> rows,
1237        List<ByteBuffer> columns, long timestamp,
1238        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1239
1240      Table table= null;
1241      try {
1242        List<Get> gets = new ArrayList<>(rows.size());
1243        table = getTable(tableName);
1244        if (metrics != null) {
1245          metrics.incNumRowKeysInBatchGet(rows.size());
1246        }
1247        for (ByteBuffer row : rows) {
1248          Get get = new Get(getBytes(row));
1249          addAttributes(get, attributes);
1250          if (columns != null) {
1251
1252            for(ByteBuffer column : columns) {
1253              byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1254              if (famAndQf.length == 1) {
1255                get.addFamily(famAndQf[0]);
1256              } else {
1257                get.addColumn(famAndQf[0], famAndQf[1]);
1258              }
1259            }
1260          }
1261          get.setTimeRange(0, timestamp);
1262          gets.add(get);
1263        }
1264        Result[] result = table.get(gets);
1265        return ThriftUtilities.rowResultFromHBase(result);
1266      } catch (IOException e) {
1267        LOG.warn(e.getMessage(), e);
1268        throw getIOError(e);
1269      } finally{
1270        closeTable(table);
1271      }
1272    }
1273
1274    @Override
1275    public void deleteAll(
1276        ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1277        Map<ByteBuffer, ByteBuffer> attributes)
1278        throws IOError {
1279      deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1280                  attributes);
1281    }
1282
1283    @Override
1284    public void deleteAllTs(ByteBuffer tableName,
1285                            ByteBuffer row,
1286                            ByteBuffer column,
1287        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1288      Table table = null;
1289      try {
1290        table = getTable(tableName);
1291        Delete delete  = new Delete(getBytes(row));
1292        addAttributes(delete, attributes);
1293        byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1294        if (famAndQf.length == 1) {
1295          delete.addFamily(famAndQf[0], timestamp);
1296        } else {
1297          delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1298        }
1299        table.delete(delete);
1300
1301      } catch (IOException e) {
1302        LOG.warn(e.getMessage(), e);
1303        throw getIOError(e);
1304      } finally {
1305        closeTable(table);
1306      }
1307    }
1308
1309    @Override
1310    public void deleteAllRow(
1311        ByteBuffer tableName, ByteBuffer row,
1312        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1313      deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1314    }
1315
1316    @Override
1317    public void deleteAllRowTs(
1318        ByteBuffer tableName, ByteBuffer row, long timestamp,
1319        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1320      Table table = null;
1321      try {
1322        table = getTable(tableName);
1323        Delete delete  = new Delete(getBytes(row), timestamp);
1324        addAttributes(delete, attributes);
1325        table.delete(delete);
1326      } catch (IOException e) {
1327        LOG.warn(e.getMessage(), e);
1328        throw getIOError(e);
1329      } finally {
1330        closeTable(table);
1331      }
1332    }
1333
1334    @Override
1335    public void createTable(ByteBuffer in_tableName,
1336        List<ColumnDescriptor> columnFamilies) throws IOError,
1337        IllegalArgument, AlreadyExists {
1338      TableName tableName = getTableName(in_tableName);
1339      try {
1340        if (getAdmin().tableExists(tableName)) {
1341          throw new AlreadyExists("table name already in use");
1342        }
1343        HTableDescriptor desc = new HTableDescriptor(tableName);
1344        for (ColumnDescriptor col : columnFamilies) {
1345          HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1346          desc.addFamily(colDesc);
1347        }
1348        getAdmin().createTable(desc);
1349      } catch (IOException e) {
1350        LOG.warn(e.getMessage(), e);
1351        throw getIOError(e);
1352      } catch (IllegalArgumentException e) {
1353        LOG.warn(e.getMessage(), e);
1354        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1355      }
1356    }
1357
1358    private static TableName getTableName(ByteBuffer buffer) {
1359      return TableName.valueOf(getBytes(buffer));
1360    }
1361
1362    @Override
1363    public void deleteTable(ByteBuffer in_tableName) throws IOError {
1364      TableName tableName = getTableName(in_tableName);
1365      if (LOG.isDebugEnabled()) {
1366        LOG.debug("deleteTable: table={}", tableName);
1367      }
1368      try {
1369        if (!getAdmin().tableExists(tableName)) {
1370          throw new IOException("table does not exist");
1371        }
1372        getAdmin().deleteTable(tableName);
1373      } catch (IOException e) {
1374        LOG.warn(e.getMessage(), e);
1375        throw getIOError(e);
1376      }
1377    }
1378
1379    @Override
1380    public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1381        List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1382        throws IOError, IllegalArgument {
1383      mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
1384    }
1385
1386    @Override
1387    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1388        List<Mutation> mutations, long timestamp,
1389        Map<ByteBuffer, ByteBuffer> attributes)
1390        throws IOError, IllegalArgument {
1391      Table table = null;
1392      try {
1393        table = getTable(tableName);
1394        Put put = new Put(getBytes(row), timestamp);
1395        addAttributes(put, attributes);
1396
1397        Delete delete = new Delete(getBytes(row));
1398        addAttributes(delete, attributes);
1399        if (metrics != null) {
1400          metrics.incNumRowKeysInBatchMutate(mutations.size());
1401        }
1402
1403        // I apologize for all this mess :)
1404        CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
1405        for (Mutation m : mutations) {
1406          byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
1407          if (m.isDelete) {
1408            if (famAndQf.length == 1) {
1409              delete.addFamily(famAndQf[0], timestamp);
1410            } else {
1411              delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1412            }
1413            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1414          } else {
1415            if(famAndQf.length == 1) {
1416              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1417                  + "over the whole column family.");
1418            } else {
1419              put.add(builder.clear()
1420                  .setRow(put.getRow())
1421                  .setFamily(famAndQf[0])
1422                  .setQualifier(famAndQf[1])
1423                  .setTimestamp(put.getTimestamp())
1424                  .setType(Type.Put)
1425                  .setValue(m.value != null ? getBytes(m.value)
1426                      : HConstants.EMPTY_BYTE_ARRAY)
1427                  .build());
1428            }
1429            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1430          }
1431        }
1432        if (!delete.isEmpty())
1433          table.delete(delete);
1434        if (!put.isEmpty())
1435          table.put(put);
1436      } catch (IOException e) {
1437        LOG.warn(e.getMessage(), e);
1438        throw getIOError(e);
1439      } catch (IllegalArgumentException e) {
1440        LOG.warn(e.getMessage(), e);
1441        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1442      } finally{
1443        closeTable(table);
1444      }
1445    }
1446
1447    @Override
1448    public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1449        Map<ByteBuffer, ByteBuffer> attributes)
1450        throws IOError, IllegalArgument, TException {
1451      mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1452    }
1453
1454    @Override
1455    public void mutateRowsTs(
1456        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1457        Map<ByteBuffer, ByteBuffer> attributes)
1458        throws IOError, IllegalArgument, TException {
1459      List<Put> puts = new ArrayList<>();
1460      List<Delete> deletes = new ArrayList<>();
1461      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
1462      for (BatchMutation batch : rowBatches) {
1463        byte[] row = getBytes(batch.row);
1464        List<Mutation> mutations = batch.mutations;
1465        Delete delete = new Delete(row);
1466        addAttributes(delete, attributes);
1467        Put put = new Put(row, timestamp);
1468        addAttributes(put, attributes);
1469        for (Mutation m : mutations) {
1470          byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
1471          if (m.isDelete) {
1472            // no qualifier, family only.
1473            if (famAndQf.length == 1) {
1474              delete.addFamily(famAndQf[0], timestamp);
1475            } else {
1476              delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1477            }
1478            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1479                : Durability.SKIP_WAL);
1480          } else {
1481            if (famAndQf.length == 1) {
1482              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1483                  + "over the whole column family.");
1484            }
1485            if (famAndQf.length == 2) {
1486              try {
1487                put.add(builder.clear()
1488                    .setRow(put.getRow())
1489                    .setFamily(famAndQf[0])
1490                    .setQualifier(famAndQf[1])
1491                    .setTimestamp(put.getTimestamp())
1492                    .setType(Type.Put)
1493                    .setValue(m.value != null ? getBytes(m.value)
1494                        : HConstants.EMPTY_BYTE_ARRAY)
1495                    .build());
1496              } catch (IOException e) {
1497                throw new IllegalArgumentException(e);
1498              }
1499            } else {
1500              throw new IllegalArgumentException("Invalid famAndQf provided.");
1501            }
1502            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1503          }
1504        }
1505        if (!delete.isEmpty())
1506          deletes.add(delete);
1507        if (!put.isEmpty())
1508          puts.add(put);
1509      }
1510
1511      Table table = null;
1512      try {
1513        table = getTable(tableName);
1514        if (!puts.isEmpty())
1515          table.put(puts);
1516        if (!deletes.isEmpty())
1517          table.delete(deletes);
1518      } catch (IOException e) {
1519        LOG.warn(e.getMessage(), e);
1520        throw getIOError(e);
1521      } catch (IllegalArgumentException e) {
1522        LOG.warn(e.getMessage(), e);
1523        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1524      } finally{
1525        closeTable(table);
1526      }
1527    }
1528
1529    @Override
1530    public long atomicIncrement(
1531        ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1532            throws IOError, IllegalArgument, TException {
1533      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1534      if(famAndQf.length == 1) {
1535        return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1536      }
1537      return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1538    }
1539
1540    protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1541        byte [] family, byte [] qualifier, long amount)
1542        throws IOError, IllegalArgument, TException {
1543      Table table = null;
1544      try {
1545        table = getTable(tableName);
1546        return table.incrementColumnValue(
1547            getBytes(row), family, qualifier, amount);
1548      } catch (IOException e) {
1549        LOG.warn(e.getMessage(), e);
1550        throw getIOError(e);
1551      } finally {
1552        closeTable(table);
1553      }
1554    }
1555
1556    @Override
1557    public void scannerClose(int id) throws IOError, IllegalArgument {
1558      LOG.debug("scannerClose: id={}", id);
1559      ResultScannerWrapper resultScannerWrapper = getScanner(id);
1560      if (resultScannerWrapper == null) {
1561        LOG.warn("scanner ID is invalid");
1562        throw new IllegalArgument("scanner ID is invalid");
1563      }
1564      resultScannerWrapper.getScanner().close();
1565      removeScanner(id);
1566    }
1567
1568    @Override
1569    public List<TRowResult> scannerGetList(int id,int nbRows)
1570        throws IllegalArgument, IOError {
1571      LOG.debug("scannerGetList: id={}", id);
1572      ResultScannerWrapper resultScannerWrapper = getScanner(id);
1573      if (null == resultScannerWrapper) {
1574        String message = "scanner ID is invalid";
1575        LOG.warn(message);
1576        throw new IllegalArgument("scanner ID is invalid");
1577      }
1578
1579      Result [] results;
1580      try {
1581        results = resultScannerWrapper.getScanner().next(nbRows);
1582        if (null == results) {
1583          return new ArrayList<>();
1584        }
1585      } catch (IOException e) {
1586        LOG.warn(e.getMessage(), e);
1587        throw getIOError(e);
1588      }
1589      return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1590    }
1591
1592    @Override
1593    public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1594      return scannerGetList(id,1);
1595    }
1596
1597    @Override
1598    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1599        Map<ByteBuffer, ByteBuffer> attributes)
1600        throws IOError {
1601
1602      Table table = null;
1603      try {
1604        table = getTable(tableName);
1605        Scan scan = new Scan();
1606        addAttributes(scan, attributes);
1607        if (tScan.isSetStartRow()) {
1608          scan.setStartRow(tScan.getStartRow());
1609        }
1610        if (tScan.isSetStopRow()) {
1611          scan.setStopRow(tScan.getStopRow());
1612        }
1613        if (tScan.isSetTimestamp()) {
1614          scan.setTimeRange(0, tScan.getTimestamp());
1615        }
1616        if (tScan.isSetCaching()) {
1617          scan.setCaching(tScan.getCaching());
1618        }
1619        if (tScan.isSetBatchSize()) {
1620          scan.setBatch(tScan.getBatchSize());
1621        }
1622        if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
1623          for(ByteBuffer column : tScan.getColumns()) {
1624            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1625            if(famQf.length == 1) {
1626              scan.addFamily(famQf[0]);
1627            } else {
1628              scan.addColumn(famQf[0], famQf[1]);
1629            }
1630          }
1631        }
1632        if (tScan.isSetFilterString()) {
1633          ParseFilter parseFilter = new ParseFilter();
1634          scan.setFilter(
1635              parseFilter.parseFilterString(tScan.getFilterString()));
1636        }
1637        if (tScan.isSetReversed()) {
1638          scan.setReversed(tScan.isReversed());
1639        }
1640        if (tScan.isSetCacheBlocks()) {
1641          scan.setCacheBlocks(tScan.isCacheBlocks());
1642        }
1643        return addScanner(table.getScanner(scan), tScan.sortColumns);
1644      } catch (IOException e) {
1645        LOG.warn(e.getMessage(), e);
1646        throw getIOError(e);
1647      } finally{
1648        closeTable(table);
1649      }
1650    }
1651
1652    @Override
1653    public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1654        List<ByteBuffer> columns,
1655        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1656
1657      Table table = null;
1658      try {
1659        table = getTable(tableName);
1660        Scan scan = new Scan(getBytes(startRow));
1661        addAttributes(scan, attributes);
1662        if(columns != null && !columns.isEmpty()) {
1663          for(ByteBuffer column : columns) {
1664            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1665            if(famQf.length == 1) {
1666              scan.addFamily(famQf[0]);
1667            } else {
1668              scan.addColumn(famQf[0], famQf[1]);
1669            }
1670          }
1671        }
1672        return addScanner(table.getScanner(scan), false);
1673      } catch (IOException e) {
1674        LOG.warn(e.getMessage(), e);
1675        throw getIOError(e);
1676      } finally{
1677        closeTable(table);
1678      }
1679    }
1680
1681    @Override
1682    public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1683        ByteBuffer stopRow, List<ByteBuffer> columns,
1684        Map<ByteBuffer, ByteBuffer> attributes)
1685        throws IOError, TException {
1686
1687      Table table = null;
1688      try {
1689        table = getTable(tableName);
1690        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1691        addAttributes(scan, attributes);
1692        if(columns != null && !columns.isEmpty()) {
1693          for(ByteBuffer column : columns) {
1694            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1695            if(famQf.length == 1) {
1696              scan.addFamily(famQf[0]);
1697            } else {
1698              scan.addColumn(famQf[0], famQf[1]);
1699            }
1700          }
1701        }
1702        return addScanner(table.getScanner(scan), false);
1703      } catch (IOException e) {
1704        LOG.warn(e.getMessage(), e);
1705        throw getIOError(e);
1706      } finally{
1707        closeTable(table);
1708      }
1709    }
1710
1711    @Override
1712    public int scannerOpenWithPrefix(ByteBuffer tableName,
1713                                     ByteBuffer startAndPrefix,
1714                                     List<ByteBuffer> columns,
1715        Map<ByteBuffer, ByteBuffer> attributes)
1716        throws IOError, TException {
1717
1718      Table table = null;
1719      try {
1720        table = getTable(tableName);
1721        Scan scan = new Scan(getBytes(startAndPrefix));
1722        addAttributes(scan, attributes);
1723        Filter f = new WhileMatchFilter(
1724            new PrefixFilter(getBytes(startAndPrefix)));
1725        scan.setFilter(f);
1726        if (columns != null && !columns.isEmpty()) {
1727          for(ByteBuffer column : columns) {
1728            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1729            if(famQf.length == 1) {
1730              scan.addFamily(famQf[0]);
1731            } else {
1732              scan.addColumn(famQf[0], famQf[1]);
1733            }
1734          }
1735        }
1736        return addScanner(table.getScanner(scan), false);
1737      } catch (IOException e) {
1738        LOG.warn(e.getMessage(), e);
1739        throw getIOError(e);
1740      } finally{
1741        closeTable(table);
1742      }
1743    }
1744
1745    @Override
1746    public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1747        List<ByteBuffer> columns, long timestamp,
1748        Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1749
1750      Table table = null;
1751      try {
1752        table = getTable(tableName);
1753        Scan scan = new Scan(getBytes(startRow));
1754        addAttributes(scan, attributes);
1755        scan.setTimeRange(0, timestamp);
1756        if (columns != null && !columns.isEmpty()) {
1757          for (ByteBuffer column : columns) {
1758            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1759            if(famQf.length == 1) {
1760              scan.addFamily(famQf[0]);
1761            } else {
1762              scan.addColumn(famQf[0], famQf[1]);
1763            }
1764          }
1765        }
1766        return addScanner(table.getScanner(scan), false);
1767      } catch (IOException e) {
1768        LOG.warn(e.getMessage(), e);
1769        throw getIOError(e);
1770      } finally{
1771        closeTable(table);
1772      }
1773    }
1774
1775    @Override
1776    public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1777        ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1778        Map<ByteBuffer, ByteBuffer> attributes)
1779        throws IOError, TException {
1780
1781      Table table = null;
1782      try {
1783        table = getTable(tableName);
1784        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1785        addAttributes(scan, attributes);
1786        scan.setTimeRange(0, timestamp);
1787        if (columns != null && !columns.isEmpty()) {
1788          for (ByteBuffer column : columns) {
1789            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1790            if(famQf.length == 1) {
1791              scan.addFamily(famQf[0]);
1792            } else {
1793              scan.addColumn(famQf[0], famQf[1]);
1794            }
1795          }
1796        }
1797        scan.setTimeRange(0, timestamp);
1798        return addScanner(table.getScanner(scan), false);
1799      } catch (IOException e) {
1800        LOG.warn(e.getMessage(), e);
1801        throw getIOError(e);
1802      } finally{
1803        closeTable(table);
1804      }
1805    }
1806
1807    @Override
1808    public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1809        ByteBuffer tableName) throws IOError, TException {
1810
1811      Table table = null;
1812      try {
1813        TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
1814
1815        table = getTable(tableName);
1816        HTableDescriptor desc = table.getTableDescriptor();
1817
1818        for (HColumnDescriptor e : desc.getFamilies()) {
1819          ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1820          columns.put(col.name, col);
1821        }
1822        return columns;
1823      } catch (IOException e) {
1824        LOG.warn(e.getMessage(), e);
1825        throw getIOError(e);
1826      } finally {
1827        closeTable(table);
1828      }
1829    }
1830
1831    private void closeTable(Table table) throws IOError
1832    {
1833      try{
1834        if(table != null){
1835          table.close();
1836        }
1837      } catch (IOException e){
1838        LOG.error(e.getMessage(), e);
1839        throw getIOError(e);
1840      }
1841    }
1842
1843    @Override
1844    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1845      try {
1846        byte[] row = getBytes(searchRow);
1847        Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
1848          HConstants.CATALOG_FAMILY);
1849
1850        if (startRowResult == null) {
1851          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1852                                + Bytes.toStringBinary(row));
1853        }
1854
1855        // find region start and end keys
1856        RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult);
1857        if (regionInfo == null) {
1858          throw new IOException("RegionInfo REGIONINFO was null or " +
1859                                " empty in Meta for row="
1860                                + Bytes.toStringBinary(row));
1861        }
1862        TRegionInfo region = new TRegionInfo();
1863        region.setStartKey(regionInfo.getStartKey());
1864        region.setEndKey(regionInfo.getEndKey());
1865        region.id = regionInfo.getRegionId();
1866        region.setName(regionInfo.getRegionName());
1867        region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1868
1869        // find region assignment to server
1870        ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
1871        if (serverName != null) {
1872          region.setServerName(Bytes.toBytes(serverName.getHostname()));
1873          region.port = serverName.getPort();
1874        }
1875        return region;
1876      } catch (IOException e) {
1877        LOG.warn(e.getMessage(), e);
1878        throw getIOError(e);
1879      }
1880    }
1881
1882    private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1883        throws IOException {
1884      Scan scan = new Scan(row);
1885      scan.setReversed(true);
1886      scan.addFamily(family);
1887      scan.setStartRow(row);
1888      try (Table table = getTable(tableName);
1889           ResultScanner scanner = table.getScanner(scan)) {
1890        return scanner.next();
1891      }
1892    }
1893
1894    private void initMetrics(ThriftMetrics metrics) {
1895      this.metrics = metrics;
1896    }
1897
1898    @Override
1899    public void increment(TIncrement tincrement) throws IOError, TException {
1900
1901      if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1902        throw new TException("Must supply a table and a row key; can't increment");
1903      }
1904
1905      if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1906        this.coalescer.queueIncrement(tincrement);
1907        return;
1908      }
1909
1910      Table table = null;
1911      try {
1912        table = getTable(tincrement.getTable());
1913        Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1914        table.increment(inc);
1915      } catch (IOException e) {
1916        LOG.warn(e.getMessage(), e);
1917        throw getIOError(e);
1918      } finally{
1919        closeTable(table);
1920      }
1921    }
1922
1923    @Override
1924    public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1925      if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1926        this.coalescer.queueIncrements(tincrements);
1927        return;
1928      }
1929      for (TIncrement tinc : tincrements) {
1930        increment(tinc);
1931      }
1932    }
1933
1934    @Override
1935    public List<TCell> append(TAppend tappend) throws IOError, TException {
1936      if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1937        throw new TException("Must supply a table and a row key; can't append");
1938      }
1939
1940      Table table = null;
1941      try {
1942        table = getTable(tappend.getTable());
1943        Append append = ThriftUtilities.appendFromThrift(tappend);
1944        Result result = table.append(append);
1945        return ThriftUtilities.cellFromHBase(result.rawCells());
1946      } catch (IOException e) {
1947        LOG.warn(e.getMessage(), e);
1948        throw getIOError(e);
1949      } finally{
1950          closeTable(table);
1951      }
1952    }
1953
1954    @Override
1955    public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1956        ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1957        IllegalArgument, TException {
1958      Put put;
1959      try {
1960        put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1961        addAttributes(put, attributes);
1962
1963        byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
1964        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
1965            .setRow(put.getRow())
1966            .setFamily(famAndQf[0])
1967            .setQualifier(famAndQf[1])
1968            .setTimestamp(put.getTimestamp())
1969            .setType(Type.Put)
1970            .setValue(mput.value != null ? getBytes(mput.value)
1971                : HConstants.EMPTY_BYTE_ARRAY)
1972            .build());
1973        put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1974      } catch (IOException | IllegalArgumentException e) {
1975        LOG.warn(e.getMessage(), e);
1976        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1977      }
1978
1979      Table table = null;
1980      try {
1981        table = getTable(tableName);
1982        byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
1983        Table.CheckAndMutateBuilder mutateBuilder =
1984            table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
1985        if (value != null) {
1986          return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
1987        } else {
1988          return mutateBuilder.ifNotExists().thenPut(put);
1989        }
1990      } catch (IOException e) {
1991        LOG.warn(e.getMessage(), e);
1992        throw getIOError(e);
1993      } catch (IllegalArgumentException e) {
1994        LOG.warn(e.getMessage(), e);
1995        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1996      } finally {
1997          closeTable(table);
1998      }
1999    }
2000  }
2001
2002  private static IOError getIOError(Throwable throwable) {
2003    IOError error = new IOErrorWithCause(throwable);
2004    error.setMessage(Throwables.getStackTraceAsString(throwable));
2005    return error;
2006  }
2007
2008  /**
2009   * Adds all the attributes into the Operation object
2010   */
2011  private static void addAttributes(OperationWithAttributes op,
2012    Map<ByteBuffer, ByteBuffer> attributes) {
2013    if (attributes == null || attributes.isEmpty()) {
2014      return;
2015    }
2016    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
2017      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
2018      byte[] value =  getBytes(entry.getValue());
2019      op.setAttribute(name, value);
2020    }
2021  }
2022
2023  public static void registerFilters(Configuration conf) {
2024    String[] filters = conf.getStrings("hbase.thrift.filters");
2025    Splitter splitter = Splitter.on(':');
2026    if(filters != null) {
2027      for(String filterClass: filters) {
2028        List<String> filterPart = splitter.splitToList(filterClass);
2029        if(filterPart.size() != 2) {
2030          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
2031        } else {
2032          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
2033        }
2034      }
2035    }
2036  }
2037
2038  public static class IOErrorWithCause extends IOError {
2039    private final Throwable cause;
2040    public IOErrorWithCause(Throwable cause) {
2041      this.cause = cause;
2042    }
2043
2044    @Override
2045    public synchronized Throwable getCause() {
2046      return cause;
2047    }
2048
2049    @Override
2050    public boolean equals(Object other) {
2051      if (super.equals(other) &&
2052          other instanceof IOErrorWithCause) {
2053        Throwable otherCause = ((IOErrorWithCause) other).getCause();
2054        if (this.getCause() != null) {
2055          return otherCause != null && this.getCause().equals(otherCause);
2056        } else {
2057          return otherCause == null;
2058        }
2059      }
2060      return false;
2061    }
2062
2063    @Override
2064    public int hashCode() {
2065      int result = super.hashCode();
2066      result = 31 * result + (cause != null ? cause.hashCode() : 0);
2067      return result;
2068    }
2069  }
2070}