001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
038
039/**
040 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes
041 * more than a few seconds and the user might want to inquire about the status
042 */
043@InterfaceAudience.Private
044public class TaskMonitor {
045  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);
046
047  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
048  public static final int DEFAULT_MAX_TASKS = 1000;
049  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
050  public static final long DEFAULT_RPC_WARN_TIME = 0;
051  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
052  public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000;
053  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
054  public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000;
055
056  private static TaskMonitor instance;
057
058  private final int maxTasks;
059  private final long rpcWarnTime;
060  private final long expirationTime;
061  private final CircularFifoQueue tasks;
062  private final List<TaskAndWeakRefPair> rpcTasks;
063  private final long monitorInterval;
064  private Thread monitorThread;
065
066  TaskMonitor(Configuration conf) {
067    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
068    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
069    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
070    tasks = new CircularFifoQueue(maxTasks);
071    rpcTasks = Lists.newArrayList();
072    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
073    monitorThread = new Thread(new MonitorRunnable());
074    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
075  }
076
077  /**
078   * Get singleton instance. TODO this would be better off scoped to a single daemon
079   */
080  public static synchronized TaskMonitor get() {
081    if (instance == null) {
082      instance = new TaskMonitor(HBaseConfiguration.create());
083    }
084    return instance;
085  }
086
087  public synchronized MonitoredTask createStatus(String description) {
088    return createStatus(description, false);
089  }
090
091  public synchronized MonitoredTask createStatus(String description, boolean ignore) {
092    MonitoredTask stat = new MonitoredTaskImpl();
093    stat.setDescription(description);
094    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
095      new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
096    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
097    if (tasks.isFull()) {
098      purgeExpiredTasks();
099    }
100    if (!ignore) {
101      tasks.add(pair);
102    }
103    return proxy;
104  }
105
106  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
107    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
108    stat.setDescription(description);
109    MonitoredRPCHandler proxy =
110      (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
111        new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
112    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
113    rpcTasks.add(pair);
114    return proxy;
115  }
116
117  private synchronized void warnStuckTasks() {
118    if (rpcWarnTime > 0) {
119      final long now = EnvironmentEdgeManager.currentTime();
120      for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); it.hasNext();) {
121        TaskAndWeakRefPair pair = it.next();
122        MonitoredTask stat = pair.get();
123        if (
124          (stat.getState() == MonitoredTaskImpl.State.RUNNING)
125            && (now >= stat.getWarnTime() + rpcWarnTime)
126        ) {
127          LOG.warn("Task may be stuck: " + stat);
128          stat.setWarnTime(now);
129        }
130      }
131    }
132  }
133
134  private synchronized void purgeExpiredTasks() {
135    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
136      TaskAndWeakRefPair pair = it.next();
137      MonitoredTask stat = pair.get();
138
139      if (pair.isDead()) {
140        // The class who constructed this leaked it. So we can
141        // assume it's done.
142        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
143          LOG.warn("Status " + stat + " appears to have been leaked");
144          stat.cleanup();
145        }
146      }
147
148      if (canPurge(stat)) {
149        it.remove();
150      }
151    }
152  }
153
154  /**
155   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
156   * handled by this TaskMonitor.
157   * @return A complete list of MonitoredTasks.
158   */
159  public List<MonitoredTask> getTasks() {
160    return getTasks(null);
161  }
162
163  /**
164   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
165   * handled by this TaskMonitor.
166   * @param filter type of wanted tasks
167   * @return A filtered list of MonitoredTasks.
168   */
169  public synchronized List<MonitoredTask> getTasks(String filter) {
170    purgeExpiredTasks();
171    TaskFilter taskFilter = createTaskFilter(filter);
172    ArrayList<MonitoredTask> results =
173      Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
174    processTasks(tasks, taskFilter, results);
175    processTasks(rpcTasks, taskFilter, results);
176    return results;
177  }
178
179  /**
180   * Create a task filter according to a given filter type.
181   * @param filter type of monitored task
182   * @return a task filter
183   */
184  private static TaskFilter createTaskFilter(String filter) {
185    switch (TaskFilter.TaskType.getTaskType(filter)) {
186      case GENERAL:
187        return task -> task instanceof MonitoredRPCHandler;
188      case HANDLER:
189        return task -> !(task instanceof MonitoredRPCHandler);
190      case RPC:
191        return task -> !(task instanceof MonitoredRPCHandler)
192          || !((MonitoredRPCHandler) task).isRPCRunning();
193      case OPERATION:
194        return task -> !(task instanceof MonitoredRPCHandler)
195          || !((MonitoredRPCHandler) task).isOperationRunning();
196      default:
197        return task -> false;
198    }
199  }
200
201  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter,
202    List<MonitoredTask> results) {
203    for (TaskAndWeakRefPair task : tasks) {
204      MonitoredTask t = task.get();
205      if (!filter.filter(t)) {
206        results.add(t.clone());
207      }
208    }
209  }
210
211  private boolean canPurge(MonitoredTask stat) {
212    long cts = stat.getCompletionTimestamp();
213    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
214  }
215
216  public void dumpAsText(PrintWriter out) {
217    long now = EnvironmentEdgeManager.currentTime();
218
219    List<MonitoredTask> tasks = getTasks();
220    for (MonitoredTask task : tasks) {
221      out.println("Task: " + task.getDescription());
222      out.println("Status: " + task.getState() + ":" + task.getStatus());
223      long running = (now - task.getStartTime()) / 1000;
224      if (task.getCompletionTimestamp() != -1) {
225        long completed = (now - task.getCompletionTimestamp()) / 1000;
226        out.println("Completed " + completed + "s ago");
227        out
228          .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s");
229      } else {
230        out.println("Running for " + running + "s");
231      }
232      out.println();
233    }
234  }
235
236  public synchronized void shutdown() {
237    if (this.monitorThread != null) {
238      monitorThread.interrupt();
239    }
240  }
241
242  /**
243   * This class encapsulates an object as well as a weak reference to a proxy that passes through
244   * calls to that object. In art form:
245   *
246   * <pre>
247   *     Proxy  <------------------
248   *       |                       \
249   *       v                        \
250   * PassthroughInvocationHandler   |  weak reference
251   *       |                       /
252   * MonitoredTaskImpl            /
253   *       |                     /
254   * StatAndWeakRefProxy  ------/
255   * </pre>
256   *
257   * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they
258   * can leak that object, and we'll detect it since our weak reference will go null. But, we still
259   * have the actual object, so we can log it and display it as a leaked (incomplete) action.
260   */
261  private static class TaskAndWeakRefPair {
262    private MonitoredTask impl;
263    private WeakReference<MonitoredTask> weakProxy;
264
265    public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) {
266      this.impl = stat;
267      this.weakProxy = new WeakReference<>(proxy);
268    }
269
270    public MonitoredTask get() {
271      return impl;
272    }
273
274    public boolean isDead() {
275      return weakProxy.get() == null;
276    }
277  }
278
279  /**
280   * An InvocationHandler that simply passes through calls to the original object.
281   */
282  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
283    private T delegatee;
284
285    public PassthroughInvocationHandler(T delegatee) {
286      this.delegatee = delegatee;
287    }
288
289    @Override
290    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
291      return method.invoke(delegatee, args);
292    }
293  }
294
295  private class MonitorRunnable implements Runnable {
296    private boolean running = true;
297
298    @Override
299    public void run() {
300      while (running) {
301        try {
302          Thread.sleep(monitorInterval);
303          if (tasks.isFull()) {
304            purgeExpiredTasks();
305          }
306          warnStuckTasks();
307        } catch (InterruptedException e) {
308          running = false;
309        }
310      }
311    }
312  }
313
314  private interface TaskFilter {
315    enum TaskType {
316      GENERAL("general"),
317      HANDLER("handler"),
318      RPC("rpc"),
319      OPERATION("operation"),
320      ALL("all");
321
322      private final String type;
323
324      private TaskType(String type) {
325        this.type = type.toLowerCase();
326      }
327
328      static TaskType getTaskType(String type) {
329        if (type == null || type.isEmpty()) {
330          return ALL;
331        }
332        type = type.toLowerCase();
333        for (TaskType taskType : values()) {
334          if (taskType.toString().equals(type)) {
335            return taskType;
336          }
337        }
338        return ALL;
339      }
340
341      @Override
342      public String toString() {
343        return type;
344      }
345    }
346
347    /**
348     * Filter out unwanted task.
349     * @param task monitored task
350     * @return false if a task is accepted, true if it is filtered
351     */
352    boolean filter(MonitoredTask task);
353  }
354}