001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
038
039/**
040 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes
041 * more than a few seconds and the user might want to inquire about the status
042 */
043@InterfaceAudience.Private
044public class TaskMonitor {
045  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);
046
047  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
048  public static final int DEFAULT_MAX_TASKS = 1000;
049  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
050  public static final long DEFAULT_RPC_WARN_TIME = 0;
051  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
052  public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000;
053  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
054  public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000;
055
056  private static TaskMonitor instance;
057
058  private final int maxTasks;
059  private final long rpcWarnTime;
060  private final long expirationTime;
061  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
062  private final List<TaskAndWeakRefPair> rpcTasks;
063  private final long monitorInterval;
064  private Thread monitorThread;
065
066  TaskMonitor(Configuration conf) {
067    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
068    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
069    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
070    tasks = new CircularFifoQueue<>(maxTasks);
071    rpcTasks = Lists.newArrayList();
072    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
073    monitorThread = new Thread(new MonitorRunnable());
074    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
075  }
076
077  /**
078   * Get singleton instance. TODO this would be better off scoped to a single daemon
079   */
080  public static synchronized TaskMonitor get() {
081    if (instance == null) {
082      instance = new TaskMonitor(HBaseConfiguration.create());
083    }
084    return instance;
085  }
086
087  public MonitoredTask createStatus(String description) {
088    return createStatus(description, false);
089  }
090
091  public synchronized MonitoredTask createStatus(String description, boolean enableJournal) {
092    MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
093    stat.setDescription(description);
094    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
095      new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
096    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
097    if (tasks.isFull()) {
098      purgeExpiredTasks();
099    }
100    tasks.add(pair);
101    return proxy;
102  }
103
104  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
105    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
106    stat.setDescription(description);
107    MonitoredRPCHandler proxy =
108      (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
109        new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
110    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
111    rpcTasks.add(pair);
112    return proxy;
113  }
114
115  private synchronized void warnStuckTasks() {
116    if (rpcWarnTime > 0) {
117      final long now = EnvironmentEdgeManager.currentTime();
118      for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); it.hasNext();) {
119        TaskAndWeakRefPair pair = it.next();
120        MonitoredTask stat = pair.get();
121        if (
122          (stat.getState() == MonitoredTaskImpl.State.RUNNING)
123            && (now >= stat.getWarnTime() + rpcWarnTime)
124        ) {
125          LOG.warn("Task may be stuck: " + stat);
126          stat.setWarnTime(now);
127        }
128      }
129    }
130  }
131
132  private synchronized void purgeExpiredTasks() {
133    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
134      TaskAndWeakRefPair pair = it.next();
135      MonitoredTask stat = pair.get();
136
137      if (pair.isDead()) {
138        // The class who constructed this leaked it. So we can
139        // assume it's done.
140        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
141          LOG.warn("Status " + stat + " appears to have been leaked");
142          stat.cleanup();
143        }
144      }
145
146      if (canPurge(stat)) {
147        it.remove();
148      }
149    }
150  }
151
152  /**
153   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
154   * handled by this TaskMonitor.
155   * @return A complete list of MonitoredTasks.
156   */
157  public List<MonitoredTask> getTasks() {
158    return getTasks(null);
159  }
160
161  /**
162   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
163   * handled by this TaskMonitor.
164   * @param filter type of wanted tasks
165   * @return A filtered list of MonitoredTasks.
166   */
167  public synchronized List<MonitoredTask> getTasks(String filter) {
168    purgeExpiredTasks();
169    TaskFilter taskFilter = createTaskFilter(filter);
170    ArrayList<MonitoredTask> results =
171      Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
172    processTasks(tasks, taskFilter, results);
173    processTasks(rpcTasks, taskFilter, results);
174    return results;
175  }
176
177  /**
178   * Create a task filter according to a given filter type.
179   * @param filter type of monitored task
180   * @return a task filter
181   */
182  private static TaskFilter createTaskFilter(String filter) {
183    switch (TaskFilter.TaskType.getTaskType(filter)) {
184      case GENERAL:
185        return task -> task instanceof MonitoredRPCHandler;
186      case HANDLER:
187        return task -> !(task instanceof MonitoredRPCHandler);
188      case RPC:
189        return task -> !(task instanceof MonitoredRPCHandler)
190          || !((MonitoredRPCHandler) task).isRPCRunning();
191      case OPERATION:
192        return task -> !(task instanceof MonitoredRPCHandler)
193          || !((MonitoredRPCHandler) task).isOperationRunning();
194      default:
195        return task -> false;
196    }
197  }
198
199  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter,
200    List<MonitoredTask> results) {
201    for (TaskAndWeakRefPair task : tasks) {
202      MonitoredTask t = task.get();
203      if (!filter.filter(t)) {
204        results.add(t.clone());
205      }
206    }
207  }
208
209  private boolean canPurge(MonitoredTask stat) {
210    long cts = stat.getCompletionTimestamp();
211    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
212  }
213
214  public void dumpAsText(PrintWriter out) {
215    long now = EnvironmentEdgeManager.currentTime();
216
217    List<MonitoredTask> tasks = getTasks();
218    for (MonitoredTask task : tasks) {
219      out.println("Task: " + task.getDescription());
220      out.println("Status: " + task.getState() + ":" + task.getStatus());
221      long running = (now - task.getStartTime()) / 1000;
222      if (task.getCompletionTimestamp() != -1) {
223        long completed = (now - task.getCompletionTimestamp()) / 1000;
224        out.println("Completed " + completed + "s ago");
225        out
226          .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s");
227      } else {
228        out.println("Running for " + running + "s");
229      }
230      out.println();
231    }
232  }
233
234  public synchronized void shutdown() {
235    if (this.monitorThread != null) {
236      monitorThread.interrupt();
237    }
238  }
239
240  /**
241   * This class encapsulates an object as well as a weak reference to a proxy that passes through
242   * calls to that object. In art form:
243   *
244   * <pre>
245   *     Proxy  <------------------
246   *       |                       \
247   *       v                        \
248   * PassthroughInvocationHandler   |  weak reference
249   *       |                       /
250   * MonitoredTaskImpl            /
251   *       |                     /
252   * StatAndWeakRefProxy  ------/
253   * </pre>
254   *
255   * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they
256   * can leak that object, and we'll detect it since our weak reference will go null. But, we still
257   * have the actual object, so we can log it and display it as a leaked (incomplete) action.
258   */
259  private static class TaskAndWeakRefPair {
260    private MonitoredTask impl;
261    private WeakReference<MonitoredTask> weakProxy;
262
263    public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) {
264      this.impl = stat;
265      this.weakProxy = new WeakReference<>(proxy);
266    }
267
268    public MonitoredTask get() {
269      return impl;
270    }
271
272    public boolean isDead() {
273      return weakProxy.get() == null;
274    }
275  }
276
277  /**
278   * An InvocationHandler that simply passes through calls to the original object.
279   */
280  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
281    private T delegatee;
282
283    public PassthroughInvocationHandler(T delegatee) {
284      this.delegatee = delegatee;
285    }
286
287    @Override
288    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
289      return method.invoke(delegatee, args);
290    }
291  }
292
293  private class MonitorRunnable implements Runnable {
294    private boolean running = true;
295
296    @Override
297    public void run() {
298      while (running) {
299        try {
300          Thread.sleep(monitorInterval);
301          if (tasks.isFull()) {
302            purgeExpiredTasks();
303          }
304          warnStuckTasks();
305        } catch (InterruptedException e) {
306          running = false;
307        }
308      }
309    }
310  }
311
312  private interface TaskFilter {
313    enum TaskType {
314      GENERAL("general"),
315      HANDLER("handler"),
316      RPC("rpc"),
317      OPERATION("operation"),
318      ALL("all");
319
320      private final String type;
321
322      private TaskType(String type) {
323        this.type = type.toLowerCase();
324      }
325
326      static TaskType getTaskType(String type) {
327        if (type == null || type.isEmpty()) {
328          return ALL;
329        }
330        type = type.toLowerCase();
331        for (TaskType taskType : values()) {
332          if (taskType.toString().equals(type)) {
333            return taskType;
334          }
335        }
336        return ALL;
337      }
338
339      @Override
340      public String toString() {
341        return type;
342      }
343    }
344
345    /**
346     * Filter out unwanted task.
347     * @param task monitored task
348     * @return false if a task is accepted, true if it is filtered
349     */
350    boolean filter(MonitoredTask task);
351  }
352}