/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;

/**
 * Singleton which keeps track of tasks going on in this VM.
 * A Task here is anything which takes more than a few seconds
 * and the user might want to inquire about the status.
 * <p>
 * Both task collections are guarded by this object's monitor. A daemon thread started by
 * the constructor periodically purges expired general tasks (when the bounded queue is full)
 * and logs RPC tasks that look stuck.
 */
@InterfaceAudience.Private
public class TaskMonitor {
  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);

  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
  public static final int DEFAULT_MAX_TASKS = 1000;
  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
  public static final long DEFAULT_RPC_WARN_TIME = 0;
  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
  public static final long DEFAULT_EXPIRATION_TIME = 60*1000;
  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
  public static final long DEFAULT_MONITOR_INTERVAL = 10*1000;

  private static TaskMonitor instance;

  /** Capacity of the general task queue. */
  private final int maxTasks;
  /** Threshold for the "may be stuck" warning on RPC tasks; a value <= 0 disables it. */
  private final long rpcWarnTime;
  /** How long after completion a task is kept before it may be purged. */
  private final long expirationTime;
  /** Bounded FIFO of general tasks; guarded by {@code this}. */
  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
  /** RPC handler tasks; guarded by {@code this}. Entries are never removed here. */
  private final List<TaskAndWeakRefPair> rpcTasks;
  /** Sleep between monitor passes, in milliseconds (passed to Thread.sleep). */
  private final long monitorInterval;
  private final Thread monitorThread;

  TaskMonitor(Configuration conf) {
    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
    tasks = new CircularFifoQueue<>(maxTasks);
    rpcTasks = Lists.newArrayList();
    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
    monitorThread = new Thread(new MonitorRunnable());
    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
  }

  /**
   * Get singleton instance, lazily creating it from a fresh HBase configuration.
   * TODO this would be better off scoped to a single daemon
   */
  public static synchronized TaskMonitor get() {
    if (instance == null) {
      instance = new TaskMonitor(HBaseConfiguration.create());
    }
    return instance;
  }

  /**
   * Creates and registers a general task.
   * @param description initial description of the task
   * @return a proxy to the new task; if the caller drops it without completing the task,
   *         the task is later reported as leaked
   */
  public synchronized MonitoredTask createStatus(String description) {
    MonitoredTask stat = new MonitoredTaskImpl();
    stat.setDescription(description);
    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
        stat.getClass().getClassLoader(),
        new Class<?>[] { MonitoredTask.class },
        new PassthroughInvocationHandler<>(stat));
    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
    // Try to make room first; if the queue is still full, add() evicts the oldest entry.
    purgeExpiredTasksIfFull();
    tasks.add(pair);
    return proxy;
  }

  /**
   * Creates and registers an RPC handler task.
   * @param description initial description of the task
   * @return a proxy to the new RPC handler task
   */
  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
    stat.setDescription(description);
    MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
        stat.getClass().getClassLoader(),
        new Class<?>[] { MonitoredRPCHandler.class },
        new PassthroughInvocationHandler<>(stat));
    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
    rpcTasks.add(pair);
    return proxy;
  }

  /** Logs a warning for every RUNNING RPC task not warned about for {@code rpcWarnTime}. */
  private synchronized void warnStuckTasks() {
    if (rpcWarnTime <= 0) {
      return;
    }
    final long now = EnvironmentEdgeManager.currentTime();
    for (TaskAndWeakRefPair pair : rpcTasks) {
      MonitoredTask stat = pair.get();
      if (stat.getState() == MonitoredTaskImpl.State.RUNNING
          && now >= stat.getWarnTime() + rpcWarnTime) {
        LOG.warn("Task may be stuck: {}", stat);
        // Reset the warn time so the task is not reported again on every pass.
        stat.setWarnTime(now);
      }
    }
  }

  /** Purges expired general tasks, but only when the bounded queue is full. */
  private synchronized void purgeExpiredTasksIfFull() {
    if (tasks.isFull()) {
      purgeExpiredTasks();
    }
  }

  /**
   * Cleans up leaked general tasks and removes those whose completion is older than
   * {@code expirationTime}.
   */
  private synchronized void purgeExpiredTasks() {
    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
      TaskAndWeakRefPair pair = it.next();
      MonitoredTask stat = pair.get();

      if (pair.isDead()) {
        // The class who constructed this leaked it. So we can
        // assume it's done.
        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
          LOG.warn("Status {} appears to have been leaked", stat);
          stat.cleanup();
        }
      }

      if (canPurge(stat)) {
        it.remove();
      }
    }
  }

  /**
   * Produces a list containing copies of the current state of all non-expired
   * MonitoredTasks handled by this TaskMonitor.
   * @return A complete list of MonitoredTasks.
   */
  public List<MonitoredTask> getTasks() {
    return getTasks(null);
  }

  /**
   * Produces a list containing copies of the current state of all non-expired
   * MonitoredTasks handled by this TaskMonitor.
   * @param filter type of wanted tasks ("general", "handler", "rpc", "operation" or "all";
   *               null, empty or unrecognized values mean "all")
   * @return A filtered list of MonitoredTasks.
   */
  public synchronized List<MonitoredTask> getTasks(String filter) {
    purgeExpiredTasks();
    TaskFilter taskFilter = createTaskFilter(filter);
    ArrayList<MonitoredTask> results =
        Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
    processTasks(tasks, taskFilter, results);
    processTasks(rpcTasks, taskFilter, results);
    return results;
  }

  /**
   * Create a task filter according to a given filter type.
   * @param filter type of monitored task
   * @return a task filter (returns true for tasks that should be excluded)
   */
  private static TaskFilter createTaskFilter(String filter) {
    switch (TaskFilter.TaskType.getTaskType(filter)) {
      case GENERAL: return task -> task instanceof MonitoredRPCHandler;
      case HANDLER: return task -> !(task instanceof MonitoredRPCHandler);
      case RPC: return task -> !(task instanceof MonitoredRPCHandler) ||
          !((MonitoredRPCHandler) task).isRPCRunning();
      case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) ||
          !((MonitoredRPCHandler) task).isOperationRunning();
      default: return task -> false;
    }
  }

  /** Appends a clone of every task in {@code tasks} that {@code filter} does not exclude. */
  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks,
                                   TaskFilter filter,
                                   List<MonitoredTask> results) {
    for (TaskAndWeakRefPair task : tasks) {
      MonitoredTask t = task.get();
      if (!filter.filter(t)) {
        results.add(t.clone());
      }
    }
  }

  /** @return true if the task completed more than {@code expirationTime} ago */
  private boolean canPurge(MonitoredTask stat) {
    long cts = stat.getCompletionTimestamp();
    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
  }

  /**
   * Writes a human-readable summary of all current tasks.
   * @param out destination writer; not flushed or closed here
   */
  public void dumpAsText(PrintWriter out) {
    long now = EnvironmentEdgeManager.currentTime();

    List<MonitoredTask> snapshot = getTasks();
    for (MonitoredTask task : snapshot) {
      out.println("Task: " + task.getDescription());
      out.println("Status: " + task.getState() + ":" + task.getStatus());
      if (task.getCompletionTimestamp() != -1) {
        long completed = (now - task.getCompletionTimestamp()) / 1000;
        out.println("Completed " + completed + "s ago");
        out.println("Ran for " +
            (task.getCompletionTimestamp() - task.getStartTime())/1000
            + "s");
      } else {
        long running = (now - task.getStartTime()) / 1000;
        out.println("Running for " + running + "s");
      }
      out.println();
    }
  }

  /** Stops the background monitor thread by interrupting it. */
  public synchronized void shutdown() {
    if (this.monitorThread != null) {
      monitorThread.interrupt();
    }
  }

  /**
   * This class encapsulates an object as well as a weak reference to a proxy
   * that passes through calls to that object. In art form:
   * <pre>
   *   Proxy  &lt;------------------
   *     |                       \
   *     v                        \
   * PassthroughInvocationHandler   | weak reference
   *     |                       /
   * MonitoredTaskImpl            /
   *     |                     /
   * TaskAndWeakRefPair  ------/
   * </pre>
   * Since we only return the Proxy to the creator of the MonitoredTask,
   * this means that they can leak that object, and we'll detect it
   * since our weak reference will go null. But, we still have the actual
   * object, so we can log it and display it as a leaked (incomplete) action.
   */
  private static class TaskAndWeakRefPair {
    private final MonitoredTask impl;
    private final WeakReference<MonitoredTask> weakProxy;

    public TaskAndWeakRefPair(MonitoredTask stat,
        MonitoredTask proxy) {
      this.impl = stat;
      this.weakProxy = new WeakReference<>(proxy);
    }

    /** @return the underlying task implementation (not the proxy) */
    public MonitoredTask get() {
      return impl;
    }

    /** @return true once the proxy handed to the creator has been garbage collected */
    public boolean isDead() {
      return weakProxy.get() == null;
    }
  }

  /**
   * An InvocationHandler that simply passes through calls to the original
   * object, rethrowing whatever the original object throws.
   */
  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
    private final T delegatee;

    public PassthroughInvocationHandler(T delegatee) {
      this.delegatee = delegatee;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      try {
        return method.invoke(delegatee, args);
      } catch (InvocationTargetException e) {
        // Surface the delegate's own exception; otherwise the proxy would wrap it in an
        // UndeclaredThrowableException.
        throw e.getCause();
      }
    }
  }

  /** Background loop: every {@code monitorInterval} ms purge (if full) and warn on stuck RPCs. */
  private class MonitorRunnable implements Runnable {
    private boolean running = true;

    @Override
    public void run() {
      while (running) {
        try {
          Thread.sleep(monitorInterval);
          purgeExpiredTasksIfFull();
          warnStuckTasks();
        } catch (InterruptedException e) {
          // shutdown() interrupts this thread: stop looping and keep the interrupt status set.
          Thread.currentThread().interrupt();
          running = false;
        }
      }
    }
  }

  @FunctionalInterface
  private interface TaskFilter {
    enum TaskType {
      GENERAL("general"),
      HANDLER("handler"),
      RPC("rpc"),
      OPERATION("operation"),
      ALL("all");

      private final String type;

      private TaskType(String type) {
        this.type = type.toLowerCase(Locale.ROOT);
      }

      /**
       * Case-insensitively maps a filter name to a task type.
       * @return the matching type, or ALL for null, empty or unrecognized input
       */
      static TaskType getTaskType(String type) {
        if (type == null || type.isEmpty()) {
          return ALL;
        }
        // Locale.ROOT so that e.g. "OPERATION" still matches under a Turkish default locale.
        String normalized = type.toLowerCase(Locale.ROOT);
        for (TaskType taskType : values()) {
          if (taskType.toString().equals(normalized)) {
            return taskType;
          }
        }
        return ALL;
      }

      @Override
      public String toString() {
        return type;
      }
    }

    /**
     * Filter out unwanted task.
     * @param task monitored task
     * @return false if a task is accepted, true if it is filtered
     */
    boolean filter(MonitoredTask task);
  }
}