View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.monitoring;
20  
21  import java.io.PrintWriter;
22  import java.lang.ref.WeakReference;
23  import java.lang.reflect.InvocationHandler;
24  import java.lang.reflect.Method;
25  import java.lang.reflect.Proxy;
26  import java.util.ArrayList;
27  import java.util.Iterator;
28  import java.util.List;
29  
30  import org.apache.commons.collections.buffer.CircularFifoBuffer;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  
35  import com.google.common.annotations.VisibleForTesting;
36  import com.google.common.collect.Lists;
37  
38  /**
39   * Singleton which keeps track of tasks going on in this VM.
40   * A Task here is anything which takes more than a few seconds
41   * and the user might want to inquire about the status
42   */
43  @InterfaceAudience.Private
44  public class TaskMonitor {
45    private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
46  
47    // Don't keep around any tasks that have completed more than
48    // 60 seconds ago
49    private static final long EXPIRATION_TIME = 60*1000;
50  
51    @VisibleForTesting
52    static final int MAX_TASKS = 1000;
53    
54    private static TaskMonitor instance;
55    private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
56  
57    /**
58     * Get singleton instance.
59     * TODO this would be better off scoped to a single daemon
60     */
61    public static synchronized TaskMonitor get() {
62      if (instance == null) {
63        instance = new TaskMonitor();
64      }
65      return instance;
66    }
67    
68    public synchronized MonitoredTask createStatus(String description) {
69      MonitoredTask stat = new MonitoredTaskImpl();
70      stat.setDescription(description);
71      MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
72          stat.getClass().getClassLoader(),
73          new Class<?>[] { MonitoredTask.class },
74          new PassthroughInvocationHandler<MonitoredTask>(stat));
75      TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
76      tasks.add(pair);
77      return proxy;
78    }
79  
80    public synchronized MonitoredRPCHandler createRPCStatus(String description) {
81      MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
82      stat.setDescription(description);
83      MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
84          stat.getClass().getClassLoader(),
85          new Class<?>[] { MonitoredRPCHandler.class },
86          new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
87      TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
88      tasks.add(pair);
89      return proxy;
90    }
91  
92    private synchronized void purgeExpiredTasks() {
93      for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
94           it.hasNext();) {
95        TaskAndWeakRefPair pair = it.next();
96        MonitoredTask stat = pair.get();
97        
98        if (pair.isDead()) {
99          // The class who constructed this leaked it. So we can
100         // assume it's done.
101         if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
102           LOG.warn("Status " + stat + " appears to have been leaked");
103           stat.cleanup();
104         }
105       }
106       
107       if (canPurge(stat)) {
108         it.remove();
109       }
110     }
111   }
112 
113   /**
114    * Produces a list containing copies of the current state of all non-expired 
115    * MonitoredTasks handled by this TaskMonitor.
116    * @return A complete list of MonitoredTasks.
117    */
118   public synchronized List<MonitoredTask> getTasks() {
119     purgeExpiredTasks();
120     ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
121     for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
122          it.hasNext();) {
123       TaskAndWeakRefPair pair = it.next();
124       MonitoredTask t = pair.get();
125       ret.add(t.clone());
126     }
127     return ret;
128   }
129 
130   private boolean canPurge(MonitoredTask stat) {
131     long cts = stat.getCompletionTimestamp();
132     return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
133   }
134   
135 
136   public void dumpAsText(PrintWriter out) {
137     long now = System.currentTimeMillis();
138     
139     List<MonitoredTask> tasks = getTasks();
140     for (MonitoredTask task : tasks) {
141       out.println("Task: " + task.getDescription());
142       out.println("Status: " + task.getState() + ":" + task.getStatus());
143       long running = (now - task.getStartTime())/1000;
144       if (task.getCompletionTimestamp() != -1) {
145         long completed = (now - task.getCompletionTimestamp()) / 1000;
146         out.println("Completed " + completed + "s ago");
147         out.println("Ran for " +
148             (task.getCompletionTimestamp() - task.getStartTime())/1000
149             + "s");
150       } else {
151         out.println("Running for " + running + "s");
152       }
153       out.println();
154     }
155   }
156 
157   /**
158    * This class encapsulates an object as well as a weak reference to a proxy
159    * that passes through calls to that object. In art form:
160    * <code>
161    *     Proxy  <------------------
162    *       |                       \
163    *       v                        \
164    * PassthroughInvocationHandler   |  weak reference
165    *       |                       /
166    * MonitoredTaskImpl            / 
167    *       |                     /
168    * StatAndWeakRefProxy  ------/
169    *
170    * Since we only return the Proxy to the creator of the MonitorableStatus,
171    * this means that they can leak that object, and we'll detect it
172    * since our weak reference will go null. But, we still have the actual
173    * object, so we can log it and display it as a leaked (incomplete) action.
174    */
175   private static class TaskAndWeakRefPair {
176     private MonitoredTask impl;
177     private WeakReference<MonitoredTask> weakProxy;
178     
179     public TaskAndWeakRefPair(MonitoredTask stat,
180         MonitoredTask proxy) {
181       this.impl = stat;
182       this.weakProxy = new WeakReference<MonitoredTask>(proxy);
183     }
184     
185     public MonitoredTask get() {
186       return impl;
187     }
188     
189     public boolean isDead() {
190       return weakProxy.get() == null;
191     }
192   }
193   
194   /**
195    * An InvocationHandler that simply passes through calls to the original 
196    * object.
197    */
198   private static class PassthroughInvocationHandler<T> implements InvocationHandler {
199     private T delegatee;
200     
201     public PassthroughInvocationHandler(T delegatee) {
202       this.delegatee = delegatee;
203     }
204 
205     @Override
206     public Object invoke(Object proxy, Method method, Object[] args)
207         throws Throwable {
208       return method.invoke(delegatee, args);
209     }    
210   }
211 }