/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;

/**
 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes
 * more than a few seconds and the user might want to inquire about the status.
 * <p>
 * General tasks are held in a bounded FIFO queue of {@link #MAX_TASKS_KEY} entries; RPC handler
 * tasks are held in a separate, unbounded list. All mutating and reading entry points synchronize
 * on this instance.
 */
@InterfaceAudience.Private
public class TaskMonitor {
  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);

  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
  public static final int DEFAULT_MAX_TASKS = 1000;
  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
  public static final long DEFAULT_RPC_WARN_TIME = 0;
  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
  public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000;
  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
  public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000;

  private static TaskMonitor instance;

  private final int maxTasks;
  private final long rpcWarnTime;
  private final long expirationTime;
  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
  private final List<TaskAndWeakRefPair> rpcTasks;
  private final long monitorInterval;
  private final Thread monitorThread;

  /**
   * Reads the monitor settings from {@code conf} and starts the background monitor thread as a
   * daemon.
   * @param conf configuration supplying the {@code hbase.taskmonitor.*} settings
   */
  TaskMonitor(Configuration conf) {
    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
    tasks = new CircularFifoQueue<>(maxTasks);
    rpcTasks = Lists.newArrayList();
    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
    // Every field the runnable reads is assigned above, before the thread is started.
    monitorThread = new Thread(new MonitorRunnable());
    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
  }

  /**
   * Get singleton instance. TODO this would be better off scoped to a single daemon
   * @return the process-wide monitor, created on first use from a fresh HBase configuration
   */
  public static synchronized TaskMonitor get() {
    if (instance == null) {
      instance = new TaskMonitor(HBaseConfiguration.create());
    }
    return instance;
  }

  /**
   * Creates and registers a new monitored task.
   * @param description human readable description of the task
   * @return a proxy through which the caller updates the task
   */
  public synchronized MonitoredTask createStatus(String description) {
    return createStatus(description, false);
  }

  /**
   * Creates a new monitored task and, unless {@code ignore} is set, registers it with this
   * monitor.
   * @param description human readable description of the task
   * @param ignore      when {@code true} the task is created but not added to the monitored queue,
   *                    so it is never returned by {@link #getTasks()}
   * @return a proxy through which the caller updates the task
   */
  public synchronized MonitoredTask createStatus(String description, boolean ignore) {
    MonitoredTask stat = new MonitoredTaskImpl();
    stat.setDescription(description);
    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
      new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
    // Give expired entries a chance to go before the bounded queue evicts its oldest element.
    purgeExpiredTasksIfFull();
    if (!ignore) {
      tasks.add(pair);
    }
    return proxy;
  }

  /**
   * Creates and registers a new monitored RPC handler task. RPC tasks are kept in their own list
   * and are not subject to the {@link #MAX_TASKS_KEY} bound.
   * @param description human readable description of the handler
   * @return a proxy through which the caller updates the handler status
   */
  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
    stat.setDescription(description);
    MonitoredRPCHandler proxy =
      (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
        new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
    rpcTasks.add(pair);
    return proxy;
  }

  /**
   * Logs a warning for every RPC task that is still RUNNING and whose last warn time is at least
   * {@code rpcWarnTime} milliseconds in the past, then resets that task's warn time to now. Does
   * nothing when {@code rpcWarnTime} is not positive.
   */
  private synchronized void warnStuckTasks() {
    if (rpcWarnTime <= 0) {
      return;
    }
    final long now = EnvironmentEdgeManager.currentTime();
    for (TaskAndWeakRefPair pair : rpcTasks) {
      MonitoredTask stat = pair.get();
      boolean isRunning = stat.getState() == MonitoredTaskImpl.State.RUNNING;
      if (isRunning && now >= stat.getWarnTime() + rpcWarnTime) {
        LOG.warn("Task may be stuck: {}", stat);
        stat.setWarnTime(now);
      }
    }
  }

  /**
   * Runs {@link #purgeExpiredTasks()} when the general task queue has reached its capacity.
   * <p>
   * The size is compared with {@code maxTasks} directly because
   * {@code CircularFifoQueue.isFull()} is documented to always return {@code false}, which would
   * make this check a no-op. The method is synchronized so the monitor thread never reads the
   * queue size while another thread is mutating the queue.
   */
  private synchronized void purgeExpiredTasksIfFull() {
    if (tasks.size() >= maxTasks) {
      purgeExpiredTasks();
    }
  }

  /**
   * Walks the general task queue, cleans up tasks whose proxy has been garbage collected while
   * still RUNNING, and removes every task that {@link #canPurge(MonitoredTask)} accepts.
   */
  private synchronized void purgeExpiredTasks() {
    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
      TaskAndWeakRefPair pair = it.next();
      MonitoredTask stat = pair.get();

      if (pair.isDead()) {
        // The class who constructed this leaked it. So we can
        // assume it's done.
        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
          LOG.warn("Status {} appears to have been leaked", stat);
          stat.cleanup();
        }
      }

      if (canPurge(stat)) {
        it.remove();
      }
    }
  }

  /**
   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
   * handled by this TaskMonitor.
   * @return A complete list of MonitoredTasks.
   */
  public List<MonitoredTask> getTasks() {
    return getTasks(null);
  }

  /**
   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
   * handled by this TaskMonitor.
   * @param filter type of wanted tasks; {@code null}, empty or unrecognized values select all
   *               tasks
   * @return A filtered list of MonitoredTasks.
   */
  public synchronized List<MonitoredTask> getTasks(String filter) {
    purgeExpiredTasks();
    TaskFilter taskFilter = createTaskFilter(filter);
    List<MonitoredTask> results = new ArrayList<>(tasks.size() + rpcTasks.size());
    processTasks(tasks, taskFilter, results);
    processTasks(rpcTasks, taskFilter, results);
    return results;
  }

  /**
   * Create a task filter according to a given filter type.
   * @param filter type of monitored task
   * @return a task filter whose {@code filter} method returns {@code true} for tasks to drop
   */
  private static TaskFilter createTaskFilter(String filter) {
    switch (TaskFilter.TaskType.getTaskType(filter)) {
      case GENERAL:
        return task -> task instanceof MonitoredRPCHandler;
      case HANDLER:
        return task -> !(task instanceof MonitoredRPCHandler);
      case RPC:
        return task -> !(task instanceof MonitoredRPCHandler)
          || !((MonitoredRPCHandler) task).isRPCRunning();
      case OPERATION:
        return task -> !(task instanceof MonitoredRPCHandler)
          || !((MonitoredRPCHandler) task).isOperationRunning();
      default:
        return task -> false;
    }
  }

  /**
   * Appends a clone of every task in {@code tasks} that is not rejected by {@code filter} to
   * {@code results}.
   */
  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter,
    List<MonitoredTask> results) {
    for (TaskAndWeakRefPair task : tasks) {
      MonitoredTask t = task.get();
      if (!filter.filter(t)) {
        results.add(t.clone());
      }
    }
  }

  /**
   * @return {@code true} when {@code stat} has a positive completion timestamp that is more than
   *         {@code expirationTime} milliseconds old
   */
  private boolean canPurge(MonitoredTask stat) {
    long cts = stat.getCompletionTimestamp();
    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
  }

  /**
   * Writes a plain-text summary of every task returned by {@link #getTasks()} to {@code out}.
   * @param out destination for the report
   */
  public void dumpAsText(PrintWriter out) {
    long now = EnvironmentEdgeManager.currentTime();

    List<MonitoredTask> snapshot = getTasks();
    for (MonitoredTask task : snapshot) {
      out.println("Task: " + task.getDescription());
      out.println("Status: " + task.getState() + ":" + task.getStatus());
      if (task.getCompletionTimestamp() != -1) {
        long completed = (now - task.getCompletionTimestamp()) / 1000;
        out.println("Completed " + completed + "s ago");
        out
          .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s");
      } else {
        long running = (now - task.getStartTime()) / 1000;
        out.println("Running for " + running + "s");
      }
      out.println();
    }
  }

  /**
   * Interrupts the background monitor thread, which makes its loop exit.
   */
  public synchronized void shutdown() {
    if (this.monitorThread != null) {
      monitorThread.interrupt();
    }
  }

  /**
   * This class encapsulates an object as well as a weak reference to a proxy that passes through
   * calls to that object. In art form:
   *
   * <pre>
   *     Proxy  &lt;------------------
   *       |                       \
   *       v                        \
   * PassthroughInvocationHandler   |  weak reference
   *       |                       /
   * MonitoredTaskImpl            /
   *       |                     /
   * StatAndWeakRefProxy  ------/
   * </pre>
   *
   * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they
   * can leak that object, and we'll detect it since our weak reference will go null. But, we still
   * have the actual object, so we can log it and display it as a leaked (incomplete) action.
   */
  private static class TaskAndWeakRefPair {
    private final MonitoredTask impl;
    private final WeakReference<MonitoredTask> weakProxy;

    public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) {
      this.impl = stat;
      this.weakProxy = new WeakReference<>(proxy);
    }

    /** Returns the strongly held task implementation. */
    public MonitoredTask get() {
      return impl;
    }

    /** Returns {@code true} once the proxy handed to the creator has been garbage collected. */
    public boolean isDead() {
      return weakProxy.get() == null;
    }
  }

  /**
   * An InvocationHandler that simply passes through calls to the original object.
   */
  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
    private final T delegatee;

    public PassthroughInvocationHandler(T delegatee) {
      this.delegatee = delegatee;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      try {
        return method.invoke(delegatee, args);
      } catch (InvocationTargetException e) {
        // Rethrow what the delegate actually threw; otherwise the proxy reports the reflective
        // wrapper to callers as an UndeclaredThrowableException.
        Throwable cause = e.getCause();
        throw cause != null ? cause : e;
      }
    }
  }

  /**
   * Background loop: every {@code monitorInterval} milliseconds purge expired tasks if the queue
   * is full and warn about stuck RPC tasks. Exits when the thread is interrupted.
   */
  private class MonitorRunnable implements Runnable {
    private boolean running = true;

    @Override
    public void run() {
      while (running) {
        try {
          Thread.sleep(monitorInterval);
          purgeExpiredTasksIfFull();
          warnStuckTasks();
        } catch (InterruptedException e) {
          // Keep the interrupt visible to whoever owns this thread, then stop.
          Thread.currentThread().interrupt();
          running = false;
        }
      }
    }
  }

  @FunctionalInterface
  private interface TaskFilter {
    enum TaskType {
      GENERAL("general"),
      HANDLER("handler"),
      RPC("rpc"),
      OPERATION("operation"),
      ALL("all");

      private final String type;

      private TaskType(String type) {
        // Locale.ROOT keeps the mapping independent of the JVM default locale.
        this.type = type.toLowerCase(Locale.ROOT);
      }

      /**
       * Resolves a case-insensitive type name.
       * @param type name to look up, may be {@code null}
       * @return the matching type, or {@link #ALL} for {@code null}, empty or unknown names
       */
      static TaskType getTaskType(String type) {
        if (type == null || type.isEmpty()) {
          return ALL;
        }
        String normalized = type.toLowerCase(Locale.ROOT);
        for (TaskType taskType : values()) {
          if (taskType.toString().equals(normalized)) {
            return taskType;
          }
        }
        return ALL;
      }

      @Override
      public String toString() {
        return type;
      }
    }

    /**
     * Filter out unwanted task.
     * @param task monitored task
     * @return false if a task is accepted, true if it is filtered
     */
    boolean filter(MonitoredTask task);
  }
}