001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
021import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
022
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.Objects;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.locks.Lock;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseInterfaceAudience;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.security.AccessDeniedException;
035import org.apache.hadoop.hbase.security.UserProvider;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.ConnectionCache;
038import org.apache.hadoop.hbase.util.KeyLocker;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
044import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
045import org.apache.hbase.thirdparty.com.google.common.cache.RemovalCause;
046
047/**
048 * abstract class for HBase handler providing a Connection cache and get table/admin method
049 */
050@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
051public abstract class HBaseServiceHandler {
052
053  private static final Logger LOG = LoggerFactory.getLogger(HBaseServiceHandler.class);
054
055  public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
056  public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
057
058  protected Configuration conf;
059
060  protected final ConnectionCache connectionCache;
061
062  protected static final class ResultScannerWrapper {
063    public final ResultScanner scanner;
064    public final boolean sortColumns;
065    public final String owner;
066
067    public ResultScannerWrapper(ResultScanner scanner, boolean sortColumns, String owner) {
068      this.scanner = scanner;
069      this.sortColumns = sortColumns;
070      this.owner = owner;
071    }
072  }
073
074  private final AtomicInteger nextScannerId = new AtomicInteger(0);
075  private final Cache<Integer, ResultScannerWrapper> scannerMap;
076  private final KeyLocker<Integer> removeScannerLock = new KeyLocker<>();
077
078  public HBaseServiceHandler(final Configuration c, final UserProvider userProvider)
079    throws IOException {
080    this.conf = c;
081    int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
082    int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
083    connectionCache = new ConnectionCache(conf, userProvider, cleanInterval, maxIdleTime);
084    long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
085      DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
086    scannerMap = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
087      .removalListener(notification -> {
088        // do not close the scanner if it is removed manually, we will either add it back or close
089        // it manually.
090        if (notification.getCause() != RemovalCause.EXPLICIT) {
091          ((ResultScannerWrapper) notification.getValue()).scanner.close();
092        }
093      }).build();
094  }
095
096  protected ThriftMetrics metrics = null;
097
098  public void initMetrics(ThriftMetrics metrics) {
099    this.metrics = metrics;
100  }
101
102  public void setEffectiveUser(String effectiveUser) {
103    connectionCache.setEffectiveUser(effectiveUser);
104  }
105
106  /**
107   * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
108   * @param scanner to add
109   * @return Id for this Scanner
110   */
111  protected int addScanner(ResultScanner scanner, boolean sortColumns) {
112    int id = nextScannerId.getAndIncrement();
113    ResultScannerWrapper wrapper =
114      new ResultScannerWrapper(scanner, sortColumns, connectionCache.getEffectiveUser());
115    scannerMap.put(id, wrapper);
116    return id;
117  }
118
119  /**
120   * Add the given scanner back to scanner map.
121   * <p>
122   * When scanning, we need to remove the scanner from scanner map to prevent expiration during
123   * scanning.
124   */
125  protected void addScannerBack(int id, ResultScannerWrapper wrapper) {
126    scannerMap.put(id, wrapper);
127  }
128
129  /**
130   * Removes the scanner associated with the specified ID from the internal HashMap.
131   * @param id of the Scanner to remove
132   * @throws AccessDeniedException if the scanner is not belong to the current user
133   */
134  protected ResultScannerWrapper removeScanner(int id) throws IOException {
135    Lock lock = removeScannerLock.acquireLock(id);
136    try {
137      ResultScannerWrapper wrapper = scannerMap.getIfPresent(id);
138      if (wrapper != null && !Objects.equals(connectionCache.getEffectiveUser(), wrapper.owner)) {
139        LOG.warn("User {} is trying to access scanner id = {} where owner = {}",
140          connectionCache.getEffectiveUser(), id, wrapper.owner);
141        throw new AccessDeniedException(
142          "User " + connectionCache.getEffectiveUser() + " is not allowed to access scanner " + id);
143      }
144      scannerMap.invalidate(id);
145      return wrapper;
146    } finally {
147      lock.unlock();
148    }
149  }
150
151  /**
152   * Obtain HBaseAdmin. Creates the instance if it is not already created.
153   */
154  protected Admin getAdmin() throws IOException {
155    return connectionCache.getAdmin();
156  }
157
158  /**
159   * Creates and returns a Table instance from a given table name. name of table
160   * @return Table object
161   * @throws IOException if getting the table fails
162   */
163  protected Table getTable(final byte[] tableName) throws IOException {
164    String table = Bytes.toString(tableName);
165    return connectionCache.getTable(table);
166  }
167
168  protected Table getTable(final ByteBuffer tableName) throws IOException {
169    return getTable(Bytes.getBytes(tableName));
170  }
171}