001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.util.Map; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.locks.Lock; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.ChoreService; 026import org.apache.hadoop.hbase.ScheduledChore; 027import org.apache.hadoop.hbase.Stoppable; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Admin; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.ConnectionFactory; 032import org.apache.hadoop.hbase.client.RegionLocator; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.security.User; 035import org.apache.hadoop.hbase.security.UserProvider; 036import org.apache.hadoop.security.UserGroupInformation; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * A utility to store user specific HConnections in memory. There is a chore to clean up connections 043 * idle for too long. This class is used by REST server and Thrift server to support authentication 044 * and impersonation. 045 */ 046@InterfaceAudience.Private 047public class ConnectionCache { 048 private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class); 049 050 private final Map<String, ConnectionInfo> connections = new ConcurrentHashMap<>(); 051 private final KeyLocker<String> locker = new KeyLocker<>(); 052 private final String realUserName; 053 private final UserGroupInformation realUser; 054 private final UserProvider userProvider; 055 private final Configuration conf; 056 private final ChoreService choreService; 057 058 private final ThreadLocal<String> effectiveUserNames = new ThreadLocal<String>() { 059 @Override 060 protected String initialValue() { 061 return realUserName; 062 } 063 }; 064 065 public ConnectionCache(final Configuration conf, final UserProvider userProvider, 066 final int cleanInterval, final int maxIdleTime) throws IOException { 067 Stoppable stoppable = new Stoppable() { 068 private volatile boolean isStopped = false; 069 070 @Override 071 public void stop(String why) { 072 isStopped = true; 073 } 074 075 @Override 076 public boolean isStopped() { 077 return isStopped; 078 } 079 }; 080 this.choreService = new ChoreService("ConnectionCache"); 081 ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) { 082 @Override 083 protected void chore() { 084 for (Map.Entry<String, ConnectionInfo> entry : connections.entrySet()) { 085 ConnectionInfo connInfo = entry.getValue(); 086 if (connInfo.timedOut(maxIdleTime)) { 087 if (connInfo.admin != null) { 088 try { 089 connInfo.admin.close(); 090 } catch (Throwable t) { 091 LOG.info("Got exception in closing idle admin", t); 092 } 093 } 094 try { 095 connInfo.connection.close(); 096 } catch (Throwable t) { 097 LOG.info("Got exception in closing idle connection", t); 098 } 099 } 100 } 101 } 102 }; 103 // Start the daemon cleaner chore 104 choreService.scheduleChore(cleaner); 105 this.realUser = userProvider.getCurrent().getUGI(); 106 this.realUserName = realUser.getShortUserName(); 107 this.userProvider = userProvider; 108 this.conf = conf; 109 } 110 111 /** 112 * Set the current thread local effective user 113 */ 114 public void setEffectiveUser(String user) { 115 effectiveUserNames.set(user); 116 } 117 118 /** 119 * Get the current thread local effective user 120 */ 121 public String getEffectiveUser() { 122 return effectiveUserNames.get(); 123 } 124 125 /** 126 * Called when cache is no longer needed so that it can perform cleanup operations 127 */ 128 public void shutdown() { 129 if (choreService != null) choreService.shutdown(); 130 } 131 132 /** 133 * Caller doesn't close the admin afterwards. We need to manage it and close it properly. 134 */ 135 public Admin getAdmin() throws IOException { 136 ConnectionInfo connInfo = getCurrentConnection(); 137 if (connInfo.admin == null) { 138 Lock lock = locker.acquireLock(getEffectiveUser()); 139 try { 140 if (connInfo.admin == null) { 141 connInfo.admin = connInfo.connection.getAdmin(); 142 } 143 } finally { 144 lock.unlock(); 145 } 146 } 147 return connInfo.admin; 148 } 149 150 /** 151 * Caller closes the table afterwards. 152 */ 153 public Table getTable(String tableName) throws IOException { 154 ConnectionInfo connInfo = getCurrentConnection(); 155 return connInfo.connection.getTable(TableName.valueOf(tableName)); 156 } 157 158 /** 159 * Retrieve a regionLocator for the table. The user should close the RegionLocator. 160 */ 161 public RegionLocator getRegionLocator(byte[] tableName) throws IOException { 162 return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName)); 163 } 164 165 /** 166 * Get the cached connection for the current user. If none or timed out, create a new one. 167 */ 168 ConnectionInfo getCurrentConnection() throws IOException { 169 String userName = getEffectiveUser(); 170 ConnectionInfo connInfo = connections.get(userName); 171 if (connInfo == null || !connInfo.updateAccessTime()) { 172 Lock lock = locker.acquireLock(userName); 173 try { 174 connInfo = connections.get(userName); 175 if (connInfo == null) { 176 UserGroupInformation ugi = realUser; 177 if (!userName.equals(realUserName)) { 178 ugi = UserGroupInformation.createProxyUser(userName, realUser); 179 } 180 User user = userProvider.create(ugi); 181 Connection conn = ConnectionFactory.createConnection(conf, user); 182 connInfo = new ConnectionInfo(conn, userName); 183 connections.put(userName, connInfo); 184 } 185 } finally { 186 lock.unlock(); 187 } 188 } 189 return connInfo; 190 } 191 192 /** 193 * Updates the access time for the current connection. Used to keep Connections alive for 194 * long-lived scanners. 195 * @return whether we successfully updated the last access time 196 */ 197 public boolean updateConnectionAccessTime() { 198 String userName = getEffectiveUser(); 199 ConnectionInfo connInfo = connections.get(userName); 200 if (connInfo != null) { 201 return connInfo.updateAccessTime(); 202 } 203 return false; 204 } 205 206 /** Returns Cluster ID for the HBase cluster or null if there is an err making the connection. */ 207 public String getClusterId() { 208 try { 209 ConnectionInfo connInfo = getCurrentConnection(); 210 return connInfo.connection.getClusterId(); 211 } catch (IOException e) { 212 LOG.error("Error getting connection: ", e); 213 } 214 return null; 215 } 216 217 class ConnectionInfo { 218 final Connection connection; 219 final String userName; 220 221 volatile Admin admin; 222 private long lastAccessTime; 223 private boolean closed; 224 225 ConnectionInfo(Connection conn, String user) { 226 lastAccessTime = EnvironmentEdgeManager.currentTime(); 227 connection = conn; 228 closed = false; 229 userName = user; 230 } 231 232 synchronized boolean updateAccessTime() { 233 if (closed) { 234 return false; 235 } 236 if (connection.isAborted() || connection.isClosed()) { 237 LOG.info("Unexpected: cached Connection is aborted/closed, removed from cache"); 238 connections.remove(userName); 239 return false; 240 } 241 lastAccessTime = EnvironmentEdgeManager.currentTime(); 242 return true; 243 } 244 245 synchronized boolean timedOut(int maxIdleTime) { 246 long timeoutTime = lastAccessTime + maxIdleTime; 247 if (EnvironmentEdgeManager.currentTime() > timeoutTime) { 248 connections.remove(userName); 249 closed = true; 250 return true; 251 } 252 return false; 253 } 254 } 255}