001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.monitoring;
020
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
040
041/**
042 * Singleton which keeps track of tasks going on in this VM.
043 * A Task here is anything which takes more than a few seconds
044 * and the user might want to inquire about the status
045 */
046@InterfaceAudience.Private
047public class TaskMonitor {
048  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);
049
050  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
051  public static final int DEFAULT_MAX_TASKS = 1000;
052  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
053  public static final long DEFAULT_RPC_WARN_TIME = 0;
054  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
055  public static final long DEFAULT_EXPIRATION_TIME = 60*1000;
056  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
057  public static final long DEFAULT_MONITOR_INTERVAL = 10*1000;
058
059  private static TaskMonitor instance;
060
061  private final int maxTasks;
062  private final long rpcWarnTime;
063  private final long expirationTime;
064  private final CircularFifoQueue tasks;
065  private final List<TaskAndWeakRefPair> rpcTasks;
066  private final long monitorInterval;
067  private Thread monitorThread;
068
069  TaskMonitor(Configuration conf) {
070    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
071    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
072    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
073    tasks = new CircularFifoQueue(maxTasks);
074    rpcTasks = Lists.newArrayList();
075    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
076    monitorThread = new Thread(new MonitorRunnable());
077    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
078  }
079
080  /**
081   * Get singleton instance.
082   * TODO this would be better off scoped to a single daemon
083   */
084  public static synchronized TaskMonitor get() {
085    if (instance == null) {
086      instance = new TaskMonitor(HBaseConfiguration.create());
087    }
088    return instance;
089  }
090  
091  public synchronized MonitoredTask createStatus(String description) {
092    MonitoredTask stat = new MonitoredTaskImpl();
093    stat.setDescription(description);
094    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
095        stat.getClass().getClassLoader(),
096        new Class<?>[] { MonitoredTask.class },
097        new PassthroughInvocationHandler<>(stat));
098    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
099    if (tasks.isFull()) {
100      purgeExpiredTasks();
101    }
102    tasks.add(pair);
103    return proxy;
104  }
105
106  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
107    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
108    stat.setDescription(description);
109    MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
110        stat.getClass().getClassLoader(),
111        new Class<?>[] { MonitoredRPCHandler.class },
112        new PassthroughInvocationHandler<>(stat));
113    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
114    rpcTasks.add(pair);
115    return proxy;
116  }
117
118  private synchronized void warnStuckTasks() {
119    if (rpcWarnTime > 0) {
120      final long now = EnvironmentEdgeManager.currentTime();
121      for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
122          it.hasNext();) {
123        TaskAndWeakRefPair pair = it.next();
124        MonitoredTask stat = pair.get();
125        if ((stat.getState() == MonitoredTaskImpl.State.RUNNING) &&
126            (now >= stat.getWarnTime() + rpcWarnTime)) {
127          LOG.warn("Task may be stuck: " + stat);
128          stat.setWarnTime(now);
129        }
130      }
131    }
132  }
133
134  private synchronized void purgeExpiredTasks() {
135    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
136         it.hasNext();) {
137      TaskAndWeakRefPair pair = it.next();
138      MonitoredTask stat = pair.get();
139      
140      if (pair.isDead()) {
141        // The class who constructed this leaked it. So we can
142        // assume it's done.
143        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
144          LOG.warn("Status " + stat + " appears to have been leaked");
145          stat.cleanup();
146        }
147      }
148      
149      if (canPurge(stat)) {
150        it.remove();
151      }
152    }
153  }
154
155  /**
156   * Produces a list containing copies of the current state of all non-expired 
157   * MonitoredTasks handled by this TaskMonitor.
158   * @return A complete list of MonitoredTasks.
159   */
160  public List<MonitoredTask> getTasks() {
161    return getTasks(null);
162  }
163
164  /**
165   * Produces a list containing copies of the current state of all non-expired 
166   * MonitoredTasks handled by this TaskMonitor.
167   * @param filter type of wanted tasks
168   * @return A filtered list of MonitoredTasks.
169   */
170  public synchronized List<MonitoredTask> getTasks(String filter) {
171    purgeExpiredTasks();
172    TaskFilter taskFilter = createTaskFilter(filter);
173    ArrayList<MonitoredTask> results =
174        Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
175    processTasks(tasks, taskFilter, results);
176    processTasks(rpcTasks, taskFilter, results);
177    return results;
178  }
179
180  /**
181   * Create a task filter according to a given filter type.
182   * @param filter type of monitored task
183   * @return a task filter
184   */
185  private static TaskFilter createTaskFilter(String filter) {
186    switch (TaskFilter.TaskType.getTaskType(filter)) {
187      case GENERAL: return task -> task instanceof MonitoredRPCHandler;
188      case HANDLER: return task -> !(task instanceof MonitoredRPCHandler);
189      case RPC: return task -> !(task instanceof MonitoredRPCHandler) ||
190                               !((MonitoredRPCHandler) task).isRPCRunning();
191      case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) ||
192                                     !((MonitoredRPCHandler) task).isOperationRunning();
193      default: return task -> false;
194    }
195  }
196
197  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks,
198                                   TaskFilter filter,
199                                   List<MonitoredTask> results) {
200    for (TaskAndWeakRefPair task : tasks) {
201      MonitoredTask t = task.get();
202      if (!filter.filter(t)) {
203        results.add(t.clone());
204      }
205    }
206  }
207
208  private boolean canPurge(MonitoredTask stat) {
209    long cts = stat.getCompletionTimestamp();
210    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
211  }
212
213  public void dumpAsText(PrintWriter out) {
214    long now = EnvironmentEdgeManager.currentTime();
215    
216    List<MonitoredTask> tasks = getTasks();
217    for (MonitoredTask task : tasks) {
218      out.println("Task: " + task.getDescription());
219      out.println("Status: " + task.getState() + ":" + task.getStatus());
220      long running = (now - task.getStartTime())/1000;
221      if (task.getCompletionTimestamp() != -1) {
222        long completed = (now - task.getCompletionTimestamp()) / 1000;
223        out.println("Completed " + completed + "s ago");
224        out.println("Ran for " +
225            (task.getCompletionTimestamp() - task.getStartTime())/1000
226            + "s");
227      } else {
228        out.println("Running for " + running + "s");
229      }
230      out.println();
231    }
232  }
233
234  public synchronized void shutdown() {
235    if (this.monitorThread != null) {
236      monitorThread.interrupt();
237    }
238  }
239
240  /**
241   * This class encapsulates an object as well as a weak reference to a proxy
242   * that passes through calls to that object. In art form:
243   * <pre>
244   *     Proxy  <------------------
245   *       |                       \
246   *       v                        \
247   * PassthroughInvocationHandler   |  weak reference
248   *       |                       /
249   * MonitoredTaskImpl            / 
250   *       |                     /
251   * StatAndWeakRefProxy  ------/
252   * </pre>
253   * Since we only return the Proxy to the creator of the MonitorableStatus,
254   * this means that they can leak that object, and we'll detect it
255   * since our weak reference will go null. But, we still have the actual
256   * object, so we can log it and display it as a leaked (incomplete) action.
257   */
258  private static class TaskAndWeakRefPair {
259    private MonitoredTask impl;
260    private WeakReference<MonitoredTask> weakProxy;
261    
262    public TaskAndWeakRefPair(MonitoredTask stat,
263        MonitoredTask proxy) {
264      this.impl = stat;
265      this.weakProxy = new WeakReference<>(proxy);
266    }
267    
268    public MonitoredTask get() {
269      return impl;
270    }
271    
272    public boolean isDead() {
273      return weakProxy.get() == null;
274    }
275  }
276  
277  /**
278   * An InvocationHandler that simply passes through calls to the original 
279   * object.
280   */
281  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
282    private T delegatee;
283    
284    public PassthroughInvocationHandler(T delegatee) {
285      this.delegatee = delegatee;
286    }
287
288    @Override
289    public Object invoke(Object proxy, Method method, Object[] args)
290        throws Throwable {
291      return method.invoke(delegatee, args);
292    }    
293  }
294
295  private class MonitorRunnable implements Runnable {
296    private boolean running = true;
297
298    @Override
299    public void run() {
300      while (running) {
301        try {
302          Thread.sleep(monitorInterval);
303          if (tasks.isFull()) {
304            purgeExpiredTasks();
305          }
306          warnStuckTasks();
307        } catch (InterruptedException e) {
308          running = false;
309        }
310      }
311    }
312  }
313
314  private interface TaskFilter {
315    enum TaskType {
316      GENERAL("general"),
317      HANDLER("handler"),
318      RPC("rpc"),
319      OPERATION("operation"),
320      ALL("all");
321
322      private final String type;
323
324      private TaskType(String type) {
325        this.type = type.toLowerCase();
326      }
327
328      static TaskType getTaskType(String type) {
329        if (type == null || type.isEmpty()) {
330          return ALL;
331        }
332        type = type.toLowerCase();
333        for (TaskType taskType : values()) {
334          if (taskType.toString().equals(type)) {
335            return taskType;
336          }
337        }
338        return ALL;
339      }
340
341      @Override
342      public String toString() {
343        return type;
344      }
345    }
346
347    /**
348     * Filter out unwanted task.
349     * @param task monitored task
350     * @return false if a task is accepted, true if it is filtered
351     */
352    boolean filter(MonitoredTask task);
353  }
354}