001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import static org.apache.hadoop.hbase.util.Bytes.getBytes;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.net.UnknownHostException;
027import java.nio.ByteBuffer;
028import java.security.PrivilegedAction;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.TreeMap;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041
042import javax.security.auth.callback.Callback;
043import javax.security.auth.callback.UnsupportedCallbackException;
044import javax.security.sasl.AuthorizeCallback;
045import javax.security.sasl.SaslServer;
046
047import org.apache.commons.lang3.ArrayUtils;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.Cell.Type;
050import org.apache.hadoop.hbase.CellBuilder;
051import org.apache.hadoop.hbase.CellBuilderFactory;
052import org.apache.hadoop.hbase.CellBuilderType;
053import org.apache.hadoop.hbase.CellUtil;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HColumnDescriptor;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.HTableDescriptor;
059import org.apache.hadoop.hbase.KeyValue;
060import org.apache.hadoop.hbase.MetaTableAccessor;
061import org.apache.hadoop.hbase.ServerName;
062import org.apache.hadoop.hbase.TableName;
063import org.apache.hadoop.hbase.TableNotFoundException;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.Append;
066import org.apache.hadoop.hbase.client.Delete;
067import org.apache.hadoop.hbase.client.Durability;
068import org.apache.hadoop.hbase.client.Get;
069import org.apache.hadoop.hbase.client.Increment;
070import org.apache.hadoop.hbase.client.OperationWithAttributes;
071import org.apache.hadoop.hbase.client.Put;
072import org.apache.hadoop.hbase.client.RegionInfo;
073import org.apache.hadoop.hbase.client.RegionLocator;
074import org.apache.hadoop.hbase.client.Result;
075import org.apache.hadoop.hbase.client.ResultScanner;
076import org.apache.hadoop.hbase.client.Scan;
077import org.apache.hadoop.hbase.client.Table;
078import org.apache.hadoop.hbase.filter.Filter;
079import org.apache.hadoop.hbase.filter.ParseFilter;
080import org.apache.hadoop.hbase.filter.PrefixFilter;
081import org.apache.hadoop.hbase.filter.WhileMatchFilter;
082import org.apache.hadoop.hbase.log.HBaseMarkers;
083import org.apache.hadoop.hbase.security.SaslUtil;
084import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
085import org.apache.hadoop.hbase.security.SecurityUtil;
086import org.apache.hadoop.hbase.security.UserProvider;
087import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
088import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
089import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
090import org.apache.hadoop.hbase.thrift.generated.Hbase;
091import org.apache.hadoop.hbase.thrift.generated.IOError;
092import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
093import org.apache.hadoop.hbase.thrift.generated.Mutation;
094import org.apache.hadoop.hbase.thrift.generated.TAppend;
095import org.apache.hadoop.hbase.thrift.generated.TCell;
096import org.apache.hadoop.hbase.thrift.generated.TIncrement;
097import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
098import org.apache.hadoop.hbase.thrift.generated.TRowResult;
099import org.apache.hadoop.hbase.thrift.generated.TScan;
100import org.apache.hadoop.hbase.util.Bytes;
101import org.apache.hadoop.hbase.util.ConnectionCache;
102import org.apache.hadoop.hbase.util.DNS;
103import org.apache.hadoop.hbase.util.JvmPauseMonitor;
104import org.apache.hadoop.hbase.util.Strings;
105import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
106import org.apache.hadoop.security.UserGroupInformation;
107import org.apache.hadoop.security.authorize.ProxyUsers;
108import org.apache.thrift.TException;
109import org.apache.thrift.TProcessor;
110import org.apache.thrift.protocol.TBinaryProtocol;
111import org.apache.thrift.protocol.TCompactProtocol;
112import org.apache.thrift.protocol.TProtocol;
113import org.apache.thrift.protocol.TProtocolFactory;
114import org.apache.thrift.server.THsHaServer;
115import org.apache.thrift.server.TNonblockingServer;
116import org.apache.thrift.server.TServer;
117import org.apache.thrift.server.TServlet;
118import org.apache.thrift.server.TThreadedSelectorServer;
119import org.apache.thrift.transport.TFramedTransport;
120import org.apache.thrift.transport.TNonblockingServerSocket;
121import org.apache.thrift.transport.TNonblockingServerTransport;
122import org.apache.thrift.transport.TSaslServerTransport;
123import org.apache.thrift.transport.TServerSocket;
124import org.apache.thrift.transport.TServerTransport;
125import org.apache.thrift.transport.TTransportFactory;
126import org.apache.yetus.audience.InterfaceAudience;
127import org.eclipse.jetty.http.HttpVersion;
128import org.eclipse.jetty.server.HttpConfiguration;
129import org.eclipse.jetty.server.HttpConnectionFactory;
130import org.eclipse.jetty.server.SecureRequestCustomizer;
131import org.eclipse.jetty.server.Server;
132import org.eclipse.jetty.server.ServerConnector;
133import org.eclipse.jetty.server.SslConnectionFactory;
134import org.eclipse.jetty.servlet.ServletContextHandler;
135import org.eclipse.jetty.servlet.ServletHolder;
136import org.eclipse.jetty.util.ssl.SslContextFactory;
137import org.eclipse.jetty.util.thread.QueuedThreadPool;
138import org.slf4j.Logger;
139import org.slf4j.LoggerFactory;
140import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
141import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
142import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
143import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
144import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
145import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
146import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
147
148/**
149 * ThriftServerRunner - this class starts up a Thrift server which implements
150 * the Hbase API specified in the Hbase.thrift IDL file.
151 */
152@InterfaceAudience.Private
153public class ThriftServerRunner implements Runnable {
154
155  private static final Logger LOG = LoggerFactory.getLogger(ThriftServerRunner.class);
156
157  private static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k
158
159  static final String SERVER_TYPE_CONF_KEY =
160      "hbase.regionserver.thrift.server.type";
161
162  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
163  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
164  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
165  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
166  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
167  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
168  static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
169  static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
170  static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
171
172  static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
173  static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
174  static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
175  static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
176  static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES = "hbase.thrift.ssl.exclude.cipher.suites";
177  static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES = "hbase.thrift.ssl.include.cipher.suites";
178  static final String THRIFT_SSL_EXCLUDE_PROTOCOLS = "hbase.thrift.ssl.exclude.protocols";
179  static final String THRIFT_SSL_INCLUDE_PROTOCOLS = "hbase.thrift.ssl.include.protocols";
180
181  /**
182   * Amount of time in milliseconds before a server thread will timeout
183   * waiting for client to send data on a connected socket. Currently,
184   * applies only to TBoundedThreadPoolServer
185   */
186  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
187    "hbase.thrift.server.socket.read.timeout";
188  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
189
190
191  /**
192   * Thrift quality of protection configuration key. Valid values can be:
193   * auth-conf: authentication, integrity and confidentiality checking
194   * auth-int: authentication and integrity checking
195   * auth: authentication only
196   *
197   * This is used to authenticate the callers and support impersonation.
198   * The thrift server and the HBase cluster must run in secure mode.
199   */
200  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
201  static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
202
203  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
204  public static final int DEFAULT_LISTEN_PORT = 9090;
205  public static final int HREGION_VERSION = 1;
206  static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
207  private final int listenPort;
208
209  private Configuration conf;
210  volatile TServer tserver;
211  volatile Server httpServer;
212  private final Hbase.Iface handler;
213  private final ThriftMetrics metrics;
214  private final HBaseHandler hbaseHandler;
215  private final UserGroupInformation realUser;
216
217  private SaslUtil.QualityOfProtection qop;
218  private String host;
219
220  private final boolean securityEnabled;
221  private final boolean doAsEnabled;
222
223  private final JvmPauseMonitor pauseMonitor;
224
225  /** An enum of server implementation selections */
226  public enum ImplType {
227    HS_HA("hsha", true, THsHaServer.class, true),
228    NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
229    THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
230    THREADED_SELECTOR(
231        "threadedselector", true, TThreadedSelectorServer.class, true);
232
233    public static final ImplType DEFAULT = THREAD_POOL;
234
235    final String option;
236    final boolean isAlwaysFramed;
237    final Class<? extends TServer> serverClass;
238    final boolean canSpecifyBindIP;
239
240    private ImplType(String option, boolean isAlwaysFramed,
241        Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
242      this.option = option;
243      this.isAlwaysFramed = isAlwaysFramed;
244      this.serverClass = serverClass;
245      this.canSpecifyBindIP = canSpecifyBindIP;
246    }
247
248    /**
249     * @return <code>-option</code> so we can get the list of options from
250     *         {@link #values()}
251     */
252    @Override
253    public String toString() {
254      return "-" + option;
255    }
256
257    public String getOption() {
258      return option;
259    }
260
261    public boolean isAlwaysFramed() {
262      return isAlwaysFramed;
263    }
264
265    public String getDescription() {
266      StringBuilder sb = new StringBuilder("Use the " +
267          serverClass.getSimpleName());
268      if (isAlwaysFramed) {
269        sb.append(" This implies the framed transport.");
270      }
271      if (this == DEFAULT) {
272        sb.append("This is the default.");
273      }
274      return sb.toString();
275    }
276
277    static OptionGroup createOptionGroup() {
278      OptionGroup group = new OptionGroup();
279      for (ImplType t : values()) {
280        group.addOption(new Option(t.option, t.getDescription()));
281      }
282      return group;
283    }
284
285    public static ImplType getServerImpl(Configuration conf) {
286      String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
287      for (ImplType t : values()) {
288        if (confType.equals(t.option)) {
289          return t;
290        }
291      }
292      throw new AssertionError("Unknown server ImplType.option:" + confType);
293    }
294
295    static void setServerImpl(CommandLine cmd, Configuration conf) {
296      ImplType chosenType = null;
297      int numChosen = 0;
298      for (ImplType t : values()) {
299        if (cmd.hasOption(t.option)) {
300          chosenType = t;
301          ++numChosen;
302        }
303      }
304      if (numChosen < 1) {
305        LOG.info("Using default thrift server type");
306        chosenType = DEFAULT;
307      } else if (numChosen > 1) {
308        throw new AssertionError("Exactly one option out of " +
309          Arrays.toString(values()) + " has to be specified");
310      }
311      LOG.info("Using thrift server type " + chosenType.option);
312      conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
313    }
314
315    public String simpleClassName() {
316      return serverClass.getSimpleName();
317    }
318
319    public static List<String> serversThatCannotSpecifyBindIP() {
320      List<String> l = new ArrayList<>();
321      for (ImplType t : values()) {
322        if (!t.canSpecifyBindIP) {
323          l.add(t.simpleClassName());
324        }
325      }
326      return l;
327    }
328
329  }
330
331  public ThriftServerRunner(Configuration conf) throws IOException {
332    UserProvider userProvider = UserProvider.instantiate(conf);
333    // login the server principal (if using secure Hadoop)
334    securityEnabled = userProvider.isHadoopSecurityEnabled()
335      && userProvider.isHBaseSecurityEnabled();
336    if (securityEnabled) {
337      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
338        conf.get("hbase.thrift.dns.interface", "default"),
339        conf.get("hbase.thrift.dns.nameserver", "default")));
340      userProvider.login("hbase.thrift.keytab.file",
341        "hbase.thrift.kerberos.principal", host);
342    }
343    this.conf = HBaseConfiguration.create(conf);
344    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
345    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
346    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
347    this.hbaseHandler = new HBaseHandler(conf, userProvider);
348    this.hbaseHandler.initMetrics(metrics);
349    this.handler = HbaseHandlerMetricsProxy.newInstance(
350      hbaseHandler, metrics, conf);
351    this.realUser = userProvider.getCurrent().getUGI();
352    String strQop = conf.get(THRIFT_QOP_KEY);
353    if (strQop != null) {
354      this.qop = SaslUtil.getQop(strQop);
355    }
356    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
357    if (doAsEnabled) {
358      if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
359        LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured ");
360      }
361    }
362    if (qop != null) {
363      if (qop != QualityOfProtection.AUTHENTICATION &&
364          qop != QualityOfProtection.INTEGRITY &&
365          qop != QualityOfProtection.PRIVACY) {
366        throw new IOException(String.format("Invalide %s: It must be one of %s, %s, or %s.",
367                              THRIFT_QOP_KEY,
368                              QualityOfProtection.AUTHENTICATION.name(),
369                              QualityOfProtection.INTEGRITY.name(),
370                              QualityOfProtection.PRIVACY.name()));
371      }
372      checkHttpSecurity(qop, conf);
373      if (!securityEnabled) {
374        throw new IOException("Thrift server must"
375          + " run in secure mode to support authentication");
376      }
377    }
378  }
379
380  private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
381    if (qop == QualityOfProtection.PRIVACY &&
382        conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
383        !conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
384      throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
385          THRIFT_SSL_ENABLED + " is false");
386    }
387  }
388
389  /*
390   * Runs the Thrift server
391   */
392  @Override
393  public void run() {
394    realUser.doAs(new PrivilegedAction<Object>() {
395      @Override
396      public Object run() {
397        try {
398          pauseMonitor.start();
399          if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
400            setupHTTPServer();
401            httpServer.start();
402            httpServer.join();
403          } else {
404            setupServer();
405            tserver.serve();
406          }
407        } catch (Exception e) {
408          LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e);
409          // Crash the process if the ThriftServer is not running
410          System.exit(-1);
411        }
412        return null;
413      }
414    });
415
416  }
417
418  public void shutdown() {
419    if (pauseMonitor != null) {
420      pauseMonitor.stop();
421    }
422    if (tserver != null) {
423      tserver.stop();
424      tserver = null;
425    }
426    if (httpServer != null) {
427      try {
428        httpServer.stop();
429        httpServer = null;
430      } catch (Exception e) {
431        LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
432      }
433      httpServer = null;
434    }
435  }
436
437  private void setupHTTPServer() throws IOException {
438    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
439    TProcessor processor = new Hbase.Processor<>(handler);
440    TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
441        conf, hbaseHandler, securityEnabled, doAsEnabled);
442
443    // Set the default max thread number to 100 to limit
444    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
445    // Jetty set the default max thread number to 250, if we don't set it.
446    //
447    // Our default min thread number 2 is the same as that used by Jetty.
448    int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
449    int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
450    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
451    threadPool.setMinThreads(minThreads);
452    httpServer = new Server(threadPool);
453
454    // Context handler
455    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS);
456    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
457
458    // set up Jetty and run the embedded server
459    HttpConfiguration httpConfig = new HttpConfiguration();
460    httpConfig.setSecureScheme("https");
461    httpConfig.setSecurePort(listenPort);
462    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
463    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
464    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
465    httpConfig.setSendServerVersion(false);
466    httpConfig.setSendDateHeader(false);
467
468    ServerConnector serverConnector;
469    if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
470      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
471      httpsConfig.addCustomizer(new SecureRequestCustomizer());
472
473      SslContextFactory sslCtxFactory = new SslContextFactory();
474      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
475      String password = HBaseConfiguration.getPassword(conf,
476          THRIFT_SSL_KEYSTORE_PASSWORD, null);
477      String keyPassword = HBaseConfiguration.getPassword(conf,
478          THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
479      sslCtxFactory.setKeyStorePath(keystore);
480      sslCtxFactory.setKeyStorePassword(password);
481      sslCtxFactory.setKeyManagerPassword(keyPassword);
482
483      String[] excludeCiphers = conf.getStrings(
484          THRIFT_SSL_EXCLUDE_CIPHER_SUITES, ArrayUtils.EMPTY_STRING_ARRAY);
485      if (excludeCiphers.length != 0) {
486        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
487      }
488      String[] includeCiphers = conf.getStrings(
489          THRIFT_SSL_INCLUDE_CIPHER_SUITES, ArrayUtils.EMPTY_STRING_ARRAY);
490      if (includeCiphers.length != 0) {
491        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
492      }
493
494      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
495      String[] excludeProtocols = conf.getStrings(
496          THRIFT_SSL_EXCLUDE_PROTOCOLS, "SSLv3");
497      if (excludeProtocols.length != 0) {
498        sslCtxFactory.setExcludeProtocols(excludeProtocols);
499      }
500      String[] includeProtocols = conf.getStrings(
501          THRIFT_SSL_INCLUDE_PROTOCOLS, ArrayUtils.EMPTY_STRING_ARRAY);
502      if (includeProtocols.length != 0) {
503        sslCtxFactory.setIncludeProtocols(includeProtocols);
504      }
505
506      serverConnector = new ServerConnector(httpServer,
507          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
508          new HttpConnectionFactory(httpsConfig));
509    } else {
510      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
511    }
512    serverConnector.setPort(listenPort);
513    String host = getBindAddress(conf).getHostAddress();
514    serverConnector.setHost(host);
515    httpServer.addConnector(serverConnector);
516    httpServer.setStopAtShutdown(true);
517
518    if (doAsEnabled) {
519      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
520    }
521
522    LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
523  }
524
525  /**
526   * Setting up the thrift TServer
527   */
528  private void setupServer() throws Exception {
529    // Construct correct ProtocolFactory
530    TProtocolFactory protocolFactory;
531    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
532      LOG.debug("Using compact protocol");
533      protocolFactory = new TCompactProtocol.Factory();
534    } else {
535      LOG.debug("Using binary protocol");
536      protocolFactory = new TBinaryProtocol.Factory();
537    }
538
539    final TProcessor p = new Hbase.Processor<>(handler);
540    ImplType implType = ImplType.getServerImpl(conf);
541    TProcessor processor = p;
542
543    // Construct correct TransportFactory
544    TTransportFactory transportFactory;
545    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
546      if (qop != null) {
547        throw new RuntimeException("Thrift server authentication"
548          + " doesn't work with framed transport yet");
549      }
550      transportFactory = new TFramedTransport.Factory(
551          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
552      LOG.debug("Using framed transport");
553    } else if (qop == null) {
554      transportFactory = new TTransportFactory();
555    } else {
556      // Extract the name from the principal
557      String name = SecurityUtil.getUserFromPrincipal(
558        conf.get("hbase.thrift.kerberos.principal"));
559      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
560      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
561      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
562        new SaslGssCallbackHandler() {
563          @Override
564          public void handle(Callback[] callbacks)
565              throws UnsupportedCallbackException {
566            AuthorizeCallback ac = null;
567            for (Callback callback : callbacks) {
568              if (callback instanceof AuthorizeCallback) {
569                ac = (AuthorizeCallback) callback;
570              } else {
571                throw new UnsupportedCallbackException(callback,
572                    "Unrecognized SASL GSSAPI Callback");
573              }
574            }
575            if (ac != null) {
576              String authid = ac.getAuthenticationID();
577              String authzid = ac.getAuthorizationID();
578              if (!authid.equals(authzid)) {
579                ac.setAuthorized(false);
580              } else {
581                ac.setAuthorized(true);
582                String userName = SecurityUtil.getUserFromPrincipal(authzid);
583                LOG.info("Effective user: " + userName);
584                ac.setAuthorizedID(userName);
585              }
586            }
587          }
588        });
589      transportFactory = saslFactory;
590
591      // Create a processor wrapper, to get the caller
592      processor = new TProcessor() {
593        @Override
594        public boolean process(TProtocol inProt,
595            TProtocol outProt) throws TException {
596          TSaslServerTransport saslServerTransport =
597            (TSaslServerTransport)inProt.getTransport();
598          SaslServer saslServer = saslServerTransport.getSaslServer();
599          String principal = saslServer.getAuthorizationID();
600          hbaseHandler.setEffectiveUser(principal);
601          return p.process(inProt, outProt);
602        }
603      };
604    }
605
606    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
607      LOG.error("Server types " + Joiner.on(", ").join(
608          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
609          "address binding at the moment. See " +
610          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
611      throw new RuntimeException(
612          "-" + BIND_CONF_KEY + " not supported with " + implType);
613    }
614
615    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
616    int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
617
618    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
619        implType == ImplType.THREADED_SELECTOR) {
620      InetAddress listenAddress = getBindAddress(conf);
621      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
622          new InetSocketAddress(listenAddress, listenPort));
623
624      if (implType == ImplType.NONBLOCKING) {
625        TNonblockingServer.Args serverArgs =
626            new TNonblockingServer.Args(serverTransport);
627        serverArgs.processor(processor)
628                  .transportFactory(transportFactory)
629                  .protocolFactory(protocolFactory);
630        tserver = new TNonblockingServer(serverArgs);
631      } else if (implType == ImplType.HS_HA) {
632        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
633        CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
634        ExecutorService executorService = createExecutor(
635            callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads());
636        serverArgs.executorService(executorService)
637                  .processor(processor)
638                  .transportFactory(transportFactory)
639                  .protocolFactory(protocolFactory);
640        tserver = new THsHaServer(serverArgs);
641      } else { // THREADED_SELECTOR
642        TThreadedSelectorServer.Args serverArgs =
643            new HThreadedSelectorServerArgs(serverTransport, conf);
644        CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
645        ExecutorService executorService = createExecutor(
646            callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
647        serverArgs.executorService(executorService)
648                  .processor(processor)
649                  .transportFactory(transportFactory)
650                  .protocolFactory(protocolFactory);
651        tserver = new TThreadedSelectorServer(serverArgs);
652      }
653      LOG.info("starting HBase " + implType.simpleClassName() +
654          " server on " + Integer.toString(listenPort));
655    } else if (implType == ImplType.THREAD_POOL) {
656      // Thread pool server. Get the IP address to bind to.
657      InetAddress listenAddress = getBindAddress(conf);
658      int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
659          THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
660      TServerTransport serverTransport = new TServerSocket(
661          new TServerSocket.ServerSocketTransportArgs().
662              bindAddr(new InetSocketAddress(listenAddress, listenPort)).
663              backlog(backlog).
664              clientTimeout(readTimeout));
665
666      TBoundedThreadPoolServer.Args serverArgs =
667          new TBoundedThreadPoolServer.Args(serverTransport, conf);
668      serverArgs.processor(processor)
669                .transportFactory(transportFactory)
670                .protocolFactory(protocolFactory);
671      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
672          + listenAddress + ":" + Integer.toString(listenPort)
673          + " with readTimeout " + readTimeout + "ms; " + serverArgs);
674      TBoundedThreadPoolServer tserver =
675          new TBoundedThreadPoolServer(serverArgs, metrics);
676      this.tserver = tserver;
677    } else {
678      throw new AssertionError("Unsupported Thrift server implementation: " +
679          implType.simpleClassName());
680    }
681
682    // A sanity check that we instantiated the right type of server.
683    if (tserver.getClass() != implType.serverClass) {
684      throw new AssertionError("Expected to create Thrift server class " +
685          implType.serverClass.getName() + " but got " +
686          tserver.getClass().getName());
687    }
688
689
690
691    registerFilters(conf);
692  }
693
694  ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
695                                 int minWorkers, int maxWorkers) {
696    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
697    tfb.setDaemon(true);
698    tfb.setNameFormat("thrift-worker-%d");
699    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
700            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
701    threadPool.allowCoreThreadTimeOut(true);
702    return threadPool;
703  }
704
705  private InetAddress getBindAddress(Configuration conf)
706      throws UnknownHostException {
707    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
708    return InetAddress.getByName(bindAddressStr);
709  }
710
711  protected static class ResultScannerWrapper {
712
713    private final ResultScanner scanner;
714    private final boolean sortColumns;
715    public ResultScannerWrapper(ResultScanner resultScanner,
716                                boolean sortResultColumns) {
717      scanner = resultScanner;
718      sortColumns = sortResultColumns;
719   }
720
721    public ResultScanner getScanner() {
722      return scanner;
723    }
724
725    public boolean isColumnSorted() {
726      return sortColumns;
727    }
728  }
729
730  /**
731   * The HBaseHandler is a glue object that connects Thrift RPC calls to the
732   * HBase client API primarily defined in the Admin and Table objects.
733   */
734  public static class HBaseHandler implements Hbase.Iface {
735    protected Configuration conf;
736    protected static final Logger LOG = LoggerFactory.getLogger(HBaseHandler.class);
737
738    // nextScannerId and scannerMap are used to manage scanner state
739    protected int nextScannerId = 0;
740    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
741    private ThriftMetrics metrics = null;
742
743    private final ConnectionCache connectionCache;
744    IncrementCoalescer coalescer = null;
745
746    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
747    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
748
749    /**
750     * Returns a list of all the column families for a given Table.
751     *
752     * @param table
753     * @throws IOException
754     */
755    byte[][] getAllColumns(Table table) throws IOException {
756      HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
757      byte[][] columns = new byte[cds.length][];
758      for (int i = 0; i < cds.length; i++) {
759        columns[i] = Bytes.add(cds[i].getName(),
760            KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
761      }
762      return columns;
763    }
764
765    /**
766     * Creates and returns a Table instance from a given table name.
767     *
768     * @param tableName
769     *          name of table
770     * @return Table object
771     * @throws IOException
772     */
773    public Table getTable(final byte[] tableName) throws
774        IOException {
775      String table = Bytes.toString(tableName);
776      return connectionCache.getTable(table);
777    }
778
779    public Table getTable(final ByteBuffer tableName) throws IOException {
780      return getTable(getBytes(tableName));
781    }
782
783    /**
784     * Assigns a unique ID to the scanner and adds the mapping to an internal
785     * hash-map.
786     *
787     * @param scanner
788     * @return integer scanner id
789     */
790    protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
791      int id = nextScannerId++;
792      ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
793      scannerMap.put(id, resultScannerWrapper);
794      return id;
795    }
796
797    /**
798     * Returns the scanner associated with the specified ID.
799     *
800     * @param id
801     * @return a Scanner, or null if ID was invalid.
802     */
803    protected synchronized ResultScannerWrapper getScanner(int id) {
804      return scannerMap.get(id);
805    }
806
807    /**
808     * Removes the scanner associated with the specified ID from the internal
809     * id-&gt;scanner hash-map.
810     *
811     * @param id
812     * @return a Scanner, or null if ID was invalid.
813     */
814    protected synchronized ResultScannerWrapper removeScanner(int id) {
815      return scannerMap.remove(id);
816    }
817
818    protected HBaseHandler(final Configuration c,
819        final UserProvider userProvider) throws IOException {
820      this.conf = c;
821      scannerMap = new HashMap<>();
822      this.coalescer = new IncrementCoalescer(this);
823
824      int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
825      int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
826      connectionCache = new ConnectionCache(
827        conf, userProvider, cleanInterval, maxIdleTime);
828    }
829
830    /**
831     * Obtain HBaseAdmin. Creates the instance if it is not already created.
832     */
833    private Admin getAdmin() throws IOException {
834      return connectionCache.getAdmin();
835    }
836
837    void setEffectiveUser(String effectiveUser) {
838      connectionCache.setEffectiveUser(effectiveUser);
839    }
840
841    @Override
842    public void enableTable(ByteBuffer tableName) throws IOError {
843      try{
844        getAdmin().enableTable(getTableName(tableName));
845      } catch (IOException e) {
846        LOG.warn(e.getMessage(), e);
847        throw getIOError(e);
848      }
849    }
850
851    @Override
852    public void disableTable(ByteBuffer tableName) throws IOError{
853      try{
854        getAdmin().disableTable(getTableName(tableName));
855      } catch (IOException e) {
856        LOG.warn(e.getMessage(), e);
857        throw getIOError(e);
858      }
859    }
860
861    @Override
862    public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
863      try {
864        return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
865      } catch (IOException e) {
866        LOG.warn(e.getMessage(), e);
867        throw getIOError(e);
868      }
869    }
870
871    // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
872    // table and region.
873    @Override
874    public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
875      try {
876        try {
877          getAdmin().compactRegion(getBytes(tableNameOrRegionName));
878        } catch (IllegalArgumentException e) {
879          // Invalid region, try table
880          getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
881        }
882      } catch (IOException e) {
883        LOG.warn(e.getMessage(), e);
884        throw getIOError(e);
885      }
886    }
887
888    // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
889    // to table and region.
890    @Override
891    public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
892      try {
893        try {
894          getAdmin().compactRegion(getBytes(tableNameOrRegionName));
895        } catch (IllegalArgumentException e) {
896          // Invalid region, try table
897          getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
898        }
899      } catch (IOException e) {
900        LOG.warn(e.getMessage(), e);
901        throw getIOError(e);
902      }
903    }
904
905    @Override
906    public List<ByteBuffer> getTableNames() throws IOError {
907      try {
908        TableName[] tableNames = this.getAdmin().listTableNames();
909        ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
910        for (int i = 0; i < tableNames.length; i++) {
911          list.add(ByteBuffer.wrap(tableNames[i].getName()));
912        }
913        return list;
914      } catch (IOException e) {
915        LOG.warn(e.getMessage(), e);
916        throw getIOError(e);
917      }
918    }
919
920    /**
921     * @return the list of regions in the given table, or an empty list if the table does not exist
922     */
923    @Override
924    public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
925    throws IOError {
926      try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
927        List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
928        List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
929        for (HRegionLocation regionLocation : regionLocations) {
930          RegionInfo info = regionLocation.getRegionInfo();
931          ServerName serverName = regionLocation.getServerName();
932          TRegionInfo region = new TRegionInfo();
933          region.serverName = ByteBuffer.wrap(
934              Bytes.toBytes(serverName.getHostname()));
935          region.port = serverName.getPort();
936          region.startKey = ByteBuffer.wrap(info.getStartKey());
937          region.endKey = ByteBuffer.wrap(info.getEndKey());
938          region.id = info.getRegionId();
939          region.name = ByteBuffer.wrap(info.getRegionName());
940          region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
941          results.add(region);
942        }
943        return results;
944      } catch (TableNotFoundException e) {
945        // Return empty list for non-existing table
946        return Collections.emptyList();
947      } catch (IOException e){
948        LOG.warn(e.getMessage(), e);
949        throw getIOError(e);
950      }
951    }
952
953    @Override
954    public List<TCell> get(
955        ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
956        Map<ByteBuffer, ByteBuffer> attributes)
957        throws IOError {
958      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
959      if (famAndQf.length == 1) {
960        return get(tableName, row, famAndQf[0], null, attributes);
961      }
962      if (famAndQf.length == 2) {
963        return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
964      }
965      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
966    }
967
968    /**
969     * Note: this internal interface is slightly different from public APIs in regard to handling
970     * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
971     * we respect qual == null as a request for the entire column family. The caller (
972     * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
973     * column is parse like normal.
974     */
975    protected List<TCell> get(ByteBuffer tableName,
976                              ByteBuffer row,
977                              byte[] family,
978                              byte[] qualifier,
979                              Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
980      Table table = null;
981      try {
982        table = getTable(tableName);
983        Get get = new Get(getBytes(row));
984        addAttributes(get, attributes);
985        if (qualifier == null) {
986          get.addFamily(family);
987        } else {
988          get.addColumn(family, qualifier);
989        }
990        Result result = table.get(get);
991        return ThriftUtilities.cellFromHBase(result.rawCells());
992      } catch (IOException e) {
993        LOG.warn(e.getMessage(), e);
994        throw getIOError(e);
995      } finally {
996        closeTable(table);
997      }
998    }
999
1000    @Override
1001    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1002        int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1003      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1004      if(famAndQf.length == 1) {
1005        return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
1006      }
1007      if (famAndQf.length == 2) {
1008        return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
1009      }
1010      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
1011
1012    }
1013
1014    /**
1015     * Note: this public interface is slightly different from public Java APIs in regard to
1016     * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
1017     * Rather, we respect qual == null as a request for the entire column family. If you want to
1018     * access the entire column family, use
1019     * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
1020     * that lacks a {@code ':'}.
1021     */
1022    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
1023        byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1024
1025      Table table = null;
1026      try {
1027        table = getTable(tableName);
1028        Get get = new Get(getBytes(row));
1029        addAttributes(get, attributes);
1030        if (null == qualifier) {
1031          get.addFamily(family);
1032        } else {
1033          get.addColumn(family, qualifier);
1034        }
1035        get.setMaxVersions(numVersions);
1036        Result result = table.get(get);
1037        return ThriftUtilities.cellFromHBase(result.rawCells());
1038      } catch (IOException e) {
1039        LOG.warn(e.getMessage(), e);
1040        throw getIOError(e);
1041      } finally{
1042        closeTable(table);
1043      }
1044    }
1045
1046    @Override
1047    public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1048        long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1049      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1050      if (famAndQf.length == 1) {
1051        return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
1052      }
1053      if (famAndQf.length == 2) {
1054        return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
1055          attributes);
1056      }
1057      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
1058    }
1059
1060    /**
1061     * Note: this internal interface is slightly different from public APIs in regard to handling
1062     * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
1063     * we respect qual == null as a request for the entire column family. The caller (
1064     * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
1065     * consistent in that the column is parse like normal.
1066     */
1067    protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
1068        byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
1069        throws IOError {
1070
1071      Table table = null;
1072      try {
1073        table = getTable(tableName);
1074        Get get = new Get(getBytes(row));
1075        addAttributes(get, attributes);
1076        if (null == qualifier) {
1077          get.addFamily(family);
1078        } else {
1079          get.addColumn(family, qualifier);
1080        }
1081        get.setTimeRange(0, timestamp);
1082        get.setMaxVersions(numVersions);
1083        Result result = table.get(get);
1084        return ThriftUtilities.cellFromHBase(result.rawCells());
1085      } catch (IOException e) {
1086        LOG.warn(e.getMessage(), e);
1087        throw getIOError(e);
1088      } finally{
1089        closeTable(table);
1090      }
1091    }
1092
1093    @Override
1094    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1095        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1096      return getRowWithColumnsTs(tableName, row, null,
1097                                 HConstants.LATEST_TIMESTAMP,
1098                                 attributes);
1099    }
1100
1101    @Override
1102    public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1103                                              ByteBuffer row,
1104        List<ByteBuffer> columns,
1105        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1106      return getRowWithColumnsTs(tableName, row, columns,
1107                                 HConstants.LATEST_TIMESTAMP,
1108                                 attributes);
1109    }
1110
1111    @Override
1112    public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1113        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1114      return getRowWithColumnsTs(tableName, row, null,
1115                                 timestamp, attributes);
1116    }
1117
1118    @Override
1119    public List<TRowResult> getRowWithColumnsTs(
1120        ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1121        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1122
1123      Table table = null;
1124      try {
1125        table = getTable(tableName);
1126        if (columns == null) {
1127          Get get = new Get(getBytes(row));
1128          addAttributes(get, attributes);
1129          get.setTimeRange(0, timestamp);
1130          Result result = table.get(get);
1131          return ThriftUtilities.rowResultFromHBase(result);
1132        }
1133        Get get = new Get(getBytes(row));
1134        addAttributes(get, attributes);
1135        for(ByteBuffer column : columns) {
1136          byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1137          if (famAndQf.length == 1) {
1138              get.addFamily(famAndQf[0]);
1139          } else {
1140              get.addColumn(famAndQf[0], famAndQf[1]);
1141          }
1142        }
1143        get.setTimeRange(0, timestamp);
1144        Result result = table.get(get);
1145        return ThriftUtilities.rowResultFromHBase(result);
1146      } catch (IOException e) {
1147        LOG.warn(e.getMessage(), e);
1148        throw getIOError(e);
1149      } finally{
1150        closeTable(table);
1151      }
1152    }
1153
1154    @Override
1155    public List<TRowResult> getRows(ByteBuffer tableName,
1156                                    List<ByteBuffer> rows,
1157        Map<ByteBuffer, ByteBuffer> attributes)
1158        throws IOError {
1159      return getRowsWithColumnsTs(tableName, rows, null,
1160                                  HConstants.LATEST_TIMESTAMP,
1161                                  attributes);
1162    }
1163
1164    @Override
1165    public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1166                                               List<ByteBuffer> rows,
1167        List<ByteBuffer> columns,
1168        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1169      return getRowsWithColumnsTs(tableName, rows, columns,
1170                                  HConstants.LATEST_TIMESTAMP,
1171                                  attributes);
1172    }
1173
1174    @Override
1175    public List<TRowResult> getRowsTs(ByteBuffer tableName,
1176                                      List<ByteBuffer> rows,
1177        long timestamp,
1178        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1179      return getRowsWithColumnsTs(tableName, rows, null,
1180                                  timestamp, attributes);
1181    }
1182
1183    @Override
1184    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1185                                                 List<ByteBuffer> rows,
1186        List<ByteBuffer> columns, long timestamp,
1187        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1188
1189      Table table= null;
1190      try {
1191        List<Get> gets = new ArrayList<>(rows.size());
1192        table = getTable(tableName);
1193        if (metrics != null) {
1194          metrics.incNumRowKeysInBatchGet(rows.size());
1195        }
1196        for (ByteBuffer row : rows) {
1197          Get get = new Get(getBytes(row));
1198          addAttributes(get, attributes);
1199          if (columns != null) {
1200
1201            for(ByteBuffer column : columns) {
1202              byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1203              if (famAndQf.length == 1) {
1204                get.addFamily(famAndQf[0]);
1205              } else {
1206                get.addColumn(famAndQf[0], famAndQf[1]);
1207              }
1208            }
1209          }
1210          get.setTimeRange(0, timestamp);
1211          gets.add(get);
1212        }
1213        Result[] result = table.get(gets);
1214        return ThriftUtilities.rowResultFromHBase(result);
1215      } catch (IOException e) {
1216        LOG.warn(e.getMessage(), e);
1217        throw getIOError(e);
1218      } finally{
1219        closeTable(table);
1220      }
1221    }
1222
1223    @Override
1224    public void deleteAll(
1225        ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1226        Map<ByteBuffer, ByteBuffer> attributes)
1227        throws IOError {
1228      deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1229                  attributes);
1230    }
1231
1232    @Override
1233    public void deleteAllTs(ByteBuffer tableName,
1234                            ByteBuffer row,
1235                            ByteBuffer column,
1236        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1237      Table table = null;
1238      try {
1239        table = getTable(tableName);
1240        Delete delete  = new Delete(getBytes(row));
1241        addAttributes(delete, attributes);
1242        byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1243        if (famAndQf.length == 1) {
1244          delete.addFamily(famAndQf[0], timestamp);
1245        } else {
1246          delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1247        }
1248        table.delete(delete);
1249
1250      } catch (IOException e) {
1251        LOG.warn(e.getMessage(), e);
1252        throw getIOError(e);
1253      } finally {
1254        closeTable(table);
1255      }
1256    }
1257
1258    @Override
1259    public void deleteAllRow(
1260        ByteBuffer tableName, ByteBuffer row,
1261        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1262      deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1263    }
1264
1265    @Override
1266    public void deleteAllRowTs(
1267        ByteBuffer tableName, ByteBuffer row, long timestamp,
1268        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1269      Table table = null;
1270      try {
1271        table = getTable(tableName);
1272        Delete delete  = new Delete(getBytes(row), timestamp);
1273        addAttributes(delete, attributes);
1274        table.delete(delete);
1275      } catch (IOException e) {
1276        LOG.warn(e.getMessage(), e);
1277        throw getIOError(e);
1278      } finally {
1279        closeTable(table);
1280      }
1281    }
1282
1283    @Override
1284    public void createTable(ByteBuffer in_tableName,
1285        List<ColumnDescriptor> columnFamilies) throws IOError,
1286        IllegalArgument, AlreadyExists {
1287      TableName tableName = getTableName(in_tableName);
1288      try {
1289        if (getAdmin().tableExists(tableName)) {
1290          throw new AlreadyExists("table name already in use");
1291        }
1292        HTableDescriptor desc = new HTableDescriptor(tableName);
1293        for (ColumnDescriptor col : columnFamilies) {
1294          HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1295          desc.addFamily(colDesc);
1296        }
1297        getAdmin().createTable(desc);
1298      } catch (IOException e) {
1299        LOG.warn(e.getMessage(), e);
1300        throw getIOError(e);
1301      } catch (IllegalArgumentException e) {
1302        LOG.warn(e.getMessage(), e);
1303        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1304      }
1305    }
1306
1307    private static TableName getTableName(ByteBuffer buffer) {
1308      return TableName.valueOf(getBytes(buffer));
1309    }
1310
1311    @Override
1312    public void deleteTable(ByteBuffer in_tableName) throws IOError {
1313      TableName tableName = getTableName(in_tableName);
1314      if (LOG.isDebugEnabled()) {
1315        LOG.debug("deleteTable: table=" + tableName);
1316      }
1317      try {
1318        if (!getAdmin().tableExists(tableName)) {
1319          throw new IOException("table does not exist");
1320        }
1321        getAdmin().deleteTable(tableName);
1322      } catch (IOException e) {
1323        LOG.warn(e.getMessage(), e);
1324        throw getIOError(e);
1325      }
1326    }
1327
1328    @Override
1329    public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1330        List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1331        throws IOError, IllegalArgument {
1332      mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1333                  attributes);
1334    }
1335
1336    @Override
1337    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1338        List<Mutation> mutations, long timestamp,
1339        Map<ByteBuffer, ByteBuffer> attributes)
1340        throws IOError, IllegalArgument {
1341      Table table = null;
1342      try {
1343        table = getTable(tableName);
1344        Put put = new Put(getBytes(row), timestamp);
1345        addAttributes(put, attributes);
1346
1347        Delete delete = new Delete(getBytes(row));
1348        addAttributes(delete, attributes);
1349        if (metrics != null) {
1350          metrics.incNumRowKeysInBatchMutate(mutations.size());
1351        }
1352
1353        // I apologize for all this mess :)
1354        CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
1355        for (Mutation m : mutations) {
1356          byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
1357          if (m.isDelete) {
1358            if (famAndQf.length == 1) {
1359              delete.addFamily(famAndQf[0], timestamp);
1360            } else {
1361              delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1362            }
1363            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1364                : Durability.SKIP_WAL);
1365          } else {
1366            if(famAndQf.length == 1) {
1367              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1368                  + "over the whole column family.");
1369            } else {
1370              put.add(builder.clear()
1371                  .setRow(put.getRow())
1372                  .setFamily(famAndQf[0])
1373                  .setQualifier(famAndQf[1])
1374                  .setTimestamp(put.getTimestamp())
1375                  .setType(Type.Put)
1376                  .setValue(m.value != null ? getBytes(m.value)
1377                      : HConstants.EMPTY_BYTE_ARRAY)
1378                  .build());
1379            }
1380            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1381          }
1382        }
1383        if (!delete.isEmpty())
1384          table.delete(delete);
1385        if (!put.isEmpty())
1386          table.put(put);
1387      } catch (IOException e) {
1388        LOG.warn(e.getMessage(), e);
1389        throw getIOError(e);
1390      } catch (IllegalArgumentException e) {
1391        LOG.warn(e.getMessage(), e);
1392        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1393      } finally{
1394        closeTable(table);
1395      }
1396    }
1397
1398    @Override
1399    public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1400        Map<ByteBuffer, ByteBuffer> attributes)
1401        throws IOError, IllegalArgument, TException {
1402      mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1403    }
1404
1405    @Override
1406    public void mutateRowsTs(
1407        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1408        Map<ByteBuffer, ByteBuffer> attributes)
1409        throws IOError, IllegalArgument, TException {
1410      List<Put> puts = new ArrayList<>();
1411      List<Delete> deletes = new ArrayList<>();
1412      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
1413      for (BatchMutation batch : rowBatches) {
1414        byte[] row = getBytes(batch.row);
1415        List<Mutation> mutations = batch.mutations;
1416        Delete delete = new Delete(row);
1417        addAttributes(delete, attributes);
1418        Put put = new Put(row, timestamp);
1419        addAttributes(put, attributes);
1420        for (Mutation m : mutations) {
1421          byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
1422          if (m.isDelete) {
1423            // no qualifier, family only.
1424            if (famAndQf.length == 1) {
1425              delete.addFamily(famAndQf[0], timestamp);
1426            } else {
1427              delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1428            }
1429            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1430                : Durability.SKIP_WAL);
1431          } else {
1432            if (famAndQf.length == 1) {
1433              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1434                  + "over the whole column family.");
1435            }
1436            if (famAndQf.length == 2) {
1437              try {
1438                put.add(builder.clear()
1439                    .setRow(put.getRow())
1440                    .setFamily(famAndQf[0])
1441                    .setQualifier(famAndQf[1])
1442                    .setTimestamp(put.getTimestamp())
1443                    .setType(Type.Put)
1444                    .setValue(m.value != null ? getBytes(m.value)
1445                        : HConstants.EMPTY_BYTE_ARRAY)
1446                    .build());
1447              } catch (IOException e) {
1448                throw new IllegalArgumentException(e);
1449              }
1450            } else {
1451              throw new IllegalArgumentException("Invalid famAndQf provided.");
1452            }
1453            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1454          }
1455        }
1456        if (!delete.isEmpty())
1457          deletes.add(delete);
1458        if (!put.isEmpty())
1459          puts.add(put);
1460      }
1461
1462      Table table = null;
1463      try {
1464        table = getTable(tableName);
1465        if (!puts.isEmpty())
1466          table.put(puts);
1467        if (!deletes.isEmpty())
1468          table.delete(deletes);
1469
1470      } catch (IOException e) {
1471        LOG.warn(e.getMessage(), e);
1472        throw getIOError(e);
1473      } catch (IllegalArgumentException e) {
1474        LOG.warn(e.getMessage(), e);
1475        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1476      } finally{
1477        closeTable(table);
1478      }
1479    }
1480
1481    @Override
1482    public long atomicIncrement(
1483        ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1484            throws IOError, IllegalArgument, TException {
1485      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
1486      if(famAndQf.length == 1) {
1487        return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1488      }
1489      return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1490    }
1491
1492    protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1493        byte [] family, byte [] qualifier, long amount)
1494        throws IOError, IllegalArgument, TException {
1495      Table table = null;
1496      try {
1497        table = getTable(tableName);
1498        return table.incrementColumnValue(
1499            getBytes(row), family, qualifier, amount);
1500      } catch (IOException e) {
1501        LOG.warn(e.getMessage(), e);
1502        throw getIOError(e);
1503      } finally {
1504        closeTable(table);
1505      }
1506    }
1507
1508    @Override
1509    public void scannerClose(int id) throws IOError, IllegalArgument {
1510      LOG.debug("scannerClose: id=" + id);
1511      ResultScannerWrapper resultScannerWrapper = getScanner(id);
1512      if (resultScannerWrapper == null) {
1513        String message = "scanner ID is invalid";
1514        LOG.warn(message);
1515        throw new IllegalArgument("scanner ID is invalid");
1516      }
1517      resultScannerWrapper.getScanner().close();
1518      removeScanner(id);
1519    }
1520
1521    @Override
1522    public List<TRowResult> scannerGetList(int id,int nbRows)
1523        throws IllegalArgument, IOError {
1524      LOG.debug("scannerGetList: id=" + id);
1525      ResultScannerWrapper resultScannerWrapper = getScanner(id);
1526      if (null == resultScannerWrapper) {
1527        String message = "scanner ID is invalid";
1528        LOG.warn(message);
1529        throw new IllegalArgument("scanner ID is invalid");
1530      }
1531
1532      Result [] results = null;
1533      try {
1534        results = resultScannerWrapper.getScanner().next(nbRows);
1535        if (null == results) {
1536          return new ArrayList<>();
1537        }
1538      } catch (IOException e) {
1539        LOG.warn(e.getMessage(), e);
1540        throw getIOError(e);
1541      }
1542      return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1543    }
1544
1545    @Override
1546    public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1547      return scannerGetList(id,1);
1548    }
1549
1550    @Override
1551    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1552        Map<ByteBuffer, ByteBuffer> attributes)
1553        throws IOError {
1554
1555      Table table = null;
1556      try {
1557        table = getTable(tableName);
1558        Scan scan = new Scan();
1559        addAttributes(scan, attributes);
1560        if (tScan.isSetStartRow()) {
1561          scan.setStartRow(tScan.getStartRow());
1562        }
1563        if (tScan.isSetStopRow()) {
1564          scan.setStopRow(tScan.getStopRow());
1565        }
1566        if (tScan.isSetTimestamp()) {
1567          scan.setTimeRange(0, tScan.getTimestamp());
1568        }
1569        if (tScan.isSetCaching()) {
1570          scan.setCaching(tScan.getCaching());
1571        }
1572        if (tScan.isSetBatchSize()) {
1573          scan.setBatch(tScan.getBatchSize());
1574        }
1575        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1576          for(ByteBuffer column : tScan.getColumns()) {
1577            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1578            if(famQf.length == 1) {
1579              scan.addFamily(famQf[0]);
1580            } else {
1581              scan.addColumn(famQf[0], famQf[1]);
1582            }
1583          }
1584        }
1585        if (tScan.isSetFilterString()) {
1586          ParseFilter parseFilter = new ParseFilter();
1587          scan.setFilter(
1588              parseFilter.parseFilterString(tScan.getFilterString()));
1589        }
1590        if (tScan.isSetReversed()) {
1591          scan.setReversed(tScan.isReversed());
1592        }
1593        if (tScan.isSetCacheBlocks()) {
1594          scan.setCacheBlocks(tScan.isCacheBlocks());
1595        }
1596        return addScanner(table.getScanner(scan), tScan.sortColumns);
1597      } catch (IOException e) {
1598        LOG.warn(e.getMessage(), e);
1599        throw getIOError(e);
1600      } finally{
1601        closeTable(table);
1602      }
1603    }
1604
1605    @Override
1606    public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1607        List<ByteBuffer> columns,
1608        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1609
1610      Table table = null;
1611      try {
1612        table = getTable(tableName);
1613        Scan scan = new Scan(getBytes(startRow));
1614        addAttributes(scan, attributes);
1615        if(columns != null && columns.size() != 0) {
1616          for(ByteBuffer column : columns) {
1617            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1618            if(famQf.length == 1) {
1619              scan.addFamily(famQf[0]);
1620            } else {
1621              scan.addColumn(famQf[0], famQf[1]);
1622            }
1623          }
1624        }
1625        return addScanner(table.getScanner(scan), false);
1626      } catch (IOException e) {
1627        LOG.warn(e.getMessage(), e);
1628        throw getIOError(e);
1629      } finally{
1630        closeTable(table);
1631      }
1632    }
1633
1634    @Override
1635    public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1636        ByteBuffer stopRow, List<ByteBuffer> columns,
1637        Map<ByteBuffer, ByteBuffer> attributes)
1638        throws IOError, TException {
1639
1640      Table table = null;
1641      try {
1642        table = getTable(tableName);
1643        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1644        addAttributes(scan, attributes);
1645        if(columns != null && columns.size() != 0) {
1646          for(ByteBuffer column : columns) {
1647            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1648            if(famQf.length == 1) {
1649              scan.addFamily(famQf[0]);
1650            } else {
1651              scan.addColumn(famQf[0], famQf[1]);
1652            }
1653          }
1654        }
1655        return addScanner(table.getScanner(scan), false);
1656      } catch (IOException e) {
1657        LOG.warn(e.getMessage(), e);
1658        throw getIOError(e);
1659      } finally{
1660        closeTable(table);
1661      }
1662    }
1663
1664    @Override
1665    public int scannerOpenWithPrefix(ByteBuffer tableName,
1666                                     ByteBuffer startAndPrefix,
1667                                     List<ByteBuffer> columns,
1668        Map<ByteBuffer, ByteBuffer> attributes)
1669        throws IOError, TException {
1670
1671      Table table = null;
1672      try {
1673        table = getTable(tableName);
1674        Scan scan = new Scan(getBytes(startAndPrefix));
1675        addAttributes(scan, attributes);
1676        Filter f = new WhileMatchFilter(
1677            new PrefixFilter(getBytes(startAndPrefix)));
1678        scan.setFilter(f);
1679        if (columns != null && columns.size() != 0) {
1680          for(ByteBuffer column : columns) {
1681            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1682            if(famQf.length == 1) {
1683              scan.addFamily(famQf[0]);
1684            } else {
1685              scan.addColumn(famQf[0], famQf[1]);
1686            }
1687          }
1688        }
1689        return addScanner(table.getScanner(scan), false);
1690      } catch (IOException e) {
1691        LOG.warn(e.getMessage(), e);
1692        throw getIOError(e);
1693      } finally{
1694        closeTable(table);
1695      }
1696    }
1697
1698    @Override
1699    public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1700        List<ByteBuffer> columns, long timestamp,
1701        Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1702
1703      Table table = null;
1704      try {
1705        table = getTable(tableName);
1706        Scan scan = new Scan(getBytes(startRow));
1707        addAttributes(scan, attributes);
1708        scan.setTimeRange(0, timestamp);
1709        if (columns != null && columns.size() != 0) {
1710          for (ByteBuffer column : columns) {
1711            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1712            if(famQf.length == 1) {
1713              scan.addFamily(famQf[0]);
1714            } else {
1715              scan.addColumn(famQf[0], famQf[1]);
1716            }
1717          }
1718        }
1719        return addScanner(table.getScanner(scan), false);
1720      } catch (IOException e) {
1721        LOG.warn(e.getMessage(), e);
1722        throw getIOError(e);
1723      } finally{
1724        closeTable(table);
1725      }
1726    }
1727
1728    @Override
1729    public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1730        ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1731        Map<ByteBuffer, ByteBuffer> attributes)
1732        throws IOError, TException {
1733
1734      Table table = null;
1735      try {
1736        table = getTable(tableName);
1737        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1738        addAttributes(scan, attributes);
1739        scan.setTimeRange(0, timestamp);
1740        if (columns != null && columns.size() != 0) {
1741          for (ByteBuffer column : columns) {
1742            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1743            if(famQf.length == 1) {
1744              scan.addFamily(famQf[0]);
1745            } else {
1746              scan.addColumn(famQf[0], famQf[1]);
1747            }
1748          }
1749        }
1750        scan.setTimeRange(0, timestamp);
1751        return addScanner(table.getScanner(scan), false);
1752      } catch (IOException e) {
1753        LOG.warn(e.getMessage(), e);
1754        throw getIOError(e);
1755      } finally{
1756        closeTable(table);
1757      }
1758    }
1759
1760    @Override
1761    public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1762        ByteBuffer tableName) throws IOError, TException {
1763
1764      Table table = null;
1765      try {
1766        TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
1767
1768        table = getTable(tableName);
1769        HTableDescriptor desc = table.getTableDescriptor();
1770
1771        for (HColumnDescriptor e : desc.getFamilies()) {
1772          ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1773          columns.put(col.name, col);
1774        }
1775        return columns;
1776      } catch (IOException e) {
1777        LOG.warn(e.getMessage(), e);
1778        throw getIOError(e);
1779      } finally {
1780        closeTable(table);
1781      }
1782    }
1783
1784    private void closeTable(Table table) throws IOError
1785    {
1786      try{
1787        if(table != null){
1788          table.close();
1789        }
1790      } catch (IOException e){
1791        LOG.error(e.getMessage(), e);
1792        throw getIOError(e);
1793      }
1794    }
1795
1796    @Override
1797    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1798      try {
1799        byte[] row = getBytes(searchRow);
1800        Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
1801          HConstants.CATALOG_FAMILY);
1802
1803        if (startRowResult == null) {
1804          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1805                                + Bytes.toStringBinary(row));
1806        }
1807
1808        // find region start and end keys
1809        RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult);
1810        if (regionInfo == null) {
1811          throw new IOException("RegionInfo REGIONINFO was null or " +
1812                                " empty in Meta for row="
1813                                + Bytes.toStringBinary(row));
1814        }
1815        TRegionInfo region = new TRegionInfo();
1816        region.setStartKey(regionInfo.getStartKey());
1817        region.setEndKey(regionInfo.getEndKey());
1818        region.id = regionInfo.getRegionId();
1819        region.setName(regionInfo.getRegionName());
1820        region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1821
1822        // find region assignment to server
1823        ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
1824        if (serverName != null) {
1825          region.setServerName(Bytes.toBytes(serverName.getHostname()));
1826          region.port = serverName.getPort();
1827        }
1828        return region;
1829      } catch (IOException e) {
1830        LOG.warn(e.getMessage(), e);
1831        throw getIOError(e);
1832      }
1833    }
1834
1835    private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1836        throws IOException {
1837      Scan scan = new Scan(row);
1838      scan.setReversed(true);
1839      scan.addFamily(family);
1840      scan.setStartRow(row);
1841      Table table = getTable(tableName);
1842      try (ResultScanner scanner = table.getScanner(scan)) {
1843        return scanner.next();
1844      } finally{
1845        if(table != null){
1846          table.close();
1847        }
1848      }
1849    }
1850
1851    private void initMetrics(ThriftMetrics metrics) {
1852      this.metrics = metrics;
1853    }
1854
1855    @Override
1856    public void increment(TIncrement tincrement) throws IOError, TException {
1857
1858      if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1859        throw new TException("Must supply a table and a row key; can't increment");
1860      }
1861
1862      if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1863        this.coalescer.queueIncrement(tincrement);
1864        return;
1865      }
1866
1867      Table table = null;
1868      try {
1869        table = getTable(tincrement.getTable());
1870        Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1871        table.increment(inc);
1872      } catch (IOException e) {
1873        LOG.warn(e.getMessage(), e);
1874        throw getIOError(e);
1875      } finally{
1876        closeTable(table);
1877      }
1878    }
1879
1880    @Override
1881    public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1882      if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1883        this.coalescer.queueIncrements(tincrements);
1884        return;
1885      }
1886      for (TIncrement tinc : tincrements) {
1887        increment(tinc);
1888      }
1889    }
1890
1891    @Override
1892    public List<TCell> append(TAppend tappend) throws IOError, TException {
1893      if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1894        throw new TException("Must supply a table and a row key; can't append");
1895      }
1896
1897      Table table = null;
1898      try {
1899        table = getTable(tappend.getTable());
1900        Append append = ThriftUtilities.appendFromThrift(tappend);
1901        Result result = table.append(append);
1902        return ThriftUtilities.cellFromHBase(result.rawCells());
1903      } catch (IOException e) {
1904        LOG.warn(e.getMessage(), e);
1905        throw getIOError(e);
1906      } finally{
1907          closeTable(table);
1908      }
1909    }
1910
1911    @Override
1912    public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1913        ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1914        IllegalArgument, TException {
1915      Put put;
1916      try {
1917        put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1918        addAttributes(put, attributes);
1919
1920        byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
1921        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
1922            .setRow(put.getRow())
1923            .setFamily(famAndQf[0])
1924            .setQualifier(famAndQf[1])
1925            .setTimestamp(put.getTimestamp())
1926            .setType(Type.Put)
1927            .setValue(mput.value != null ? getBytes(mput.value)
1928                : HConstants.EMPTY_BYTE_ARRAY)
1929            .build());
1930        put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1931      } catch (IOException | IllegalArgumentException e) {
1932        LOG.warn(e.getMessage(), e);
1933        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1934      }
1935
1936      Table table = null;
1937      try {
1938        table = getTable(tableName);
1939        byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
1940        Table.CheckAndMutateBuilder mutateBuilder =
1941            table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
1942        if (value != null) {
1943          return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
1944        } else {
1945          return mutateBuilder.ifNotExists().thenPut(put);
1946        }
1947      } catch (IOException e) {
1948        LOG.warn(e.getMessage(), e);
1949        throw getIOError(e);
1950      } catch (IllegalArgumentException e) {
1951        LOG.warn(e.getMessage(), e);
1952        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1953      } finally {
1954          closeTable(table);
1955      }
1956    }
1957  }
1958
1959
1960  private static IOError getIOError(Throwable throwable) {
1961    IOError error = new IOErrorWithCause(throwable);
1962    error.setMessage(Throwables.getStackTraceAsString(throwable));
1963    return error;
1964  }
1965
1966  /**
1967   * Adds all the attributes into the Operation object
1968   */
1969  private static void addAttributes(OperationWithAttributes op,
1970    Map<ByteBuffer, ByteBuffer> attributes) {
1971    if (attributes == null || attributes.isEmpty()) {
1972      return;
1973    }
1974    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1975      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1976      byte[] value =  getBytes(entry.getValue());
1977      op.setAttribute(name, value);
1978    }
1979  }
1980
1981  public static void registerFilters(Configuration conf) {
1982    String[] filters = conf.getStrings("hbase.thrift.filters");
1983    Splitter splitter = Splitter.on(':');
1984    if(filters != null) {
1985      for(String filterClass: filters) {
1986        List<String> filterPart = splitter.splitToList(filterClass);
1987        if(filterPart.size() != 2) {
1988          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1989        } else {
1990          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
1991        }
1992      }
1993    }
1994  }
1995
1996  public static class IOErrorWithCause extends IOError {
1997    private Throwable cause;
1998    public IOErrorWithCause(Throwable cause) {
1999      this.cause = cause;
2000    }
2001
2002    @Override
2003    public synchronized Throwable getCause() {
2004      return cause;
2005    }
2006
2007    @Override
2008    public boolean equals(Object other) {
2009      if (super.equals(other) &&
2010          other instanceof IOErrorWithCause) {
2011        Throwable otherCause = ((IOErrorWithCause) other).getCause();
2012        if (this.getCause() != null) {
2013          return otherCause != null && this.getCause().equals(otherCause);
2014        } else {
2015          return otherCause == null;
2016        }
2017      }
2018      return false;
2019    }
2020
2021    @Override
2022    public int hashCode() {
2023      int result = super.hashCode();
2024      result = 31 * result + (cause != null ? cause.hashCode() : 0);
2025      return result;
2026    }
2027  }
2028}