001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
038
039/**
040 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes
041 * more than a few seconds and the user might want to inquire about the status
042 */
043@InterfaceAudience.Private
044public class TaskMonitor {
045  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);
046
047  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
048  public static final int DEFAULT_MAX_TASKS = 1000;
049  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
050  public static final long DEFAULT_RPC_WARN_TIME = 0;
051  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
052  public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000;
053  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
054  public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000;
055
056  private static TaskMonitor instance;
057
058  private final int maxTasks;
059  private final long rpcWarnTime;
060  private final long expirationTime;
061  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
062  private final List<TaskAndWeakRefPair> rpcTasks;
063  private final long monitorInterval;
064  private Thread monitorThread;
065
066  TaskMonitor(Configuration conf) {
067    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
068    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
069    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
070    tasks = new CircularFifoQueue<>(maxTasks);
071    rpcTasks = Lists.newArrayList();
072    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
073    monitorThread = new Thread(new MonitorRunnable());
074    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
075  }
076
077  /**
078   * Get singleton instance. TODO this would be better off scoped to a single daemon
079   */
080  public static synchronized TaskMonitor get() {
081    if (instance == null) {
082      instance = new TaskMonitor(HBaseConfiguration.create());
083    }
084    return instance;
085  }
086
087  public MonitoredTask createStatus(String description) {
088    return createStatus(description, false);
089  }
090
091  public MonitoredTask createStatus(String description, boolean ignore) {
092    return createStatus(description, ignore, false);
093  }
094
095  public synchronized MonitoredTask createStatus(String description, boolean ignore,
096    boolean enableJournal) {
097    MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description);
098    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
099      new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
100    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
101    if (tasks.isFull()) {
102      purgeExpiredTasks();
103    }
104    if (!ignore) {
105      tasks.add(pair);
106    }
107    return proxy;
108  }
109
110  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
111    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description);
112    MonitoredRPCHandler proxy =
113      (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
114        new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
115    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
116    rpcTasks.add(pair);
117    return proxy;
118  }
119
120  private synchronized void warnStuckTasks() {
121    if (rpcWarnTime > 0) {
122      final long now = EnvironmentEdgeManager.currentTime();
123      for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); it.hasNext();) {
124        TaskAndWeakRefPair pair = it.next();
125        MonitoredTask stat = pair.get();
126        if (
127          (stat.getState() == MonitoredTaskImpl.State.RUNNING)
128            && (now >= stat.getWarnTime() + rpcWarnTime)
129        ) {
130          LOG.warn("Task may be stuck: " + stat);
131          stat.setWarnTime(now);
132        }
133      }
134    }
135  }
136
137  private synchronized void purgeExpiredTasks() {
138    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
139      TaskAndWeakRefPair pair = it.next();
140      MonitoredTask stat = pair.get();
141
142      if (pair.isDead()) {
143        // The class who constructed this leaked it. So we can
144        // assume it's done.
145        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
146          LOG.warn("Status " + stat + " appears to have been leaked");
147          stat.cleanup();
148        }
149      }
150
151      if (canPurge(stat)) {
152        it.remove();
153      }
154    }
155  }
156
157  /**
158   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
159   * handled by this TaskMonitor.
160   * @return A complete list of MonitoredTasks.
161   */
162  public List<MonitoredTask> getTasks() {
163    return getTasks(null);
164  }
165
166  /**
167   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
168   * handled by this TaskMonitor.
169   * @param filter type of wanted tasks
170   * @return A filtered list of MonitoredTasks.
171   */
172  public synchronized List<MonitoredTask> getTasks(String filter) {
173    purgeExpiredTasks();
174    TaskFilter taskFilter = createTaskFilter(filter);
175    ArrayList<MonitoredTask> results =
176      Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
177    processTasks(tasks, taskFilter, results);
178    processTasks(rpcTasks, taskFilter, results);
179    return results;
180  }
181
182  /**
183   * Create a task filter according to a given filter type.
184   * @param filter type of monitored task
185   * @return a task filter
186   */
187  private static TaskFilter createTaskFilter(String filter) {
188    switch (TaskFilter.TaskType.getTaskType(filter)) {
189      case GENERAL:
190        return task -> task instanceof MonitoredRPCHandler;
191      case HANDLER:
192        return task -> !(task instanceof MonitoredRPCHandler);
193      case RPC:
194        return task -> !(task instanceof MonitoredRPCHandler)
195          || !((MonitoredRPCHandler) task).isRPCRunning();
196      case OPERATION:
197        return task -> !(task instanceof MonitoredRPCHandler)
198          || !((MonitoredRPCHandler) task).isOperationRunning();
199      default:
200        return task -> false;
201    }
202  }
203
204  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter,
205    List<MonitoredTask> results) {
206    for (TaskAndWeakRefPair task : tasks) {
207      MonitoredTask t = task.get();
208      if (!filter.filter(t)) {
209        results.add(t.clone());
210      }
211    }
212  }
213
214  private boolean canPurge(MonitoredTask stat) {
215    long cts = stat.getCompletionTimestamp();
216    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
217  }
218
219  public void dumpAsText(PrintWriter out) {
220    long now = EnvironmentEdgeManager.currentTime();
221
222    List<MonitoredTask> tasks = getTasks();
223    for (MonitoredTask task : tasks) {
224      out.println("Task: " + task.getDescription());
225      out.println("Status: " + task.getState() + ":" + task.getStatus());
226      long running = (now - task.getStartTime()) / 1000;
227      if (task.getCompletionTimestamp() != -1) {
228        long completed = (now - task.getCompletionTimestamp()) / 1000;
229        out.println("Completed " + completed + "s ago");
230        out
231          .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s");
232      } else {
233        out.println("Running for " + running + "s");
234      }
235      out.println();
236    }
237  }
238
239  public synchronized void shutdown() {
240    if (this.monitorThread != null) {
241      monitorThread.interrupt();
242    }
243  }
244
245  /**
246   * This class encapsulates an object as well as a weak reference to a proxy that passes through
247   * calls to that object. In art form:
248   *
249   * <pre>
250   *     Proxy  <------------------
251   *       |                       \
252   *       v                        \
253   * PassthroughInvocationHandler   |  weak reference
254   *       |                       /
255   * MonitoredTaskImpl            /
256   *       |                     /
257   * StatAndWeakRefProxy  ------/
258   * </pre>
259   *
260   * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they
261   * can leak that object, and we'll detect it since our weak reference will go null. But, we still
262   * have the actual object, so we can log it and display it as a leaked (incomplete) action.
263   */
264  private static class TaskAndWeakRefPair {
265    private MonitoredTask impl;
266    private WeakReference<MonitoredTask> weakProxy;
267
268    public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) {
269      this.impl = stat;
270      this.weakProxy = new WeakReference<>(proxy);
271    }
272
273    public MonitoredTask get() {
274      return impl;
275    }
276
277    public boolean isDead() {
278      return weakProxy.get() == null;
279    }
280  }
281
282  /**
283   * An InvocationHandler that simply passes through calls to the original object.
284   */
285  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
286    private T delegatee;
287
288    public PassthroughInvocationHandler(T delegatee) {
289      this.delegatee = delegatee;
290    }
291
292    @Override
293    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
294      return method.invoke(delegatee, args);
295    }
296  }
297
298  private class MonitorRunnable implements Runnable {
299    private boolean running = true;
300
301    @Override
302    public void run() {
303      while (running) {
304        try {
305          Thread.sleep(monitorInterval);
306          if (tasks.isFull()) {
307            purgeExpiredTasks();
308          }
309          warnStuckTasks();
310        } catch (InterruptedException e) {
311          running = false;
312        }
313      }
314    }
315  }
316
317  private interface TaskFilter {
318    enum TaskType {
319      GENERAL("general"),
320      HANDLER("handler"),
321      RPC("rpc"),
322      OPERATION("operation"),
323      ALL("all");
324
325      private final String type;
326
327      private TaskType(String type) {
328        this.type = type.toLowerCase();
329      }
330
331      static TaskType getTaskType(String type) {
332        if (type == null || type.isEmpty()) {
333          return ALL;
334        }
335        type = type.toLowerCase();
336        for (TaskType taskType : values()) {
337          if (taskType.toString().equals(type)) {
338            return taskType;
339          }
340        }
341        return ALL;
342      }
343
344      @Override
345      public String toString() {
346        return type;
347      }
348    }
349
350    /**
351     * Filter out unwanted task.
352     * @param task monitored task
353     * @return false if a task is accepted, true if it is filtered
354     */
355    boolean filter(MonitoredTask task);
356  }
357}