/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * A utility to store user specific {@link Connection}s in memory, keyed by user name.
 * A scheduled chore closes and evicts connections that have been idle for too long.
 * This class is used by REST server and Thrift server to
 * support authentication and impersonation.
 *
 * <p>The "effective user" is tracked per thread (see {@link #setEffectiveUser(String)});
 * every accessor below operates on the connection cached for the calling thread's
 * effective user.
 */
@InterfaceAudience.Private
public class ConnectionCache {
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);

  /**
   * Cached connections keyed by user name. Typed as {@link ConcurrentMap} so that
   * {@link ConnectionInfo} can use the atomic {@code remove(key, value)} when evicting itself.
   */
  private final ConcurrentMap<String, ConnectionInfo> connections = new ConcurrentHashMap<>();
  /** Per-user-name locks guarding creation of connections and admins. */
  private final KeyLocker<String> locker = new KeyLocker<>();
  private final String realUserName;
  private final UserGroupInformation realUser;
  private final UserProvider userProvider;
  private final Configuration conf;
  private final ChoreService choreService;

  /** Effective user of the calling thread; defaults to the real user's short name. */
  private final ThreadLocal<String> effectiveUserNames =
      new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
          return realUserName;
        }
      };

  /**
   * Creates the cache and starts the chore that evicts idle connections.
   *
   * @param conf configuration handed to {@link ConnectionFactory} when creating connections
   * @param userProvider used to resolve the current (real) user and to wrap proxy users
   * @param cleanInterval period of the cleaner chore, passed straight to {@link ScheduledChore}
   *          (presumably milliseconds - confirm against ScheduledChore)
   * @param maxIdleTime idle time after which a connection is closed; it is added to a timestamp
   *          from {@code EnvironmentEdgeManager.currentTime()} (presumably milliseconds)
   * @throws IOException if the current user cannot be resolved
   */
  public ConnectionCache(final Configuration conf,
      final UserProvider userProvider,
      final int cleanInterval, final int maxIdleTime) throws IOException {
    // Resolve the real user BEFORE starting the chore service: getCurrent() can throw, and a
    // failed constructor gives the caller no instance on which to call shutdown(), so a chore
    // service started earlier would be leaked.
    this.realUser = userProvider.getCurrent().getUGI();
    this.realUserName = realUser.getShortUserName();
    this.userProvider = userProvider;
    this.conf = conf;

    Stoppable stoppable = new Stoppable() {
      private volatile boolean isStopped = false;

      @Override
      public void stop(String why) {
        isStopped = true;
      }

      @Override
      public boolean isStopped() {
        return isStopped;
      }
    };
    ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) {
      @Override
      protected void chore() {
        for (Map.Entry<String, ConnectionInfo> entry : connections.entrySet()) {
          ConnectionInfo connInfo = entry.getValue();
          // timedOut() also evicts the entry and marks it closed when it returns true.
          if (connInfo.timedOut(maxIdleTime)) {
            closeIdle(connInfo);
          }
        }
      }
    };
    // Start the daemon cleaner chore
    this.choreService = new ChoreService("ConnectionCache");
    choreService.scheduleChore(cleaner);
  }

  /**
   * Best-effort close of the admin (if one was created) and the connection of an evicted entry.
   * Failures are logged and otherwise ignored so that one bad entry does not stop the chore.
   */
  private static void closeIdle(ConnectionInfo connInfo) {
    Admin idleAdmin = connInfo.admin;
    if (idleAdmin != null) {
      try {
        idleAdmin.close();
      } catch (Throwable t) {
        LOG.info("Got exception in closing idle admin", t);
      }
    }
    try {
      connInfo.connection.close();
    } catch (Throwable t) {
      LOG.info("Got exception in closing idle connection", t);
    }
  }

  /**
   * Set the current thread local effective user
   */
  public void setEffectiveUser(String user) {
    effectiveUserNames.set(user);
  }

  /**
   * Get the current thread local effective user
   */
  public String getEffectiveUser() {
    return effectiveUserNames.get();
  }

  /**
   * Called when cache is no longer needed so that it can perform cleanup operations.
   * This shuts down the cleaner chore service; it does not close cached connections.
   */
  public void shutdown() {
    if (choreService != null) {
      choreService.shutdown();
    }
  }

  /**
   * Returns the {@link Admin} cached for the current effective user, creating it on first use.
   * Caller doesn't close the admin afterwards.
   * We need to manage it and close it properly.
   *
   * @throws IOException if the connection or the admin cannot be created
   */
  public Admin getAdmin() throws IOException {
    ConnectionInfo connInfo = getCurrentConnection();
    if (connInfo.admin == null) {
      // Re-check under the per-user lock so that only one Admin is created per entry.
      Lock lock = locker.acquireLock(getEffectiveUser());
      try {
        if (connInfo.admin == null) {
          connInfo.admin = connInfo.connection.getAdmin();
        }
      } finally {
        lock.unlock();
      }
    }
    return connInfo.admin;
  }

  /**
   * Returns a {@link Table} from the current effective user's connection.
   * Caller closes the table afterwards.
   *
   * @param tableName name of the table, parsed with {@link TableName#valueOf(String)}
   * @throws IOException if the connection or the table cannot be obtained
   */
  public Table getTable(String tableName) throws IOException {
    ConnectionInfo connInfo = getCurrentConnection();
    return connInfo.connection.getTable(TableName.valueOf(tableName));
  }

  /**
   * Retrieve a regionLocator for the table. The user should close the RegionLocator.
   *
   * @param tableName name of the table as bytes
   * @throws IOException if the connection or the locator cannot be obtained
   */
  public RegionLocator getRegionLocator(byte[] tableName) throws IOException {
    return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName));
  }

  /**
   * Get the cached connection for the current user.
   * If none or timed out, create a new one.
   *
   * @return the cache entry for the calling thread's effective user
   * @throws IOException if a new connection has to be created and that fails
   */
  ConnectionInfo getCurrentConnection() throws IOException {
    String userName = getEffectiveUser();
    ConnectionInfo connInfo = connections.get(userName);
    // updateAccessTime() returns false when the entry was evicted or its connection is dead.
    if (connInfo == null || !connInfo.updateAccessTime()) {
      Lock lock = locker.acquireLock(userName);
      try {
        // Another thread may have created a fresh entry while we waited for the lock.
        connInfo = connections.get(userName);
        if (connInfo == null) {
          UserGroupInformation ugi = realUser;
          if (!userName.equals(realUserName)) {
            ugi = UserGroupInformation.createProxyUser(userName, realUser);
          }
          User user = userProvider.create(ugi);
          Connection conn = ConnectionFactory.createConnection(conf, user);
          connInfo = new ConnectionInfo(conn, userName);
          connections.put(userName, connInfo);
        }
      } finally {
        lock.unlock();
      }
    }
    return connInfo;
  }

  /**
   * Updates the access time for the current connection. Used to keep Connections alive for
   * long-lived scanners.
   * @return whether we successfully updated the last access time
   */
  public boolean updateConnectionAccessTime() {
    String userName = getEffectiveUser();
    ConnectionInfo connInfo = connections.get(userName);
    if (connInfo != null) {
      return connInfo.updateAccessTime();
    }
    return false;
  }

  /**
   * A cached connection together with its owner, its lazily created admin and its idle
   * bookkeeping. {@code lastAccessTime} and {@code closed} are only touched from the
   * synchronized methods below.
   */
  class ConnectionInfo {
    final Connection connection;
    final String userName;

    /** Created lazily by {@link ConnectionCache#getAdmin()}; null until first requested. */
    volatile Admin admin;
    private long lastAccessTime;
    private boolean closed;

    ConnectionInfo(Connection conn, String user) {
      lastAccessTime = EnvironmentEdgeManager.currentTime();
      connection = conn;
      closed = false;
      userName = user;
    }

    /**
     * Records an access to this entry.
     *
     * @return false if this entry has already been timed out, or if its connection is
     *         aborted/closed (in which case the entry is also evicted); true otherwise
     */
    synchronized boolean updateAccessTime() {
      if (closed) {
        return false;
      }
      if (connection.isAborted() || connection.isClosed()) {
        LOG.info("Unexpected: cached Connection is aborted/closed, removed from cache");
        // Remove only if the map still points at THIS entry. A thread holding a stale
        // reference must not evict a replacement entry another thread has already cached,
        // otherwise that replacement would never be seen (and closed) by the cleaner chore.
        connections.remove(userName, this);
        return false;
      }
      lastAccessTime = EnvironmentEdgeManager.currentTime();
      return true;
    }

    /**
     * Checks whether this entry has been idle for longer than {@code maxIdleTime}; if so it is
     * evicted from the cache and marked closed. The caller is responsible for actually closing
     * the connection and admin.
     *
     * @param maxIdleTime allowed idle time, in the unit of
     *          {@code EnvironmentEdgeManager.currentTime()}
     * @return true if the entry timed out and was evicted
     */
    synchronized boolean timedOut(int maxIdleTime) {
      long timeoutTime = lastAccessTime + maxIdleTime;
      if (EnvironmentEdgeManager.currentTime() > timeoutTime) {
        // Identity-guarded removal, for the same reason as in updateAccessTime().
        connections.remove(userName, this);
        closed = true;
        return true;
      }
      return false;
    }
  }
}