View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.monitoring;
20  
21  import java.io.PrintWriter;
22  import java.lang.ref.WeakReference;
23  import java.lang.reflect.InvocationHandler;
24  import java.lang.reflect.Method;
25  import java.lang.reflect.Proxy;
26  import java.util.ArrayList;
27  import java.util.Iterator;
28  import java.util.List;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  
34  import com.google.common.annotations.VisibleForTesting;
35  import com.google.common.collect.Lists;
36  
37  /**
38   * Singleton which keeps track of tasks going on in this VM.
39   * A Task here is anything which takes more than a few seconds
40   * and the user might want to inquire about the status
41   */
42  @InterfaceAudience.Private
43  public class TaskMonitor {
44    private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
45  
46    // Don't keep around any tasks that have completed more than
47    // 60 seconds ago
48    private static final long EXPIRATION_TIME = 60*1000;
49  
50    @VisibleForTesting
51    static final int MAX_TASKS = 1000;
52    
53    private static TaskMonitor instance;
54    private List<TaskAndWeakRefPair> tasks =
55      Lists.newArrayList();
56  
57    /**
58     * Get singleton instance.
59     * TODO this would be better off scoped to a single daemon
60     */
61    public static synchronized TaskMonitor get() {
62      if (instance == null) {
63        instance = new TaskMonitor();
64      }
65      return instance;
66    }
67    
68    public synchronized MonitoredTask createStatus(String description) {
69      MonitoredTask stat = new MonitoredTaskImpl();
70      stat.setDescription(description);
71      MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
72          stat.getClass().getClassLoader(),
73          new Class<?>[] { MonitoredTask.class },
74          new PassthroughInvocationHandler<MonitoredTask>(stat));
75      TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
76      tasks.add(pair);
77      if (tasks.size() > MAX_TASKS) {
78        purgeExpiredTasks();
79      }
80      return proxy;
81    }
82  
83    public synchronized MonitoredRPCHandler createRPCStatus(String description) {
84      MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
85      stat.setDescription(description);
86      MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
87          stat.getClass().getClassLoader(),
88          new Class<?>[] { MonitoredRPCHandler.class },
89          new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
90      TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
91      tasks.add(pair);
92      if (tasks.size() > MAX_TASKS) {
93        purgeExpiredTasks();
94      }
95      return proxy;
96    }
97  
98    private synchronized void purgeExpiredTasks() {
99      int size = 0;
100     
101     for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
102          it.hasNext();) {
103       TaskAndWeakRefPair pair = it.next();
104       MonitoredTask stat = pair.get();
105       
106       if (pair.isDead()) {
107         // The class who constructed this leaked it. So we can
108         // assume it's done.
109         if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
110           LOG.warn("Status " + stat + " appears to have been leaked");
111           stat.cleanup();
112         }
113       }
114       
115       if (canPurge(stat)) {
116         it.remove();
117       } else {
118         size++;
119       }
120     }
121     
122     if (size > MAX_TASKS) {
123       LOG.warn("Too many actions in action monitor! Purging some.");
124       tasks = tasks.subList(size - MAX_TASKS, size);
125     }
126   }
127 
128   /**
129    * Produces a list containing copies of the current state of all non-expired 
130    * MonitoredTasks handled by this TaskMonitor.
131    * @return A complete list of MonitoredTasks.
132    */
133   public synchronized List<MonitoredTask> getTasks() {
134     purgeExpiredTasks();
135     ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
136     for (TaskAndWeakRefPair pair : tasks) {
137       MonitoredTask t = pair.get();
138       ret.add(t.clone());
139     }
140     return ret;
141   }
142 
143   private boolean canPurge(MonitoredTask stat) {
144     long cts = stat.getCompletionTimestamp();
145     return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
146   }
147   
148 
149   public void dumpAsText(PrintWriter out) {
150     long now = System.currentTimeMillis();
151     
152     List<MonitoredTask> tasks = getTasks();
153     for (MonitoredTask task : tasks) {
154       out.println("Task: " + task.getDescription());
155       out.println("Status: " + task.getState() + ":" + task.getStatus());
156       long running = (now - task.getStartTime())/1000;
157       if (task.getCompletionTimestamp() != -1) {
158         long completed = (now - task.getCompletionTimestamp()) / 1000;
159         out.println("Completed " + completed + "s ago");
160         out.println("Ran for " +
161             (task.getCompletionTimestamp() - task.getStartTime())/1000
162             + "s");
163       } else {
164         out.println("Running for " + running + "s");
165       }
166       out.println();
167     }
168   }
169 
170   /**
171    * This class encapsulates an object as well as a weak reference to a proxy
172    * that passes through calls to that object. In art form:
173    * <code>
174    *     Proxy  <------------------
175    *       |                       \
176    *       v                        \
177    * PassthroughInvocationHandler   |  weak reference
178    *       |                       /
179    * MonitoredTaskImpl            / 
180    *       |                     /
181    * StatAndWeakRefProxy  ------/
182    *
183    * Since we only return the Proxy to the creator of the MonitorableStatus,
184    * this means that they can leak that object, and we'll detect it
185    * since our weak reference will go null. But, we still have the actual
186    * object, so we can log it and display it as a leaked (incomplete) action.
187    */
188   private static class TaskAndWeakRefPair {
189     private MonitoredTask impl;
190     private WeakReference<MonitoredTask> weakProxy;
191     
192     public TaskAndWeakRefPair(MonitoredTask stat,
193         MonitoredTask proxy) {
194       this.impl = stat;
195       this.weakProxy = new WeakReference<MonitoredTask>(proxy);
196     }
197     
198     public MonitoredTask get() {
199       return impl;
200     }
201     
202     public boolean isDead() {
203       return weakProxy.get() == null;
204     }
205   }
206   
207   /**
208    * An InvocationHandler that simply passes through calls to the original 
209    * object.
210    */
211   private static class PassthroughInvocationHandler<T> implements InvocationHandler {
212     private T delegatee;
213     
214     public PassthroughInvocationHandler(T delegatee) {
215       this.delegatee = delegatee;
216     }
217 
218     @Override
219     public Object invoke(Object proxy, Method method, Object[] args)
220         throws Throwable {
221       return method.invoke(delegatee, args);
222     }    
223   }
224 }