View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.locks.Lock;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.ChoreService;
29  import org.apache.hadoop.hbase.ScheduledChore;
30  import org.apache.hadoop.hbase.Stoppable;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.client.Admin;
34  import org.apache.hadoop.hbase.client.Connection;
35  import org.apache.hadoop.hbase.client.ConnectionFactory;
36  import org.apache.hadoop.hbase.client.RegionLocator;
37  import org.apache.hadoop.hbase.client.Table;
38  import org.apache.hadoop.hbase.security.User;
39  import org.apache.hadoop.hbase.security.UserProvider;
40  import org.apache.hadoop.security.UserGroupInformation;
41  import org.apache.commons.logging.LogFactory;
42
43  /**
44   * A utility to store user specific HConnections in memory.
45   * There is a chore to clean up connections idle for too long.
46   * This class is used by REST server and Thrift server to
47   * support authentication and impersonation.
48   */
49  @InterfaceAudience.Private
50  public class ConnectionCache {
51    private static final Log LOG = LogFactory.getLog(ConnectionCache.class);
52
53    private final Map<String, ConnectionInfo>
54     connections = new ConcurrentHashMap<String, ConnectionInfo>();
55    private final KeyLocker<String> locker = new KeyLocker<String>();
56    private final String realUserName;
57    private final UserGroupInformation realUser;
58    private final UserProvider userProvider;
59    private final Configuration conf;
60    private final ChoreService choreService;
61
62    private final ThreadLocal<String> effectiveUserNames =
63        new ThreadLocal<String>() {
64      @Override
65      protected String initialValue() {
66        return realUserName;
67      }
68    };
69
70    public ConnectionCache(final Configuration conf,
71        final UserProvider userProvider,
72        final int cleanInterval, final int maxIdleTime) throws IOException {
73      Stoppable stoppable = new Stoppable() {
74        private volatile boolean isStopped = false;
75        @Override public void stop(String why) { isStopped = true;}
76        @Override public boolean isStopped() {return isStopped;}
77      };
78      this.choreService = new ChoreService("ConnectionCache");
79      ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) {
80        @Override
81        protected void chore() {
82          for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
83            ConnectionInfo connInfo = entry.getValue();
84            if (connInfo.timedOut(maxIdleTime)) {
85              if (connInfo.admin != null) {
86                try {
87                  connInfo.admin.close();
88                } catch (Throwable t) {
89                  LOG.info("Got exception in closing idle admin", t);
90                }
91              }
92              try {
93                connInfo.connection.close();
94              } catch (Throwable t) {
95                LOG.info("Got exception in closing idle connection", t);
96              }
97            }
98          }
99        }
100     };
101     // Start the daemon cleaner chore
102     choreService.scheduleChore(cleaner);
103     this.realUser = userProvider.getCurrent().getUGI();
104     this.realUserName = realUser.getShortUserName();
105     this.userProvider = userProvider;
106     this.conf = conf;
107   }
108
109   /**
110    * Set the current thread local effective user
111    */
112   public void setEffectiveUser(String user) {
113     effectiveUserNames.set(user);
114   }
115
116   /**
117    * Get the current thread local effective user
118    */
119   public String getEffectiveUser() {
120     return effectiveUserNames.get();
121   }
122
123   /**
124    * Called when cache is no longer needed so that it can perform cleanup operations
125    */
126   public void shutdown() {
127     if (choreService != null) choreService.shutdown();
128   }
129
130   /**
131    * Caller doesn't close the admin afterwards.
132    * We need to manage it and close it properly.
133    */
134   public Admin getAdmin() throws IOException {
135     ConnectionInfo connInfo = getCurrentConnection();
136     if (connInfo.admin == null) {
137       Lock lock = locker.acquireLock(getEffectiveUser());
138       try {
139         if (connInfo.admin == null) {
140           connInfo.admin = connInfo.connection.getAdmin();
141         }
142       } finally {
143         lock.unlock();
144       }
145     }
146     return connInfo.admin;
147   }
148
149   /**
150    * Caller closes the table afterwards.
151    */
152   public Table getTable(String tableName) throws IOException {
153     ConnectionInfo connInfo = getCurrentConnection();
154     return connInfo.connection.getTable(TableName.valueOf(tableName));
155   }
156
157   /**
158    * Retrieve a regionLocator for the table. The user should close the RegionLocator.
159    */
160   public RegionLocator getRegionLocator(byte[] tableName) throws IOException {
161     return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName));
162   }
163
164   /**
165    * Get the cached connection for the current user.
166    * If none or timed out, create a new one.
167    */
168   ConnectionInfo getCurrentConnection() throws IOException {
169     String userName = getEffectiveUser();
170     ConnectionInfo connInfo = connections.get(userName);
171     if (connInfo == null || !connInfo.updateAccessTime()) {
172       Lock lock = locker.acquireLock(userName);
173       try {
174         connInfo = connections.get(userName);
175         if (connInfo == null) {
176           UserGroupInformation ugi = realUser;
177           if (!userName.equals(realUserName)) {
178             ugi = UserGroupInformation.createProxyUser(userName, realUser);
179           }
180           User user = userProvider.create(ugi);
181           Connection conn = ConnectionFactory.createConnection(conf, user);
182           connInfo = new ConnectionInfo(conn, userName);
183           connections.put(userName, connInfo);
184         }
185       } finally {
186         lock.unlock();
187       }
188     }
189     return connInfo;
190   }
191
192   /**
193    * Updates the access time for the current connection. Used to keep Connections alive for
194    * long-lived scanners.
195    * @return whether we successfully updated the last access time
196    */
197   public boolean updateConnectionAccessTime() {
198     String userName = getEffectiveUser();
199     ConnectionInfo connInfo = connections.get(userName);
200     if (connInfo != null) {
201       return connInfo.updateAccessTime();
202     }
203     return false;
204   }
205
206   class ConnectionInfo {
207     final Connection connection;
208     final String userName;
209
210     volatile Admin admin;
211     private long lastAccessTime;
212     private boolean closed;
213
214     ConnectionInfo(Connection conn, String user) {
215       lastAccessTime = EnvironmentEdgeManager.currentTime();
216       connection = conn;
217       closed = false;
218       userName = user;
219     }
220
221     synchronized boolean updateAccessTime() {
222       if (closed) {
223         return false;
224       }
225       if (connection.isAborted() || connection.isClosed()) {
226         LOG.info("Unexpected: cached Connection is aborted/closed, removed from cache");
227         connections.remove(userName);
228         return false;
229       }
230       lastAccessTime = EnvironmentEdgeManager.currentTime();
231       return true;
232     }
233
234     synchronized boolean timedOut(int maxIdleTime) {
235       long timeoutTime = lastAccessTime + maxIdleTime;
236       if (EnvironmentEdgeManager.currentTime() > timeoutTime) {
237         connections.remove(userName);
238         closed = true;
239         return true;
240       }
241       return false;
242     }
243   }
244 }