001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
038
039/**
040 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes
041 * more than a few seconds and the user might want to inquire about the status
042 */
043@InterfaceAudience.Private
044public class TaskMonitor {
045  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);
046
047  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
048  public static final int DEFAULT_MAX_TASKS = 1000;
049  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
050  public static final long DEFAULT_RPC_WARN_TIME = 0;
051  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
052  public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000;
053  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
054  public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000;
055
056  private static TaskMonitor instance;
057
058  private final int maxTasks;
059  private final long rpcWarnTime;
060  private final long expirationTime;
061  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
062  private final List<TaskAndWeakRefPair> rpcTasks;
063  private final long monitorInterval;
064  private Thread monitorThread;
065
066  TaskMonitor(Configuration conf) {
067    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
068    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
069    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
070    tasks = new CircularFifoQueue<>(maxTasks);
071    rpcTasks = Lists.newArrayList();
072    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
073    monitorThread = new Thread(new MonitorRunnable());
074    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
075  }
076
077  /**
078   * Get singleton instance. TODO this would be better off scoped to a single daemon
079   */
080  public static synchronized TaskMonitor get() {
081    if (instance == null) {
082      instance = new TaskMonitor(HBaseConfiguration.create());
083    }
084    return instance;
085  }
086
087  public MonitoredTask createStatus(String description) {
088    return createStatus(description, false);
089  }
090
091  public MonitoredTask createStatus(String description, boolean ignore) {
092    return createStatus(description, ignore, false);
093  }
094
095  /**
096   * Create a monitored task for users to inquire about the status
097   * @param description   description of the status
098   * @param ignore        whether to ignore to track(e.g. show/clear/expire) the task in the
099   *                      {@link TaskMonitor}
100   * @param enableJournal enable when the task contains some stage journals
101   * @return a monitored task
102   */
103  public synchronized MonitoredTask createStatus(String description, boolean ignore,
104    boolean enableJournal) {
105    MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description);
106    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
107      new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
108    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
109    if (tasks.isFull()) {
110      purgeExpiredTasks();
111    }
112    if (!ignore) {
113      tasks.add(pair);
114    }
115    return proxy;
116  }
117
118  /**
119   * Create a task group which contains a series of monitored tasks for users to inquire about the
120   * status
121   * @param ignoreSubTasksInTaskMonitor whether to ignore to track(e.g. show/clear/expire) the task
122   *                                    in the {@link TaskMonitor}
123   * @param description                 description of the status
124   * @return a group of monitored tasks
125   */
126  public static TaskGroup createTaskGroup(boolean ignoreSubTasksInTaskMonitor, String description) {
127    return new TaskGroup(ignoreSubTasksInTaskMonitor, description);
128  }
129
130  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
131    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description);
132    MonitoredRPCHandler proxy =
133      (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
134        new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
135    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
136    rpcTasks.add(pair);
137    return proxy;
138  }
139
140  private synchronized void warnStuckTasks() {
141    if (rpcWarnTime > 0) {
142      final long now = EnvironmentEdgeManager.currentTime();
143      for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); it.hasNext();) {
144        TaskAndWeakRefPair pair = it.next();
145        MonitoredTask stat = pair.get();
146        if (
147          (stat.getState() == MonitoredTaskImpl.State.RUNNING)
148            && (now >= stat.getWarnTime() + rpcWarnTime)
149        ) {
150          LOG.warn("Task may be stuck: " + stat);
151          stat.setWarnTime(now);
152        }
153      }
154    }
155  }
156
157  private synchronized void purgeExpiredTasks() {
158    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
159      TaskAndWeakRefPair pair = it.next();
160      MonitoredTask stat = pair.get();
161
162      if (pair.isDead()) {
163        // The class who constructed this leaked it. So we can
164        // assume it's done.
165        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
166          LOG.warn("Status " + stat + " appears to have been leaked");
167          stat.cleanup();
168        }
169      }
170
171      if (canPurge(stat)) {
172        it.remove();
173      }
174    }
175  }
176
177  /**
178   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
179   * handled by this TaskMonitor.
180   * @return A complete list of MonitoredTasks.
181   */
182  public List<MonitoredTask> getTasks() {
183    return getTasks(null);
184  }
185
186  /**
187   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
188   * handled by this TaskMonitor.
189   * @param filter type of wanted tasks
190   * @return A filtered list of MonitoredTasks.
191   */
192  public synchronized List<MonitoredTask> getTasks(String filter) {
193    purgeExpiredTasks();
194    TaskFilter taskFilter = createTaskFilter(filter);
195    ArrayList<MonitoredTask> results =
196      Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
197    processTasks(tasks, taskFilter, results);
198    processTasks(rpcTasks, taskFilter, results);
199    return results;
200  }
201
202  /**
203   * Create a task filter according to a given filter type.
204   * @param filter type of monitored task
205   * @return a task filter
206   */
207  private static TaskFilter createTaskFilter(String filter) {
208    switch (TaskFilter.TaskType.getTaskType(filter)) {
209      case GENERAL:
210        return task -> task instanceof MonitoredRPCHandler;
211      case HANDLER:
212        return task -> !(task instanceof MonitoredRPCHandler);
213      case RPC:
214        return task -> !(task instanceof MonitoredRPCHandler)
215          || !((MonitoredRPCHandler) task).isRPCRunning();
216      case OPERATION:
217        return task -> !(task instanceof MonitoredRPCHandler)
218          || !((MonitoredRPCHandler) task).isOperationRunning();
219      default:
220        return task -> false;
221    }
222  }
223
224  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter,
225    List<MonitoredTask> results) {
226    for (TaskAndWeakRefPair task : tasks) {
227      MonitoredTask t = task.get();
228      if (!filter.filter(t)) {
229        results.add(t.clone());
230      }
231    }
232  }
233
234  private boolean canPurge(MonitoredTask stat) {
235    long cts = stat.getCompletionTimestamp();
236    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
237  }
238
239  public void dumpAsText(PrintWriter out) {
240    long now = EnvironmentEdgeManager.currentTime();
241
242    List<MonitoredTask> tasks = getTasks();
243    for (MonitoredTask task : tasks) {
244      out.println("Task: " + task.getDescription());
245      out.println("Status: " + task.getState() + ":" + task.getStatus());
246      long running = (now - task.getStartTime()) / 1000;
247      if (task.getCompletionTimestamp() != -1) {
248        long completed = (now - task.getCompletionTimestamp()) / 1000;
249        out.println("Completed " + completed + "s ago");
250        out
251          .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s");
252      } else {
253        out.println("Running for " + running + "s");
254      }
255      out.println();
256    }
257  }
258
259  public synchronized void shutdown() {
260    if (this.monitorThread != null) {
261      monitorThread.interrupt();
262    }
263  }
264
265  /**
266   * This class encapsulates an object as well as a weak reference to a proxy that passes through
267   * calls to that object. In art form:
268   *
269   * <pre>
270   *     Proxy  <------------------
271   *       |                       \
272   *       v                        \
273   * PassthroughInvocationHandler   |  weak reference
274   *       |                       /
275   * MonitoredTaskImpl            /
276   *       |                     /
277   * StatAndWeakRefProxy  ------/
278   * </pre>
279   *
280   * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they
281   * can leak that object, and we'll detect it since our weak reference will go null. But, we still
282   * have the actual object, so we can log it and display it as a leaked (incomplete) action.
283   */
284  private static class TaskAndWeakRefPair {
285    private MonitoredTask impl;
286    private WeakReference<MonitoredTask> weakProxy;
287
288    public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) {
289      this.impl = stat;
290      this.weakProxy = new WeakReference<>(proxy);
291    }
292
293    public MonitoredTask get() {
294      return impl;
295    }
296
297    public boolean isDead() {
298      return weakProxy.get() == null;
299    }
300  }
301
302  /**
303   * An InvocationHandler that simply passes through calls to the original object.
304   */
305  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
306    private T delegatee;
307
308    public PassthroughInvocationHandler(T delegatee) {
309      this.delegatee = delegatee;
310    }
311
312    @Override
313    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
314      return method.invoke(delegatee, args);
315    }
316  }
317
318  private class MonitorRunnable implements Runnable {
319    private boolean running = true;
320
321    @Override
322    public void run() {
323      while (running) {
324        try {
325          Thread.sleep(monitorInterval);
326          if (tasks.isFull()) {
327            purgeExpiredTasks();
328          }
329          warnStuckTasks();
330        } catch (InterruptedException e) {
331          running = false;
332        }
333      }
334    }
335  }
336
337  private interface TaskFilter {
338    enum TaskType {
339      GENERAL("general"),
340      HANDLER("handler"),
341      RPC("rpc"),
342      OPERATION("operation"),
343      ALL("all");
344
345      private final String type;
346
347      private TaskType(String type) {
348        this.type = type.toLowerCase();
349      }
350
351      static TaskType getTaskType(String type) {
352        if (type == null || type.isEmpty()) {
353          return ALL;
354        }
355        type = type.toLowerCase();
356        for (TaskType taskType : values()) {
357          if (taskType.toString().equals(type)) {
358            return taskType;
359          }
360        }
361        return ALL;
362      }
363
364      @Override
365      public String toString() {
366        return type;
367      }
368    }
369
370    /**
371     * Filter out unwanted task.
372     * @param task monitored task
373     * @return false if a task is accepted, true if it is filtered
374     */
375    boolean filter(MonitoredTask task);
376  }
377}