001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import java.io.PrintWriter; 021import java.lang.ref.WeakReference; 022import java.lang.reflect.InvocationHandler; 023import java.lang.reflect.Method; 024import java.lang.reflect.Proxy; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.hadoop.hbase.util.Threads; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 037import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; 038 039/** 040 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes 041 * more than a few seconds and the user might want to inquire about the status 042 */ 043@InterfaceAudience.Private 044public class TaskMonitor { 045 private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class); 046 047 public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks"; 048 public static final int DEFAULT_MAX_TASKS = 1000; 049 public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time"; 050 public static final long DEFAULT_RPC_WARN_TIME = 0; 051 public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time"; 052 public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000; 053 public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval"; 054 public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000; 055 056 private static TaskMonitor instance; 057 058 private final int maxTasks; 059 private final long rpcWarnTime; 060 private final long expirationTime; 061 private final CircularFifoQueue<TaskAndWeakRefPair> tasks; 062 private final List<TaskAndWeakRefPair> rpcTasks; 063 private final long monitorInterval; 064 private Thread monitorThread; 065 066 TaskMonitor(Configuration conf) { 067 maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS); 068 expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME); 069 rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME); 070 tasks = new CircularFifoQueue<>(maxTasks); 071 rpcTasks = Lists.newArrayList(); 072 monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL); 073 monitorThread = new Thread(new MonitorRunnable()); 074 Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor"); 075 } 076 077 /** 078 * Get singleton instance. TODO this would be better off scoped to a single daemon 079 */ 080 public static synchronized TaskMonitor get() { 081 if (instance == null) { 082 instance = new TaskMonitor(HBaseConfiguration.create()); 083 } 084 return instance; 085 } 086 087 public MonitoredTask createStatus(String description) { 088 return createStatus(description, false); 089 } 090 091 public synchronized MonitoredTask createStatus(String description, boolean enableJournal) { 092 MonitoredTask stat = new MonitoredTaskImpl(enableJournal); 093 stat.setDescription(description); 094 MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(), 095 new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat)); 096 TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); 097 if (tasks.isFull()) { 098 purgeExpiredTasks(); 099 } 100 tasks.add(pair); 101 return proxy; 102 } 103 104 public synchronized MonitoredRPCHandler createRPCStatus(String description) { 105 MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(); 106 stat.setDescription(description); 107 MonitoredRPCHandler proxy = 108 (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(), 109 new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat)); 110 TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); 111 rpcTasks.add(pair); 112 return proxy; 113 } 114 115 private synchronized void warnStuckTasks() { 116 if (rpcWarnTime > 0) { 117 final long now = EnvironmentEdgeManager.currentTime(); 118 for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); it.hasNext();) { 119 TaskAndWeakRefPair pair = it.next(); 120 MonitoredTask stat = pair.get(); 121 if ( 122 (stat.getState() == MonitoredTaskImpl.State.RUNNING) 123 && (now >= stat.getWarnTime() + rpcWarnTime) 124 ) { 125 LOG.warn("Task may be stuck: " + stat); 126 stat.setWarnTime(now); 127 } 128 } 129 } 130 } 131 132 private synchronized void purgeExpiredTasks() { 133 for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) { 134 TaskAndWeakRefPair pair = it.next(); 135 MonitoredTask stat = pair.get(); 136 137 if (pair.isDead()) { 138 // The class who constructed this leaked it. So we can 139 // assume it's done. 140 if (stat.getState() == MonitoredTaskImpl.State.RUNNING) { 141 LOG.warn("Status " + stat + " appears to have been leaked"); 142 stat.cleanup(); 143 } 144 } 145 146 if (canPurge(stat)) { 147 it.remove(); 148 } 149 } 150 } 151 152 /** 153 * Produces a list containing copies of the current state of all non-expired MonitoredTasks 154 * handled by this TaskMonitor. 155 * @return A complete list of MonitoredTasks. 156 */ 157 public List<MonitoredTask> getTasks() { 158 return getTasks(null); 159 } 160 161 /** 162 * Produces a list containing copies of the current state of all non-expired MonitoredTasks 163 * handled by this TaskMonitor. 164 * @param filter type of wanted tasks 165 * @return A filtered list of MonitoredTasks. 166 */ 167 public synchronized List<MonitoredTask> getTasks(String filter) { 168 purgeExpiredTasks(); 169 TaskFilter taskFilter = createTaskFilter(filter); 170 ArrayList<MonitoredTask> results = 171 Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); 172 processTasks(tasks, taskFilter, results); 173 processTasks(rpcTasks, taskFilter, results); 174 return results; 175 } 176 177 /** 178 * Create a task filter according to a given filter type. 179 * @param filter type of monitored task 180 * @return a task filter 181 */ 182 private static TaskFilter createTaskFilter(String filter) { 183 switch (TaskFilter.TaskType.getTaskType(filter)) { 184 case GENERAL: 185 return task -> task instanceof MonitoredRPCHandler; 186 case HANDLER: 187 return task -> !(task instanceof MonitoredRPCHandler); 188 case RPC: 189 return task -> !(task instanceof MonitoredRPCHandler) 190 || !((MonitoredRPCHandler) task).isRPCRunning(); 191 case OPERATION: 192 return task -> !(task instanceof MonitoredRPCHandler) 193 || !((MonitoredRPCHandler) task).isOperationRunning(); 194 default: 195 return task -> false; 196 } 197 } 198 199 private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter, 200 List<MonitoredTask> results) { 201 for (TaskAndWeakRefPair task : tasks) { 202 MonitoredTask t = task.get(); 203 if (!filter.filter(t)) { 204 results.add(t.clone()); 205 } 206 } 207 } 208 209 private boolean canPurge(MonitoredTask stat) { 210 long cts = stat.getCompletionTimestamp(); 211 return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime); 212 } 213 214 public void dumpAsText(PrintWriter out) { 215 long now = EnvironmentEdgeManager.currentTime(); 216 217 List<MonitoredTask> tasks = getTasks(); 218 for (MonitoredTask task : tasks) { 219 out.println("Task: " + task.getDescription()); 220 out.println("Status: " + task.getState() + ":" + task.getStatus()); 221 long running = (now - task.getStartTime()) / 1000; 222 if (task.getCompletionTimestamp() != -1) { 223 long completed = (now - task.getCompletionTimestamp()) / 1000; 224 out.println("Completed " + completed + "s ago"); 225 out 226 .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s"); 227 } else { 228 out.println("Running for " + running + "s"); 229 } 230 out.println(); 231 } 232 } 233 234 public synchronized void shutdown() { 235 if (this.monitorThread != null) { 236 monitorThread.interrupt(); 237 } 238 } 239 240 /** 241 * This class encapsulates an object as well as a weak reference to a proxy that passes through 242 * calls to that object. In art form: 243 * 244 * <pre> 245 * Proxy <------------------ 246 * | \ 247 * v \ 248 * PassthroughInvocationHandler | weak reference 249 * | / 250 * MonitoredTaskImpl / 251 * | / 252 * StatAndWeakRefProxy ------/ 253 * </pre> 254 * 255 * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they 256 * can leak that object, and we'll detect it since our weak reference will go null. But, we still 257 * have the actual object, so we can log it and display it as a leaked (incomplete) action. 258 */ 259 private static class TaskAndWeakRefPair { 260 private MonitoredTask impl; 261 private WeakReference<MonitoredTask> weakProxy; 262 263 public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) { 264 this.impl = stat; 265 this.weakProxy = new WeakReference<>(proxy); 266 } 267 268 public MonitoredTask get() { 269 return impl; 270 } 271 272 public boolean isDead() { 273 return weakProxy.get() == null; 274 } 275 } 276 277 /** 278 * An InvocationHandler that simply passes through calls to the original object. 279 */ 280 private static class PassthroughInvocationHandler<T> implements InvocationHandler { 281 private T delegatee; 282 283 public PassthroughInvocationHandler(T delegatee) { 284 this.delegatee = delegatee; 285 } 286 287 @Override 288 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 289 return method.invoke(delegatee, args); 290 } 291 } 292 293 private class MonitorRunnable implements Runnable { 294 private boolean running = true; 295 296 @Override 297 public void run() { 298 while (running) { 299 try { 300 Thread.sleep(monitorInterval); 301 if (tasks.isFull()) { 302 purgeExpiredTasks(); 303 } 304 warnStuckTasks(); 305 } catch (InterruptedException e) { 306 running = false; 307 } 308 } 309 } 310 } 311 312 private interface TaskFilter { 313 enum TaskType { 314 GENERAL("general"), 315 HANDLER("handler"), 316 RPC("rpc"), 317 OPERATION("operation"), 318 ALL("all"); 319 320 private final String type; 321 322 private TaskType(String type) { 323 this.type = type.toLowerCase(); 324 } 325 326 static TaskType getTaskType(String type) { 327 if (type == null || type.isEmpty()) { 328 return ALL; 329 } 330 type = type.toLowerCase(); 331 for (TaskType taskType : values()) { 332 if (taskType.toString().equals(type)) { 333 return taskType; 334 } 335 } 336 return ALL; 337 } 338 339 @Override 340 public String toString() { 341 return type; 342 } 343 } 344 345 /** 346 * Filter out unwanted task. 347 * @param task monitored task 348 * @return false if a task is accepted, true if it is filtered 349 */ 350 boolean filter(MonitoredTask task); 351 } 352}