View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.rest;
20  
21  import java.io.IOException;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.locks.Lock;
25  
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Chore;
29  import org.apache.hadoop.hbase.Stoppable;
30  import org.apache.hadoop.hbase.client.HBaseAdmin;
31  import org.apache.hadoop.hbase.client.HConnection;
32  import org.apache.hadoop.hbase.client.HConnectionManager;
33  import org.apache.hadoop.hbase.client.HTableInterface;
34  import org.apache.hadoop.hbase.security.User;
35  import org.apache.hadoop.hbase.security.UserProvider;
36  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
37  import org.apache.hadoop.hbase.util.KeyLocker;
38  import org.apache.hadoop.hbase.util.Threads;
39  import org.apache.hadoop.security.UserGroupInformation;
40  import org.apache.log4j.Logger;
41  
42  /**
43   * Singleton class encapsulating global REST servlet state and functions.
44   */
45  @InterfaceAudience.Private
46  public class RESTServlet implements Constants {
47    private static Logger LOG = Logger.getLogger(RESTServlet.class);
48    private static RESTServlet INSTANCE;
49    private final Configuration conf;
50    private final MetricsREST metrics = new MetricsREST();
51    private final Map<String, ConnectionInfo>
52      connections = new ConcurrentHashMap<String, ConnectionInfo>();
53    private final KeyLocker<String> locker = new KeyLocker<String>();
54    private final UserGroupInformation realUser;
55  
56    static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
57    static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
58  
59    private final ThreadLocal<UserGroupInformation> effectiveUser =
60        new ThreadLocal<UserGroupInformation>() {
61      protected UserGroupInformation initialValue() {
62        return realUser;
63      }
64    };
65  
66    UserGroupInformation getRealUser() {
67      return realUser;
68    }
69  
70    // A chore to clean up idle connections.
71    private final Chore connectionCleaner;
72    private final Stoppable stoppable;
73    private UserProvider userProvider;
74  
75    class ConnectionInfo {
76      final HConnection connection;
77      final String userName;
78  
79      volatile HBaseAdmin admin;
80      private long lastAccessTime;
81      private boolean closed;
82  
83      ConnectionInfo(HConnection conn, String user) {
84        lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
85        connection = conn;
86        closed = false;
87        userName = user;
88      }
89  
90      synchronized boolean updateAccessTime() {
91        if (closed) {
92          return false;
93        }
94        if (connection.isAborted() || connection.isClosed()) {
95          LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
96          connections.remove(userName);
97          return false;
98        }
99        lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
100       return true;
101     }
102 
103     synchronized boolean timedOut(int maxIdleTime) {
104       long timeoutTime = lastAccessTime + maxIdleTime;
105       if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
106         connections.remove(userName);
107         closed = true;
108       }
109       return false;
110     }
111   }
112 
113   class ConnectionCleaner extends Chore {
114     private final int maxIdleTime;
115 
116     public ConnectionCleaner(int cleanInterval, int maxIdleTime) {
117       super("REST-ConnectionCleaner", cleanInterval, stoppable);
118       this.maxIdleTime = maxIdleTime;
119     }
120 
121     @Override
122     protected void chore() {
123       for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
124         ConnectionInfo connInfo = entry.getValue();
125         if (connInfo.timedOut(maxIdleTime)) {
126           if (connInfo.admin != null) {
127             try {
128               connInfo.admin.close();
129             } catch (Throwable t) {
130               LOG.info("Got exception in closing idle admin", t);
131             }
132           }
133           try {
134             connInfo.connection.close();
135           } catch (Throwable t) {
136             LOG.info("Got exception in closing idle connection", t);
137           }
138         }
139       }
140     }
141   }
142 
143   /**
144    * @return the RESTServlet singleton instance
145    */
146   public synchronized static RESTServlet getInstance() {
147     assert(INSTANCE != null);
148     return INSTANCE;
149   }
150 
151   /**
152    * @param conf Existing configuration to use in rest servlet
153    * @param realUser the login user
154    * @return the RESTServlet singleton instance
155    */
156   public synchronized static RESTServlet getInstance(Configuration conf,
157       UserGroupInformation realUser) {
158     if (INSTANCE == null) {
159       INSTANCE = new RESTServlet(conf, realUser);
160     }
161     return INSTANCE;
162   }
163 
164   public synchronized static void stop() {
165     if (INSTANCE != null)  INSTANCE = null;
166   }
167 
168   /**
169    * Constructor with existing configuration
170    * @param conf existing configuration
171    * @param realUser the login user
172    */
173   RESTServlet(final Configuration conf,
174       final UserGroupInformation realUser) {
175     this.userProvider = UserProvider.instantiate(conf);
176     stoppable = new Stoppable() {
177       private volatile boolean isStopped = false;
178       @Override public void stop(String why) { isStopped = true;}
179       @Override public boolean isStopped() {return isStopped;}
180     };
181 
182     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
183     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
184     connectionCleaner = new ConnectionCleaner(cleanInterval, maxIdleTime);
185     Threads.setDaemonThreadRunning(connectionCleaner.getThread());
186 
187     this.realUser = realUser;
188     this.conf = conf;
189   }
190 
191   /**
192    * Caller doesn't close the admin afterwards.
193    * We need to manage it and close it properly.
194    */
195   HBaseAdmin getAdmin() throws IOException {
196     ConnectionInfo connInfo = getCurrentConnection();
197     if (connInfo.admin == null) {
198       Lock lock = locker.acquireLock(effectiveUser.get().getUserName());
199       try {
200         if (connInfo.admin == null) {
201           connInfo.admin = new HBaseAdmin(connInfo.connection);
202         }
203       } finally {
204         lock.unlock();
205       }
206     }
207     return connInfo.admin;
208   }
209 
210   /**
211    * Caller closes the table afterwards.
212    */
213   HTableInterface getTable(String tableName) throws IOException {
214     ConnectionInfo connInfo = getCurrentConnection();
215     return connInfo.connection.getTable(tableName);
216   }
217 
218   Configuration getConfiguration() {
219     return conf;
220   }
221 
222   MetricsREST getMetrics() {
223     return metrics;
224   }
225 
226   /**
227    * Helper method to determine if server should
228    * only respond to GET HTTP method requests.
229    * @return boolean for server read-only state
230    */
231   boolean isReadOnly() {
232     return getConfiguration().getBoolean("hbase.rest.readonly", false);
233   }
234 
235   void setEffectiveUser(UserGroupInformation effectiveUser) {
236     this.effectiveUser.set(effectiveUser);
237   }
238 
239   private ConnectionInfo getCurrentConnection() throws IOException {
240     String userName = effectiveUser.get().getUserName();
241     ConnectionInfo connInfo = connections.get(userName);
242     if (connInfo == null || !connInfo.updateAccessTime()) {
243       Lock lock = locker.acquireLock(userName);
244       try {
245         connInfo = connections.get(userName);
246         if (connInfo == null) {
247           User user = userProvider.create(effectiveUser.get());
248           HConnection conn = HConnectionManager.createConnection(conf, user);
249           connInfo = new ConnectionInfo(conn, userName);
250           connections.put(userName, connInfo);
251         }
252       } finally {
253         lock.unlock();
254       }
255     }
256     return connInfo;
257   }
258 }