001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.ConcurrentLinkedQueue; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.hadoop.hbase.util.GsonUtil; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 032import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 033import org.apache.hbase.thirdparty.com.google.gson.Gson; 034 035@InterfaceAudience.Private 036class MonitoredTaskImpl implements MonitoredTask { 037 private long startTime; 038 private long statusTime; 039 private long stateTime; 040 private long warnTime; 041 042 private volatile String status; 043 private volatile String description; 044 045 protected volatile State state = State.RUNNING; 046 private final ConcurrentLinkedQueue<StatusJournalEntry> journal; 047 048 private static final Gson GSON = GsonUtil.createGson().create(); 049 050 public MonitoredTaskImpl(boolean enableJournal, String description) { 051 startTime = EnvironmentEdgeManager.currentTime(); 052 statusTime = startTime; 053 stateTime = startTime; 054 warnTime = startTime; 055 this.description = description; 056 this.status = "status unset"; 057 if (enableJournal) { 058 journal = new ConcurrentLinkedQueue<>(); 059 } else { 060 journal = null; 061 } 062 } 063 064 private static final class StatusJournalEntryImpl implements StatusJournalEntry { 065 private final long statusTime; 066 private final String status; 067 068 public StatusJournalEntryImpl(String status, long statusTime) { 069 this.status = status; 070 this.statusTime = statusTime; 071 } 072 073 @Override 074 public String getStatus() { 075 return status; 076 } 077 078 @Override 079 public long getTimeStamp() { 080 return statusTime; 081 } 082 083 @Override 084 public String toString() { 085 return status + " at " + statusTime; 086 } 087 } 088 089 @Override 090 public synchronized MonitoredTaskImpl clone() { 091 try { 092 return (MonitoredTaskImpl) super.clone(); 093 } catch (CloneNotSupportedException e) { 094 throw new AssertionError(); // Won't happen 095 } 096 } 097 098 @Override 099 public long getStartTime() { 100 return startTime; 101 } 102 103 @Override 104 public String getDescription() { 105 return description; 106 } 107 108 @Override 109 public String getStatus() { 110 return status; 111 } 112 113 @Override 114 public long getStatusTime() { 115 return statusTime; 116 } 117 118 @Override 119 public State getState() { 120 return state; 121 } 122 123 @Override 124 public long getStateTime() { 125 return stateTime; 126 } 127 128 @Override 129 public long getWarnTime() { 130 return warnTime; 131 } 132 133 @Override 134 public long getCompletionTimestamp() { 135 if (state == State.COMPLETE || state == State.ABORTED) { 136 return stateTime; 137 } 138 return -1; 139 } 140 141 @Override 142 public void markComplete(String status) { 143 setState(State.COMPLETE); 144 setStatus(status); 145 } 146 147 @Override 148 public void pause(String msg) { 149 setState(State.WAITING); 150 setStatus(msg); 151 } 152 153 @Override 154 public void resume(String msg) { 155 setState(State.RUNNING); 156 setStatus(msg); 157 } 158 159 @Override 160 public void abort(String msg) { 161 setStatus(msg); 162 setState(State.ABORTED); 163 } 164 165 @Override 166 public void setStatus(String status) { 167 Preconditions.checkNotNull(status, "Status is null"); 168 this.status = status; 169 statusTime = EnvironmentEdgeManager.currentTime(); 170 if (journal != null) { 171 journal.add(new StatusJournalEntryImpl(this.status, statusTime)); 172 } 173 } 174 175 protected void setState(State state) { 176 this.state = state; 177 stateTime = EnvironmentEdgeManager.currentTime(); 178 } 179 180 @Override 181 public void setDescription(String description) { 182 Preconditions.checkNotNull(description, "Description is null"); 183 this.description = description; 184 } 185 186 @Override 187 public void setWarnTime(long t) { 188 this.warnTime = t; 189 } 190 191 @Override 192 public void cleanup() { 193 if (state == State.RUNNING) { 194 setState(State.ABORTED); 195 } 196 } 197 198 /** 199 * Force the completion timestamp backwards so that it expires now. 200 */ 201 @Override 202 public void expireNow() { 203 stateTime -= 180 * 1000; 204 } 205 206 @Override 207 public Map<String, Object> toMap() { 208 Map<String, Object> map = new HashMap<>(); 209 map.put("description", getDescription()); 210 map.put("status", getStatus()); 211 map.put("state", getState()); 212 map.put("starttimems", getStartTime()); 213 map.put("statustimems", getCompletionTimestamp()); 214 map.put("statetimems", getCompletionTimestamp()); 215 return map; 216 } 217 218 @Override 219 public String toJSON() throws IOException { 220 return GSON.toJson(toMap()); 221 } 222 223 @Override 224 public String toString() { 225 StringBuilder sb = new StringBuilder(512); 226 sb.append(getDescription()); 227 sb.append(": status="); 228 sb.append(getStatus()); 229 sb.append(", state="); 230 sb.append(getState()); 231 sb.append(", startTime="); 232 sb.append(getStartTime()); 233 sb.append(", completionTime="); 234 sb.append(getCompletionTimestamp()); 235 return sb.toString(); 236 } 237 238 /** 239 * Returns the status journal. This implementation of status journal is not thread-safe. Currently 240 * we use this to track various stages of flushes and compactions where we can use this/pretty 241 * print for post task analysis, by which time we are already done changing states (writing to 242 * journal) 243 */ 244 @Override 245 public List<StatusJournalEntry> getStatusJournal() { 246 if (journal == null) { 247 return Collections.emptyList(); 248 } else { 249 return ImmutableList.copyOf(journal); 250 } 251 } 252 253 @Override 254 public String prettyPrintJournal() { 255 if (journal == null) { 256 return ""; 257 } 258 StringBuilder sb = new StringBuilder(); 259 Iterator<StatusJournalEntry> iter = journal.iterator(); 260 StatusJournalEntry previousEntry = null; 261 while (iter.hasNext()) { 262 StatusJournalEntry entry = iter.next(); 263 sb.append(entry); 264 if (previousEntry != null) { 265 long delta = entry.getTimeStamp() - previousEntry.getTimeStamp(); 266 if (delta != 0) { 267 sb.append(" (+" + delta + " ms)"); 268 } 269 } 270 previousEntry = entry; 271 } 272 return sb.toString(); 273 } 274}