001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.locks.Lock;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.ChoreService;
026import org.apache.hadoop.hbase.ScheduledChore;
027import org.apache.hadoop.hbase.Stoppable;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Admin;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.RegionLocator;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.security.User;
035import org.apache.hadoop.hbase.security.UserProvider;
036import org.apache.hadoop.security.UserGroupInformation;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A utility to store user specific HConnections in memory. There is a chore to clean up connections
043 * idle for too long. This class is used by REST server and Thrift server to support authentication
044 * and impersonation.
045 */
046@InterfaceAudience.Private
047public class ConnectionCache {
048  private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);
049
050  private final Map<String, ConnectionInfo> connections = new ConcurrentHashMap<>();
051  private final KeyLocker<String> locker = new KeyLocker<>();
052  private final String realUserName;
053  private final UserGroupInformation realUser;
054  private final UserProvider userProvider;
055  private final Configuration conf;
056  private final ChoreService choreService;
057
058  private final ThreadLocal<String> effectiveUserNames = new ThreadLocal<String>() {
059    @Override
060    protected String initialValue() {
061      return realUserName;
062    }
063  };
064
065  public ConnectionCache(final Configuration conf, final UserProvider userProvider,
066    final int cleanInterval, final int maxIdleTime) throws IOException {
067    Stoppable stoppable = new Stoppable() {
068      private volatile boolean isStopped = false;
069
070      @Override
071      public void stop(String why) {
072        isStopped = true;
073      }
074
075      @Override
076      public boolean isStopped() {
077        return isStopped;
078      }
079    };
080    this.choreService = new ChoreService("ConnectionCache");
081    ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) {
082      @Override
083      protected void chore() {
084        for (Map.Entry<String, ConnectionInfo> entry : connections.entrySet()) {
085          ConnectionInfo connInfo = entry.getValue();
086          if (connInfo.timedOut(maxIdleTime)) {
087            if (connInfo.admin != null) {
088              try {
089                connInfo.admin.close();
090              } catch (Throwable t) {
091                LOG.info("Got exception in closing idle admin", t);
092              }
093            }
094            try {
095              connInfo.connection.close();
096            } catch (Throwable t) {
097              LOG.info("Got exception in closing idle connection", t);
098            }
099          }
100        }
101      }
102    };
103    // Start the daemon cleaner chore
104    choreService.scheduleChore(cleaner);
105    this.realUser = userProvider.getCurrent().getUGI();
106    this.realUserName = realUser.getShortUserName();
107    this.userProvider = userProvider;
108    this.conf = conf;
109  }
110
111  /**
112   * Set the current thread local effective user
113   */
114  public void setEffectiveUser(String user) {
115    effectiveUserNames.set(user);
116  }
117
118  /**
119   * Get the current thread local effective user
120   */
121  public String getEffectiveUser() {
122    return effectiveUserNames.get();
123  }
124
125  /**
126   * Called when cache is no longer needed so that it can perform cleanup operations
127   */
128  public void shutdown() {
129    if (choreService != null) choreService.shutdown();
130  }
131
132  /**
133   * Caller doesn't close the admin afterwards. We need to manage it and close it properly.
134   */
135  public Admin getAdmin() throws IOException {
136    ConnectionInfo connInfo = getCurrentConnection();
137    if (connInfo.admin == null) {
138      Lock lock = locker.acquireLock(getEffectiveUser());
139      try {
140        if (connInfo.admin == null) {
141          connInfo.admin = connInfo.connection.getAdmin();
142        }
143      } finally {
144        lock.unlock();
145      }
146    }
147    return connInfo.admin;
148  }
149
150  /**
151   * Caller closes the table afterwards.
152   */
153  public Table getTable(String tableName) throws IOException {
154    ConnectionInfo connInfo = getCurrentConnection();
155    return connInfo.connection.getTable(TableName.valueOf(tableName));
156  }
157
158  /**
159   * Retrieve a regionLocator for the table. The user should close the RegionLocator.
160   */
161  public RegionLocator getRegionLocator(byte[] tableName) throws IOException {
162    return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName));
163  }
164
165  /**
166   * Get the cached connection for the current user. If none or timed out, create a new one.
167   */
168  ConnectionInfo getCurrentConnection() throws IOException {
169    String userName = getEffectiveUser();
170    ConnectionInfo connInfo = connections.get(userName);
171    if (connInfo == null || !connInfo.updateAccessTime()) {
172      Lock lock = locker.acquireLock(userName);
173      try {
174        connInfo = connections.get(userName);
175        if (connInfo == null) {
176          UserGroupInformation ugi = realUser;
177          if (!userName.equals(realUserName)) {
178            ugi = UserGroupInformation.createProxyUser(userName, realUser);
179          }
180          User user = userProvider.create(ugi);
181          Connection conn = ConnectionFactory.createConnection(conf, user);
182          connInfo = new ConnectionInfo(conn, userName);
183          connections.put(userName, connInfo);
184        }
185      } finally {
186        lock.unlock();
187      }
188    }
189    return connInfo;
190  }
191
192  /**
193   * Updates the access time for the current connection. Used to keep Connections alive for
194   * long-lived scanners.
195   * @return whether we successfully updated the last access time
196   */
197  public boolean updateConnectionAccessTime() {
198    String userName = getEffectiveUser();
199    ConnectionInfo connInfo = connections.get(userName);
200    if (connInfo != null) {
201      return connInfo.updateAccessTime();
202    }
203    return false;
204  }
205
206  /** Returns Cluster ID for the HBase cluster or null if there is an err making the connection. */
207  public String getClusterId() {
208    try {
209      ConnectionInfo connInfo = getCurrentConnection();
210      return connInfo.connection.getClusterId();
211    } catch (IOException e) {
212      LOG.error("Error getting connection: ", e);
213    }
214    return null;
215  }
216
217  class ConnectionInfo {
218    final Connection connection;
219    final String userName;
220
221    volatile Admin admin;
222    private long lastAccessTime;
223    private boolean closed;
224
225    ConnectionInfo(Connection conn, String user) {
226      lastAccessTime = EnvironmentEdgeManager.currentTime();
227      connection = conn;
228      closed = false;
229      userName = user;
230    }
231
232    synchronized boolean updateAccessTime() {
233      if (closed) {
234        return false;
235      }
236      if (connection.isAborted() || connection.isClosed()) {
237        LOG.info("Unexpected: cached Connection is aborted/closed, removed from cache");
238        connections.remove(userName);
239        return false;
240      }
241      lastAccessTime = EnvironmentEdgeManager.currentTime();
242      return true;
243    }
244
245    synchronized boolean timedOut(int maxIdleTime) {
246      long timeoutTime = lastAccessTime + maxIdleTime;
247      if (EnvironmentEdgeManager.currentTime() > timeoutTime) {
248        connections.remove(userName);
249        closed = true;
250        return true;
251      }
252      return false;
253    }
254  }
255}