/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;

/**
 * Singleton which keeps track of tasks going on in this VM. A Task here is anything which takes
 * more than a few seconds and the user might want to inquire about the status.
 * <p>
 * General tasks are held in a bounded queue of {@link #MAX_TASKS_KEY} entries; RPC handler tasks
 * are held in a separate list. Both collections are only touched while holding this object's
 * monitor. A daemon thread wakes up every {@link #MONITOR_INTERVAL_KEY} milliseconds to purge
 * expired tasks (when the queue is full) and to warn about RPC tasks that look stuck.
 */
@InterfaceAudience.Private
public class TaskMonitor {
  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);

  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
  public static final int DEFAULT_MAX_TASKS = 1000;
  public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
  public static final long DEFAULT_RPC_WARN_TIME = 0;
  public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
  public static final long DEFAULT_EXPIRATION_TIME = 60 * 1000;
  public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
  public static final long DEFAULT_MONITOR_INTERVAL = 10 * 1000;

  private static TaskMonitor instance;

  private final int maxTasks;
  private final long rpcWarnTime;
  private final long expirationTime;
  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
  private final List<TaskAndWeakRefPair> rpcTasks;
  private final long monitorInterval;
  private Thread monitorThread;

  /**
   * Reads the limits and intervals from {@code conf} and starts the background monitor thread as
   * a daemon.
   * @param conf configuration supplying the {@code hbase.taskmonitor.*} settings
   */
  TaskMonitor(Configuration conf) {
    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
    tasks = new CircularFifoQueue<>(maxTasks);
    rpcTasks = Lists.newArrayList();
    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
    monitorThread = new Thread(new MonitorRunnable());
    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
  }

  /**
   * Get singleton instance. TODO this would be better off scoped to a single daemon
   */
  public static synchronized TaskMonitor get() {
    if (instance == null) {
      instance = new TaskMonitor(HBaseConfiguration.create());
    }
    return instance;
  }

  /**
   * Create a tracked monitored task without a journal.
   * @param description description of the status
   * @return a monitored task
   */
  public MonitoredTask createStatus(String description) {
    return createStatus(description, false);
  }

  /**
   * Create a monitored task without a journal.
   * @param description description of the status
   * @param ignore      whether to ignore to track(e.g. show/clear/expire) the task in the
   *                    {@link TaskMonitor}
   * @return a monitored task
   */
  public MonitoredTask createStatus(String description, boolean ignore) {
    return createStatus(description, ignore, false);
  }

  /**
   * Create a monitored task for users to inquire about the status
   * @param description   description of the status
   * @param ignore        whether to ignore to track(e.g. show/clear/expire) the task in the
   *                      {@link TaskMonitor}
   * @param enableJournal enable when the task contains some stage journals
   * @return a monitored task
   */
  public synchronized MonitoredTask createStatus(String description, boolean ignore,
    boolean enableJournal) {
    MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description);
    // Callers only ever see the proxy; we keep the implementation plus a weak reference to the
    // proxy so a leaked (never completed) task can be detected later.
    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
      new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
    if (tasks.isFull()) {
      purgeExpiredTasks();
    }
    if (!ignore) {
      tasks.add(pair);
    }
    return proxy;
  }

  /**
   * Create a task group which contains a series of monitored tasks for users to inquire about the
   * status
   * @param ignoreSubTasksInTaskMonitor whether to ignore to track(e.g. show/clear/expire) the task
   *                                    in the {@link TaskMonitor}
   * @param description                 description of the status
   * @return a group of monitored tasks
   */
  public static TaskGroup createTaskGroup(boolean ignoreSubTasksInTaskMonitor, String description) {
    return new TaskGroup(ignoreSubTasksInTaskMonitor, description);
  }

  /**
   * Create a monitored RPC handler task and register it in the RPC task list.
   * @param description description of the status
   * @return a proxy to the newly created RPC handler task
   */
  public synchronized MonitoredRPCHandler createRPCStatus(String description) {
    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description);
    MonitoredRPCHandler proxy =
      (MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
        new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
    rpcTasks.add(pair);
    return proxy;
  }

  /**
   * Logs a warning for every RPC task that is still RUNNING and whose last warn time is at least
   * {@code rpcWarnTime} milliseconds in the past, then resets that task's warn time to now. Does
   * nothing when {@code rpcWarnTime} is not positive.
   */
  private synchronized void warnStuckTasks() {
    if (rpcWarnTime <= 0) {
      return;
    }
    final long now = EnvironmentEdgeManager.currentTime();
    for (TaskAndWeakRefPair pair : rpcTasks) {
      MonitoredTask stat = pair.get();
      boolean isRunning = stat.getState() == MonitoredTaskImpl.State.RUNNING;
      if (isRunning && now >= stat.getWarnTime() + rpcWarnTime) {
        LOG.warn("Task may be stuck: {}", stat);
        stat.setWarnTime(now);
      }
    }
  }

  /**
   * Walks the general task queue, cleaning up RUNNING tasks whose proxy has been garbage
   * collected and removing every task that {@link #canPurge(MonitoredTask)} accepts.
   */
  private synchronized void purgeExpiredTasks() {
    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
      TaskAndWeakRefPair pair = it.next();
      MonitoredTask stat = pair.get();

      if (pair.isDead()) {
        // The class who constructed this leaked it. So we can
        // assume it's done.
        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
          LOG.warn("Status {} appears to have been leaked", stat);
          stat.cleanup();
        }
      }

      if (canPurge(stat)) {
        it.remove();
      }
    }
  }

  /**
   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
   * handled by this TaskMonitor.
   * @return A complete list of MonitoredTasks.
   */
  public List<MonitoredTask> getTasks() {
    return getTasks(null);
  }

  /**
   * Produces a list containing copies of the current state of all non-expired MonitoredTasks
   * handled by this TaskMonitor.
   * @param filter type of wanted tasks
   * @return A filtered list of MonitoredTasks.
   */
  public synchronized List<MonitoredTask> getTasks(String filter) {
    purgeExpiredTasks();
    TaskFilter taskFilter = createTaskFilter(filter);
    List<MonitoredTask> results = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
    processTasks(tasks, taskFilter, results);
    processTasks(rpcTasks, taskFilter, results);
    return results;
  }

  /**
   * Create a task filter according to a given filter type. The returned filter answers
   * {@code true} for tasks that must be dropped:
   * <ul>
   * <li>general: drops RPC handler tasks</li>
   * <li>handler: drops everything that is not an RPC handler task</li>
   * <li>rpc: drops everything except RPC handler tasks with an RPC running</li>
   * <li>operation: drops everything except RPC handler tasks with an operation running</li>
   * <li>anything else: drops nothing</li>
   * </ul>
   * @param filter type of monitored task
   * @return a task filter
   */
  private static TaskFilter createTaskFilter(String filter) {
    switch (TaskFilter.TaskType.getTaskType(filter)) {
      case GENERAL:
        return task -> task instanceof MonitoredRPCHandler;
      case HANDLER:
        return task -> !(task instanceof MonitoredRPCHandler);
      case RPC:
        return task -> !(task instanceof MonitoredRPCHandler)
          || !((MonitoredRPCHandler) task).isRPCRunning();
      case OPERATION:
        return task -> !(task instanceof MonitoredRPCHandler)
          || !((MonitoredRPCHandler) task).isOperationRunning();
      default:
        return task -> false;
    }
  }

  /**
   * Appends a clone of every task not rejected by {@code filter} to {@code results}.
   * @param tasks   tasks to inspect
   * @param filter  filter deciding which tasks are dropped
   * @param results list receiving the clones of the accepted tasks
   */
  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, TaskFilter filter,
    List<MonitoredTask> results) {
    for (TaskAndWeakRefPair task : tasks) {
      MonitoredTask t = task.get();
      if (!filter.filter(t)) {
        results.add(t.clone());
      }
    }
  }

  /**
   * Tells whether a task has a positive completion timestamp that is more than
   * {@code expirationTime} milliseconds old.
   * @param stat task to check
   * @return true if the task may be removed from the queue
   */
  private boolean canPurge(MonitoredTask stat) {
    long cts = stat.getCompletionTimestamp();
    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
  }

  /**
   * Writes a plain-text summary (description, state, status and timing) of every task returned
   * by {@link #getTasks()} to the given writer.
   * @param out destination of the dump
   */
  public void dumpAsText(PrintWriter out) {
    long now = EnvironmentEdgeManager.currentTime();

    List<MonitoredTask> tasks = getTasks();
    for (MonitoredTask task : tasks) {
      out.println("Task: " + task.getDescription());
      out.println("Status: " + task.getState() + ":" + task.getStatus());
      if (task.getCompletionTimestamp() != -1) {
        long completed = (now - task.getCompletionTimestamp()) / 1000;
        out.println("Completed " + completed + "s ago");
        out
          .println("Ran for " + (task.getCompletionTimestamp() - task.getStartTime()) / 1000 + "s");
      } else {
        long running = (now - task.getStartTime()) / 1000;
        out.println("Running for " + running + "s");
      }
      out.println();
    }
  }

  /**
   * Interrupts the background monitor thread, which makes it leave its loop.
   */
  public synchronized void shutdown() {
    if (this.monitorThread != null) {
      monitorThread.interrupt();
    }
  }

  /**
   * This class encapsulates an object as well as a weak reference to a proxy that passes through
   * calls to that object. In art form:
   *
   * <pre>
   *     Proxy  <------------------
   *       |                       \
   *       v                        \
   * PassthroughInvocationHandler   |  weak reference
   *       |                       /
   * MonitoredTaskImpl            /
   *       |                     /
   * StatAndWeakRefProxy  ------/
   * </pre>
   *
   * Since we only return the Proxy to the creator of the MonitorableStatus, this means that they
   * can leak that object, and we'll detect it since our weak reference will go null. But, we still
   * have the actual object, so we can log it and display it as a leaked (incomplete) action.
   */
  private static class TaskAndWeakRefPair {
    private final MonitoredTask impl;
    private final WeakReference<MonitoredTask> weakProxy;

    public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) {
      this.impl = stat;
      this.weakProxy = new WeakReference<>(proxy);
    }

    /** Returns the strongly held task implementation. */
    public MonitoredTask get() {
      return impl;
    }

    /** Returns true once the proxy handed out to the task's creator has been collected. */
    public boolean isDead() {
      return weakProxy.get() == null;
    }
  }

  /**
   * An InvocationHandler that simply passes through calls to the original object.
   */
  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
    private final T delegatee;

    public PassthroughInvocationHandler(T delegatee) {
      this.delegatee = delegatee;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      try {
        return method.invoke(delegatee, args);
      } catch (InvocationTargetException e) {
        // Method.invoke wraps whatever the delegate threw. Letting the wrapper escape would make
        // the proxy raise UndeclaredThrowableException, so rethrow the delegate's own exception
        // to keep the proxy behaving exactly like a direct call.
        Throwable cause = e.getCause();
        throw cause != null ? cause : e;
      }
    }
  }

  /**
   * Body of the background monitor thread: sleeps for {@code monitorInterval} milliseconds, purges
   * expired tasks if the queue is full, warns about stuck RPC tasks, and repeats until
   * interrupted.
   */
  private class MonitorRunnable implements Runnable {
    private boolean running = true;

    @Override
    public void run() {
      while (running) {
        try {
          Thread.sleep(monitorInterval);
          // Every other access to the task queue happens while holding the TaskMonitor lock, so
          // take the same lock for the fullness check instead of reading the queue unguarded.
          synchronized (TaskMonitor.this) {
            if (tasks.isFull()) {
              purgeExpiredTasks();
            }
          }
          warnStuckTasks();
        } catch (InterruptedException e) {
          running = false;
          // Keep the interrupt status visible to whatever runs after this loop.
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  private interface TaskFilter {
    enum TaskType {
      GENERAL("general"),
      HANDLER("handler"),
      RPC("rpc"),
      OPERATION("operation"),
      ALL("all");

      private final String type;

      private TaskType(String type) {
        // Locale.ROOT keeps the mapping independent of the JVM's default locale.
        this.type = type.toLowerCase(Locale.ROOT);
      }

      /**
       * Resolves a case-insensitive type name to its constant.
       * @param type name of the task type, may be null or empty
       * @return the matching type, or {@link #ALL} when {@code type} is null, empty or unknown
       */
      static TaskType getTaskType(String type) {
        if (type == null || type.isEmpty()) {
          return ALL;
        }
        // Locale.ROOT: with e.g. a Turkish default locale "OPERATION" would lower-case to
        // "operat\u0131on" and no longer match.
        String wanted = type.toLowerCase(Locale.ROOT);
        for (TaskType taskType : values()) {
          if (taskType.toString().equals(wanted)) {
            return taskType;
          }
        }
        return ALL;
      }

      @Override
      public String toString() {
        return type;
      }
    }

    /**
     * Filter out unwanted task.
     * @param task monitored task
     * @return false if a task is accepted, true if it is filtered
     */
    boolean filter(MonitoredTask task);
  }
}