001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 021import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 022 023import java.io.IOException; 024import java.nio.ByteBuffer; 025import java.util.Objects; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.concurrent.locks.Lock; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseInterfaceAudience; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ResultScanner; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.security.AccessDeniedException; 035import org.apache.hadoop.hbase.security.UserProvider; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.ConnectionCache; 038import org.apache.hadoop.hbase.util.KeyLocker; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 044import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 045import org.apache.hbase.thirdparty.com.google.common.cache.RemovalCause; 046 047/** 048 * abstract class for HBase handler providing a Connection cache and get table/admin method 049 */ 050@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 051public abstract class HBaseServiceHandler { 052 053 private static final Logger LOG = LoggerFactory.getLogger(HBaseServiceHandler.class); 054 055 public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; 056 public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; 057 058 protected Configuration conf; 059 060 protected final ConnectionCache connectionCache; 061 062 protected static final class ResultScannerWrapper { 063 public final ResultScanner scanner; 064 public final boolean sortColumns; 065 public final String owner; 066 067 public ResultScannerWrapper(ResultScanner scanner, boolean sortColumns, String owner) { 068 this.scanner = scanner; 069 this.sortColumns = sortColumns; 070 this.owner = owner; 071 } 072 } 073 074 private final AtomicInteger nextScannerId = new AtomicInteger(0); 075 private final Cache<Integer, ResultScannerWrapper> scannerMap; 076 private final KeyLocker<Integer> removeScannerLock = new KeyLocker<>(); 077 078 public HBaseServiceHandler(final Configuration c, final UserProvider userProvider) 079 throws IOException { 080 this.conf = c; 081 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); 082 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); 083 connectionCache = new ConnectionCache(conf, userProvider, cleanInterval, maxIdleTime); 084 long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 085 DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 086 scannerMap = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS) 087 .removalListener(notification -> { 088 // do not close the scanner if it is removed manually, we will either add it back or close 089 // it manually. 090 if (notification.getCause() != RemovalCause.EXPLICIT) { 091 ((ResultScannerWrapper) notification.getValue()).scanner.close(); 092 } 093 }).build(); 094 } 095 096 protected ThriftMetrics metrics = null; 097 098 public void initMetrics(ThriftMetrics metrics) { 099 this.metrics = metrics; 100 } 101 102 public void setEffectiveUser(String effectiveUser) { 103 connectionCache.setEffectiveUser(effectiveUser); 104 } 105 106 /** 107 * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap. 108 * @param scanner to add 109 * @return Id for this Scanner 110 */ 111 protected int addScanner(ResultScanner scanner, boolean sortColumns) { 112 int id = nextScannerId.getAndIncrement(); 113 ResultScannerWrapper wrapper = 114 new ResultScannerWrapper(scanner, sortColumns, connectionCache.getEffectiveUser()); 115 scannerMap.put(id, wrapper); 116 return id; 117 } 118 119 /** 120 * Add the given scanner back to scanner map. 121 * <p> 122 * When scanning, we need to remove the scanner from scanner map to prevent expiration during 123 * scanning. 124 */ 125 protected void addScannerBack(int id, ResultScannerWrapper wrapper) { 126 scannerMap.put(id, wrapper); 127 } 128 129 /** 130 * Removes the scanner associated with the specified ID from the internal HashMap. 131 * @param id of the Scanner to remove 132 * @throws AccessDeniedException if the scanner is not belong to the current user 133 */ 134 protected ResultScannerWrapper removeScanner(int id) throws IOException { 135 Lock lock = removeScannerLock.acquireLock(id); 136 try { 137 ResultScannerWrapper wrapper = scannerMap.getIfPresent(id); 138 if (wrapper != null && !Objects.equals(connectionCache.getEffectiveUser(), wrapper.owner)) { 139 LOG.warn("User {} is trying to access scanner id = {} where owner = {}", 140 connectionCache.getEffectiveUser(), id, wrapper.owner); 141 throw new AccessDeniedException( 142 "User " + connectionCache.getEffectiveUser() + " is not allowed to access scanner " + id); 143 } 144 scannerMap.invalidate(id); 145 return wrapper; 146 } finally { 147 lock.unlock(); 148 } 149 } 150 151 /** 152 * Obtain HBaseAdmin. Creates the instance if it is not already created. 153 */ 154 protected Admin getAdmin() throws IOException { 155 return connectionCache.getAdmin(); 156 } 157 158 /** 159 * Creates and returns a Table instance from a given table name. name of table 160 * @return Table object 161 * @throws IOException if getting the table fails 162 */ 163 protected Table getTable(final byte[] tableName) throws IOException { 164 String table = Bytes.toString(tableName); 165 return connectionCache.getTable(table); 166 } 167 168 protected Table getTable(final ByteBuffer tableName) throws IOException { 169 return getTable(Bytes.getBytes(tableName)); 170 } 171}