View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.rest;
20  
21  import java.io.IOException;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.locks.Lock;
25  
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Chore;
29  import org.apache.hadoop.hbase.Stoppable;
30  import org.apache.hadoop.hbase.client.HBaseAdmin;
31  import org.apache.hadoop.hbase.client.HConnection;
32  import org.apache.hadoop.hbase.client.HConnectionManager;
33  import org.apache.hadoop.hbase.client.HTableInterface;
34  import org.apache.hadoop.hbase.filter.ParseFilter;
35  import org.apache.hadoop.hbase.security.User;
36  import org.apache.hadoop.hbase.security.UserProvider;
37  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
38  import org.apache.hadoop.hbase.util.KeyLocker;
39  import org.apache.hadoop.hbase.util.Threads;
40  import org.apache.hadoop.security.UserGroupInformation;
41  import org.apache.log4j.Logger;
42  
43  /**
44   * Singleton class encapsulating global REST servlet state and functions.
45   */
46  @InterfaceAudience.Private
47  public class RESTServlet implements Constants {
48    private static Logger LOG = Logger.getLogger(RESTServlet.class);
49    private static RESTServlet INSTANCE;
50    private final Configuration conf;
51    private final MetricsREST metrics = new MetricsREST();
52    private final Map<String, ConnectionInfo>
53      connections = new ConcurrentHashMap<String, ConnectionInfo>();
54    private final KeyLocker<String> locker = new KeyLocker<String>();
55    private final UserGroupInformation realUser;
56  
57    static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
58    static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
59  
60    private final ThreadLocal<UserGroupInformation> effectiveUser =
61        new ThreadLocal<UserGroupInformation>() {
62      protected UserGroupInformation initialValue() {
63        return realUser;
64      }
65    };
66  
67    UserGroupInformation getRealUser() {
68      return realUser;
69    }
70  
71    // A chore to clean up idle connections.
72    private final Chore connectionCleaner;
73    private final Stoppable stoppable;
74    private UserProvider userProvider;
75  
76    class ConnectionInfo {
77      final HConnection connection;
78      final String userName;
79  
80      volatile HBaseAdmin admin;
81      private long lastAccessTime;
82      private boolean closed;
83  
84      ConnectionInfo(HConnection conn, String user) {
85        lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
86        connection = conn;
87        closed = false;
88        userName = user;
89      }
90  
91      synchronized boolean updateAccessTime() {
92        if (closed) {
93          return false;
94        }
95        if (connection.isAborted() || connection.isClosed()) {
96          LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
97          connections.remove(userName);
98          return false;
99        }
100       lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
101       return true;
102     }
103 
104     synchronized boolean timedOut(int maxIdleTime) {
105       long timeoutTime = lastAccessTime + maxIdleTime;
106       if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
107         connections.remove(userName);
108         closed = true;
109       }
110       return false;
111     }
112   }
113 
114   class ConnectionCleaner extends Chore {
115     private final int maxIdleTime;
116 
117     public ConnectionCleaner(int cleanInterval, int maxIdleTime) {
118       super("REST-ConnectionCleaner", cleanInterval, stoppable);
119       this.maxIdleTime = maxIdleTime;
120     }
121 
122     @Override
123     protected void chore() {
124       for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
125         ConnectionInfo connInfo = entry.getValue();
126         if (connInfo.timedOut(maxIdleTime)) {
127           if (connInfo.admin != null) {
128             try {
129               connInfo.admin.close();
130             } catch (Throwable t) {
131               LOG.info("Got exception in closing idle admin", t);
132             }
133           }
134           try {
135             connInfo.connection.close();
136           } catch (Throwable t) {
137             LOG.info("Got exception in closing idle connection", t);
138           }
139         }
140       }
141     }
142   }
143 
144   /**
145    * @return the RESTServlet singleton instance
146    */
147   public synchronized static RESTServlet getInstance() {
148     assert(INSTANCE != null);
149     return INSTANCE;
150   }
151 
152   /**
153    * @param conf Existing configuration to use in rest servlet
154    * @param realUser the login user
155    * @return the RESTServlet singleton instance
156    */
157   public synchronized static RESTServlet getInstance(Configuration conf,
158       UserGroupInformation realUser) {
159     if (INSTANCE == null) {
160       INSTANCE = new RESTServlet(conf, realUser);
161     }
162     return INSTANCE;
163   }
164 
165   public synchronized static void stop() {
166     if (INSTANCE != null)  INSTANCE = null;
167   }
168 
169   /**
170    * Constructor with existing configuration
171    * @param conf existing configuration
172    * @param realUser the login user
173    */
174   RESTServlet(final Configuration conf,
175       final UserGroupInformation realUser) {
176     this.userProvider = UserProvider.instantiate(conf);
177     stoppable = new Stoppable() {
178       private volatile boolean isStopped = false;
179       @Override public void stop(String why) { isStopped = true;}
180       @Override public boolean isStopped() {return isStopped;}
181     };
182 
183     int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
184     int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
185     connectionCleaner = new ConnectionCleaner(cleanInterval, maxIdleTime);
186     Threads.setDaemonThreadRunning(connectionCleaner.getThread());
187 
188     this.realUser = realUser;
189     this.conf = conf;
190     registerCustomFilter(conf);
191   }
192 
193   /**
194    * Caller doesn't close the admin afterwards.
195    * We need to manage it and close it properly.
196    */
197   HBaseAdmin getAdmin() throws IOException {
198     ConnectionInfo connInfo = getCurrentConnection();
199     if (connInfo.admin == null) {
200       Lock lock = locker.acquireLock(effectiveUser.get().getUserName());
201       try {
202         if (connInfo.admin == null) {
203           connInfo.admin = new HBaseAdmin(connInfo.connection);
204         }
205       } finally {
206         lock.unlock();
207       }
208     }
209     return connInfo.admin;
210   }
211 
212   /**
213    * Caller closes the table afterwards.
214    */
215   HTableInterface getTable(String tableName) throws IOException {
216     ConnectionInfo connInfo = getCurrentConnection();
217     return connInfo.connection.getTable(tableName);
218   }
219 
220   Configuration getConfiguration() {
221     return conf;
222   }
223 
224   MetricsREST getMetrics() {
225     return metrics;
226   }
227 
228   /**
229    * Helper method to determine if server should
230    * only respond to GET HTTP method requests.
231    * @return boolean for server read-only state
232    */
233   boolean isReadOnly() {
234     return getConfiguration().getBoolean("hbase.rest.readonly", false);
235   }
236 
237   void setEffectiveUser(UserGroupInformation effectiveUser) {
238     this.effectiveUser.set(effectiveUser);
239   }
240 
241   private ConnectionInfo getCurrentConnection() throws IOException {
242     String userName = effectiveUser.get().getUserName();
243     ConnectionInfo connInfo = connections.get(userName);
244     if (connInfo == null || !connInfo.updateAccessTime()) {
245       Lock lock = locker.acquireLock(userName);
246       try {
247         connInfo = connections.get(userName);
248         if (connInfo == null) {
249           User user = userProvider.create(effectiveUser.get());
250           HConnection conn = HConnectionManager.createConnection(conf, user);
251           connInfo = new ConnectionInfo(conn, userName);
252           connections.put(userName, connInfo);
253         }
254       } finally {
255         lock.unlock();
256       }
257     }
258     return connInfo;
259   }
260   
261   private void registerCustomFilter(Configuration conf) {
262     String[] filterList = conf.getStrings(Constants.CUSTOM_FILTERS);
263     if (filterList != null) {
264       for (String filterClass : filterList) {
265         String[] filterPart = filterClass.split(":");
266         if (filterPart.length != 2) {
267           LOG.warn(
268             "Invalid filter specification " + filterClass + " - skipping");
269         } else {
270           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
271         }
272       }
273     }
274   }
275 }