View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.locks.Lock;
25  
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Chore;
29  import org.apache.hadoop.hbase.Stoppable;
30  import org.apache.hadoop.hbase.client.HBaseAdmin;
31  import org.apache.hadoop.hbase.client.HConnection;
32  import org.apache.hadoop.hbase.client.HConnectionManager;
33  import org.apache.hadoop.hbase.client.HTableInterface;
34  import org.apache.hadoop.hbase.security.User;
35  import org.apache.hadoop.hbase.security.UserProvider;
36  import org.apache.hadoop.security.UserGroupInformation;
37  import org.apache.log4j.Logger;
38  
39  /**
40   * A utility to store user specific HConnections in memory.
41   * There is a chore to clean up connections idle for too long.
42   * This class is used by REST server and Thrift server to
43   * support authentication and impersonation.
44   */
45  @InterfaceAudience.Private
46  public class ConnectionCache {
47    private static Logger LOG = Logger.getLogger(ConnectionCache.class);
48  
49    private final Map<String, ConnectionInfo>
50     connections = new ConcurrentHashMap<String, ConnectionInfo>();
51    private final KeyLocker<String> locker = new KeyLocker<String>();
52    private final String realUserName;
53    private final UserGroupInformation realUser;
54    private final UserProvider userProvider;
55    private final Configuration conf;
56  
57    private final ThreadLocal<String> effectiveUserNames =
58        new ThreadLocal<String>() {
59      protected String initialValue() {
60        return realUserName;
61      }
62    };
63  
64    public ConnectionCache(final Configuration conf,
65        final UserProvider userProvider,
66        final int cleanInterval, final int maxIdleTime) throws IOException {
67      Stoppable stoppable = new Stoppable() {
68        private volatile boolean isStopped = false;
69        @Override public void stop(String why) { isStopped = true;}
70        @Override public boolean isStopped() {return isStopped;}
71      };
72  
73      Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) {
74        @Override
75        protected void chore() {
76          for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
77            ConnectionInfo connInfo = entry.getValue();
78            if (connInfo.timedOut(maxIdleTime)) {
79              if (connInfo.admin != null) {
80                try {
81                  connInfo.admin.close();
82                } catch (Throwable t) {
83                  LOG.info("Got exception in closing idle admin", t);
84                }
85              }
86              try {
87                connInfo.connection.close();
88              } catch (Throwable t) {
89                LOG.info("Got exception in closing idle connection", t);
90              }
91            }
92          }
93        }
94      };
95      // Start the daemon cleaner chore
96      Threads.setDaemonThreadRunning(cleaner.getThread());
97      this.realUser = userProvider.getCurrent().getUGI();
98      this.realUserName = realUser.getShortUserName();
99      this.userProvider = userProvider;
100     this.conf = conf;
101   }
102 
103   /**
104    * Set the current thread local effective user
105    */
106   public void setEffectiveUser(String user) {
107     effectiveUserNames.set(user);
108   }
109 
110   /**
111    * Get the current thread local effective user
112    */
113   public String getEffectiveUser() {
114     return effectiveUserNames.get();
115   }
116 
117   /**
118    * Caller doesn't close the admin afterwards.
119    * We need to manage it and close it properly.
120    */
121   @SuppressWarnings("deprecation")
122   public HBaseAdmin getAdmin() throws IOException {
123     ConnectionInfo connInfo = getCurrentConnection();
124     if (connInfo.admin == null) {
125       Lock lock = locker.acquireLock(getEffectiveUser());
126       try {
127         if (connInfo.admin == null) {
128           connInfo.admin = new HBaseAdmin(connInfo.connection);
129         }
130       } finally {
131         lock.unlock();
132       }
133     }
134     return connInfo.admin;
135   }
136 
137   /**
138    * Caller closes the table afterwards.
139    */
140   public HTableInterface getTable(String tableName) throws IOException {
141     ConnectionInfo connInfo = getCurrentConnection();
142     return connInfo.connection.getTable(tableName);
143   }
144 
145   /**
146    * Get the cached connection for the current user.
147    * If none or timed out, create a new one.
148    */
149   ConnectionInfo getCurrentConnection() throws IOException {
150     String userName = getEffectiveUser();
151     ConnectionInfo connInfo = connections.get(userName);
152     if (connInfo == null || !connInfo.updateAccessTime()) {
153       Lock lock = locker.acquireLock(userName);
154       try {
155         connInfo = connections.get(userName);
156         if (connInfo == null) {
157           UserGroupInformation ugi = realUser;
158           if (!userName.equals(realUserName)) {
159             ugi = UserGroupInformation.createProxyUser(userName, realUser);
160           }
161           User user = userProvider.create(ugi);
162           HConnection conn = HConnectionManager.createConnection(conf, user);
163           connInfo = new ConnectionInfo(conn, userName);
164           connections.put(userName, connInfo);
165         }
166       } finally {
167         lock.unlock();
168       }
169     }
170     return connInfo;
171   }
172 
173   class ConnectionInfo {
174     final HConnection connection;
175     final String userName;
176 
177     volatile HBaseAdmin admin;
178     private long lastAccessTime;
179     private boolean closed;
180 
181     ConnectionInfo(HConnection conn, String user) {
182       lastAccessTime = EnvironmentEdgeManager.currentTime();
183       connection = conn;
184       closed = false;
185       userName = user;
186     }
187 
188     synchronized boolean updateAccessTime() {
189       if (closed) {
190         return false;
191       }
192       if (connection.isAborted() || connection.isClosed()) {
193         LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
194         connections.remove(userName);
195         return false;
196       }
197       lastAccessTime = EnvironmentEdgeManager.currentTime();
198       return true;
199     }
200 
201     synchronized boolean timedOut(int maxIdleTime) {
202       long timeoutTime = lastAccessTime + maxIdleTime;
203       if (EnvironmentEdgeManager.currentTime() > timeoutTime) {
204         connections.remove(userName);
205         closed = true;
206       }
207       return false;
208     }
209   }
210 }