001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import java.io.PrintWriter; 021import java.lang.ref.WeakReference; 022import java.lang.reflect.InvocationHandler; 023import java.lang.reflect.Method; 024import java.lang.reflect.Proxy; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.hadoop.hbase.util.Threads; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 037import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; 038 039/** 040 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes 041 * more than a few seconds and the user might want to inquire about the status 042 */ 043@InterfaceAudience.Private 044public class TaskMonitor { 045 private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class); 046 047 public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks"; 048 public static final int DEFAULT_MAX_TASKS = 1000; 049 public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time"; 050 public static final long DEFAULT_RPC_WARN_TIME = 0; 051 public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time"; 052 public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000; 053 public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval"; 054 public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000; 055 056 private static TaskMonitor instance; 057 058 private final int maxTasks; 059 private final long rpcWarnTime; 060 private final long expirationTime; 061 private final CircularFifoQueue<TaskAndWeakRefPair> tasks; 062 private final List<TaskAndWeakRefPair> rpcTasks; 063 private final long monitorInterval; 064 private Thread monitorThread; 065 066 TaskMonitor(Configuration conf) { 067 maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS); 068 expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME); 069 rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME); 070 tasks = new CircularFifoQueue<>(maxTasks); 071 rpcTasks = Lists.newArrayList(); 072 monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL); 073 monitorThread = new Thread(new MonitorRunnable()); 074 Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor"); 075 } 076 077 /** 078 * Get singleton instance. TODO this would be better off scoped to a single daemon 079 */ 080 public static synchronized TaskMonitor get() { 081 if (instance == null) { 082 instance = new TaskMonitor(HBaseConfiguration.create()); 083 } 084 return instance; 085 } 086 087 public MonitoredTask createStatus(String description) { 088 return createStatus(description, false); 089 } 090 091 public MonitoredTask createStatus(String description, boolean ignore) { 092 return createStatus(description, ignore, false); 093 } 094 095 public synchronized MonitoredTask createStatus(String description, boolean ignore, 096 boolean enableJournal) { 097 MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description); 098 MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(), 099 new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat)); 100 TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); 101 if (tasks.isFull()) { 102 purgeExpiredTasks(); 103 } 104 if (!ignore) { 105 tasks.add(pair); 106 } 107 return proxy; 108 } 109 110 public synchronized MonitoredRPCHandler createRPCStatus(String description) { 111 MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description); 112 MonitoredRPCHandler proxy = 113 (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(), 114 new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat)); 115 TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); 116 rpcTasks.add(pair); 117 return proxy; 118 } 119 120 private synchronized void warnStuckTasks() { 121 if (rpcWarnTime > 0) { 122 final long now = EnvironmentEdgeManager.currentTime(); 123 for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); it.hasNext();) { 124 TaskAndWeakRefPair pair = it.next(); 125 MonitoredTask stat = pair.get(); 126 if ( 127 (stat.getState() == MonitoredTaskImpl.State.RUNNING) 128 && (now >= stat.getWarnTime() + rpcWarnTime) 129 ) { 130 LOG.warn("Task may be stuck: " + stat); 131 stat.setWarnTime(now); 132 } 133 } 134 } 135 } 136 137 private synchronized void purgeExpiredTasks() { 138 for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) { 139 TaskAndWeakRefPair pair = it.next(); 140 MonitoredTask stat = pair.get(); 141 142 if (pair.isDead()) { 143 // The class who constructed this leaked it. So we can 144 // assume it's done. 145 if (stat.getState() == MonitoredTaskImpl.State.RUNNING) { 146 LOG.warn("Status " + stat + " appears to have been leaked"); 147 stat.cleanup(); 148 } 149 } 150 151 if (canPurge(stat)) { 152 it.remove(); 153 } 154 } 155 } 156 157 /** 158 * Produces a list containing copies of the current state of all non-expired MonitoredTasks 159 * handled by this TaskMonitor. 160 * @return A complete list of MonitoredTasks. 161 */ 162 public List<MonitoredTask> getTasks() { 163 return getTasks(null); 164 } 165 166 /** 167 * Produces a list containing copies of the current state of all non-expired MonitoredTasks 168 * handled by this TaskMonitor. 169 * @param filter type of wanted tasks 170 * @return A filtered list of MonitoredTasks. 171 */ 172 public synchronized List<MonitoredTask> getTasks(String filter) { 173 purgeExpiredTasks(); 174 TaskFilter taskFilter = createTaskFilter(filter); 175 ArrayList<MonitoredTask> results = 176 Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); 177 processTasks(tasks, taskFilter, results); 178 processTasks(rpcTasks, taskFilter, results); 179 return results; 180 } 181 182 /** 183 * Create a task filter according to a given filter type. 184 * @param filter type of monitored task 185 * @return a task filter 186 */ 187 private static TaskFilter createTaskFilter(String filter) { 188 switch (TaskFilter.TaskType.getTaskType(filter)) { 189 case GENERAL: 190 return task -> task instanceof MonitoredRPCHandler; 191 case HANDLER: 192 return task -> !(task instanceof MonitoredRPCHandler); 193 case RPC: 194 return task -> !(task instanceof MonitoredRPCHandler) 195 || !((MonitoredRPCHandler) task).isRPCRunning(); 196 case OPERATION: 197 return task -> !(task instanceof MonitoredRPCHandler) 198 || !((MonitoredRPCHandler) task).isOperationRunning(); 199 default: 200 return task -> false; 201 } 202 } 203 204 private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter, 205 List<MonitoredTask> results) { 206 for (TaskAndWeakRefPair task : tasks) { 207 MonitoredTask t = task.get(); 208 if (!filter.filter(t)) { 209 results.add(t.clone()); 210 } 211 } 212 } 213 214 private boolean canPurge(MonitoredTask stat) { 215 long cts = stat.getCompletionTimestamp(); 216 return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime); 217 } 218 219 public void dumpAsText(PrintWriter out) { 220 long now = EnvironmentEdgeManager.currentTime(); 221 222 List<MonitoredTask> tasks = getTasks(); 223 for (MonitoredTask task : tasks) { 224 out.println("Task: " + task.getDescription()); 225 out.println("Status: " + task.getState() + ":" + task.getStatus()); 226 long running = (now - task.getStartTime()) / 1000; 227 if (task.getCompletionTimestamp() != -1) { 228 long completed = (now - task.getCompletionTimestamp()) / 1000; 229 out.println("Completed " + completed + "s ago"); 230 out 231 .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s"); 232 } else { 233 out.println("Running for " + running + "s"); 234 } 235 out.println(); 236 } 237 } 238 239 public synchronized void shutdown() { 240 if (this.monitorThread != null) { 241 monitorThread.interrupt(); 242 } 243 } 244 245 /** 246 * This class encapsulates an object as well as a weak reference to a proxy that passes through 247 * calls to that object. In art form: 248 * 249 * <pre> 250 * Proxy <------------------ 251 * | \ 252 * v \ 253 * PassthroughInvocationHandler | weak reference 254 * | / 255 * MonitoredTaskImpl / 256 * | / 257 * StatAndWeakRefProxy ------/ 258 * </pre> 259 * 260 * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they 261 * can leak that object, and we'll detect it since our weak reference will go null. But, we still 262 * have the actual object, so we can log it and display it as a leaked (incomplete) action. 263 */ 264 private static class TaskAndWeakRefPair { 265 private MonitoredTask impl; 266 private WeakReference<MonitoredTask> weakProxy; 267 268 public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) { 269 this.impl = stat; 270 this.weakProxy = new WeakReference<>(proxy); 271 } 272 273 public MonitoredTask get() { 274 return impl; 275 } 276 277 public boolean isDead() { 278 return weakProxy.get() == null; 279 } 280 } 281 282 /** 283 * An InvocationHandler that simply passes through calls to the original object. 284 */ 285 private static class PassthroughInvocationHandler<T> implements InvocationHandler { 286 private T delegatee; 287 288 public PassthroughInvocationHandler(T delegatee) { 289 this.delegatee = delegatee; 290 } 291 292 @Override 293 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 294 return method.invoke(delegatee, args); 295 } 296 } 297 298 private class MonitorRunnable implements Runnable { 299 private boolean running = true; 300 301 @Override 302 public void run() { 303 while (running) { 304 try { 305 Thread.sleep(monitorInterval); 306 if (tasks.isFull()) { 307 purgeExpiredTasks(); 308 } 309 warnStuckTasks(); 310 } catch (InterruptedException e) { 311 running = false; 312 } 313 } 314 } 315 } 316 317 private interface TaskFilter { 318 enum TaskType { 319 GENERAL("general"), 320 HANDLER("handler"), 321 RPC("rpc"), 322 OPERATION("operation"), 323 ALL("all"); 324 325 private final String type; 326 327 private TaskType(String type) { 328 this.type = type.toLowerCase(); 329 } 330 331 static TaskType getTaskType(String type) { 332 if (type == null || type.isEmpty()) { 333 return ALL; 334 } 335 type = type.toLowerCase(); 336 for (TaskType taskType : values()) { 337 if (taskType.toString().equals(type)) { 338 return taskType; 339 } 340 } 341 return ALL; 342 } 343 344 @Override 345 public String toString() { 346 return type; 347 } 348 } 349 350 /** 351 * Filter out unwanted task. 352 * @param task monitored task 353 * @return false if a task is accepted, true if it is filtered 354 */ 355 boolean filter(MonitoredTask task); 356 } 357}