1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.locks.Lock;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.ChoreService;
29 import org.apache.hadoop.hbase.ScheduledChore;
30 import org.apache.hadoop.hbase.Stoppable;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.Admin;
34 import org.apache.hadoop.hbase.client.Connection;
35 import org.apache.hadoop.hbase.client.ConnectionFactory;
36 import org.apache.hadoop.hbase.client.RegionLocator;
37 import org.apache.hadoop.hbase.client.Table;
38 import org.apache.hadoop.hbase.security.User;
39 import org.apache.hadoop.hbase.security.UserProvider;
40 import org.apache.hadoop.security.UserGroupInformation;
41 import org.apache.commons.logging.LogFactory;
42
43
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class ConnectionCache {
51 private static final Log LOG = LogFactory.getLog(ConnectionCache.class);
52
53 private final Map<String, ConnectionInfo>
54 connections = new ConcurrentHashMap<String, ConnectionInfo>();
55 private final KeyLocker<String> locker = new KeyLocker<String>();
56 private final String realUserName;
57 private final UserGroupInformation realUser;
58 private final UserProvider userProvider;
59 private final Configuration conf;
60 private final ChoreService choreService;
61
62 private final ThreadLocal<String> effectiveUserNames =
63 new ThreadLocal<String>() {
64 @Override
65 protected String initialValue() {
66 return realUserName;
67 }
68 };
69
70 public ConnectionCache(final Configuration conf,
71 final UserProvider userProvider,
72 final int cleanInterval, final int maxIdleTime) throws IOException {
73 Stoppable stoppable = new Stoppable() {
74 private volatile boolean isStopped = false;
75 @Override public void stop(String why) { isStopped = true;}
76 @Override public boolean isStopped() {return isStopped;}
77 };
78 this.choreService = new ChoreService("ConnectionCache");
79 ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) {
80 @Override
81 protected void chore() {
82 for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
83 ConnectionInfo connInfo = entry.getValue();
84 if (connInfo.timedOut(maxIdleTime)) {
85 if (connInfo.admin != null) {
86 try {
87 connInfo.admin.close();
88 } catch (Throwable t) {
89 LOG.info("Got exception in closing idle admin", t);
90 }
91 }
92 try {
93 connInfo.connection.close();
94 } catch (Throwable t) {
95 LOG.info("Got exception in closing idle connection", t);
96 }
97 }
98 }
99 }
100 };
101
102 choreService.scheduleChore(cleaner);
103 this.realUser = userProvider.getCurrent().getUGI();
104 this.realUserName = realUser.getShortUserName();
105 this.userProvider = userProvider;
106 this.conf = conf;
107 }
108
109
110
111
112 public void setEffectiveUser(String user) {
113 effectiveUserNames.set(user);
114 }
115
116
117
118
119 public String getEffectiveUser() {
120 return effectiveUserNames.get();
121 }
122
123
124
125
126 public void shutdown() {
127 if (choreService != null) choreService.shutdown();
128 }
129
130
131
132
133
134 public Admin getAdmin() throws IOException {
135 ConnectionInfo connInfo = getCurrentConnection();
136 if (connInfo.admin == null) {
137 Lock lock = locker.acquireLock(getEffectiveUser());
138 try {
139 if (connInfo.admin == null) {
140 connInfo.admin = connInfo.connection.getAdmin();
141 }
142 } finally {
143 lock.unlock();
144 }
145 }
146 return connInfo.admin;
147 }
148
149
150
151
152 public Table getTable(String tableName) throws IOException {
153 ConnectionInfo connInfo = getCurrentConnection();
154 return connInfo.connection.getTable(TableName.valueOf(tableName));
155 }
156
157
158
159
160 public RegionLocator getRegionLocator(byte[] tableName) throws IOException {
161 return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName));
162 }
163
164
165
166
167
168 ConnectionInfo getCurrentConnection() throws IOException {
169 String userName = getEffectiveUser();
170 ConnectionInfo connInfo = connections.get(userName);
171 if (connInfo == null || !connInfo.updateAccessTime()) {
172 Lock lock = locker.acquireLock(userName);
173 try {
174 connInfo = connections.get(userName);
175 if (connInfo == null) {
176 UserGroupInformation ugi = realUser;
177 if (!userName.equals(realUserName)) {
178 ugi = UserGroupInformation.createProxyUser(userName, realUser);
179 }
180 User user = userProvider.create(ugi);
181 Connection conn = ConnectionFactory.createConnection(conf, user);
182 connInfo = new ConnectionInfo(conn, userName);
183 connections.put(userName, connInfo);
184 }
185 } finally {
186 lock.unlock();
187 }
188 }
189 return connInfo;
190 }
191
192 class ConnectionInfo {
193 final Connection connection;
194 final String userName;
195
196 volatile Admin admin;
197 private long lastAccessTime;
198 private boolean closed;
199
200 ConnectionInfo(Connection conn, String user) {
201 lastAccessTime = EnvironmentEdgeManager.currentTime();
202 connection = conn;
203 closed = false;
204 userName = user;
205 }
206
207 synchronized boolean updateAccessTime() {
208 if (closed) {
209 return false;
210 }
211 if (connection.isAborted() || connection.isClosed()) {
212 LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
213 connections.remove(userName);
214 return false;
215 }
216 lastAccessTime = EnvironmentEdgeManager.currentTime();
217 return true;
218 }
219
220 synchronized boolean timedOut(int maxIdleTime) {
221 long timeoutTime = lastAccessTime + maxIdleTime;
222 if (EnvironmentEdgeManager.currentTime() > timeoutTime) {
223 connections.remove(userName);
224 closed = true;
225 return true;
226 }
227 return false;
228 }
229 }
230 }