001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.monitoring; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import org.apache.hadoop.hbase.util.GsonUtil; 028import org.apache.yetus.audience.InterfaceAudience; 029 030import org.apache.hbase.thirdparty.com.google.gson.Gson; 031 032@InterfaceAudience.Private 033class MonitoredTaskImpl implements MonitoredTask { 034 private long startTime; 035 private long statusTime; 036 private long stateTime; 037 private long warnTime; 038 039 private volatile String status; 040 private volatile String description; 041 042 protected volatile State state = State.RUNNING; 043 044 private boolean journalEnabled = false; 045 private List<StatusJournalEntry> journal; 046 047 private static final Gson GSON = GsonUtil.createGson().create(); 048 049 public MonitoredTaskImpl() { 050 startTime = System.currentTimeMillis(); 051 statusTime = startTime; 052 stateTime = startTime; 053 warnTime = startTime; 054 } 055 056 private static class StatusJournalEntryImpl implements StatusJournalEntry { 057 private long statusTime; 058 private String status; 059 060 public StatusJournalEntryImpl(String status, long statusTime) { 061 this.status = status; 062 this.statusTime = statusTime; 063 } 064 065 @Override 066 public String getStatus() { 067 return status; 068 } 069 070 @Override 071 public long getTimeStamp() { 072 return statusTime; 073 } 074 075 @Override 076 public String toString() { 077 StringBuilder sb = new StringBuilder(); 078 sb.append(status); 079 sb.append(" at "); 080 sb.append(statusTime); 081 return sb.toString(); 082 } 083 } 084 085 @Override 086 public synchronized MonitoredTaskImpl clone() { 087 try { 088 return (MonitoredTaskImpl) super.clone(); 089 } catch (CloneNotSupportedException e) { 090 throw new AssertionError(); // Won't happen 091 } 092 } 093 094 @Override 095 public long getStartTime() { 096 return startTime; 097 } 098 099 @Override 100 public String getDescription() { 101 return description; 102 } 103 104 @Override 105 public String getStatus() { 106 return status; 107 } 108 109 @Override 110 public long getStatusTime() { 111 return statusTime; 112 } 113 114 @Override 115 public State getState() { 116 return state; 117 } 118 119 @Override 120 public long getStateTime() { 121 return stateTime; 122 } 123 124 @Override 125 public long getWarnTime() { 126 return warnTime; 127 } 128 129 @Override 130 public long getCompletionTimestamp() { 131 if (state == State.COMPLETE || state == State.ABORTED) { 132 return stateTime; 133 } 134 return -1; 135 } 136 137 @Override 138 public void markComplete(String status) { 139 setState(State.COMPLETE); 140 setStatus(status); 141 } 142 143 @Override 144 public void pause(String msg) { 145 setState(State.WAITING); 146 setStatus(msg); 147 } 148 149 @Override 150 public void resume(String msg) { 151 setState(State.RUNNING); 152 setStatus(msg); 153 } 154 155 @Override 156 public void abort(String msg) { 157 setStatus(msg); 158 setState(State.ABORTED); 159 } 160 161 @Override 162 public void setStatus(String status) { 163 this.status = status; 164 statusTime = System.currentTimeMillis(); 165 if (journalEnabled) { 166 journal.add(new StatusJournalEntryImpl(this.status, statusTime)); 167 } 168 } 169 170 protected void setState(State state) { 171 this.state = state; 172 stateTime = System.currentTimeMillis(); 173 } 174 175 @Override 176 public void setDescription(String description) { 177 this.description = description; 178 } 179 180 @Override 181 public void setWarnTime(long t) { 182 this.warnTime = t; 183 } 184 185 @Override 186 public void cleanup() { 187 if (state == State.RUNNING) { 188 setState(State.ABORTED); 189 } 190 } 191 192 /** 193 * Force the completion timestamp backwards so that 194 * it expires now. 195 */ 196 @Override 197 public void expireNow() { 198 stateTime -= 180 * 1000; 199 } 200 201 @Override 202 public Map<String, Object> toMap() { 203 Map<String, Object> map = new HashMap<>(); 204 map.put("description", getDescription()); 205 map.put("status", getStatus()); 206 map.put("state", getState()); 207 map.put("starttimems", getStartTime()); 208 map.put("statustimems", getCompletionTimestamp()); 209 map.put("statetimems", getCompletionTimestamp()); 210 return map; 211 } 212 213 @Override 214 public String toJSON() throws IOException { 215 return GSON.toJson(toMap()); 216 } 217 218 @Override 219 public String toString() { 220 StringBuilder sb = new StringBuilder(512); 221 sb.append(getDescription()); 222 sb.append(": status="); 223 sb.append(getStatus()); 224 sb.append(", state="); 225 sb.append(getState()); 226 sb.append(", startTime="); 227 sb.append(getStartTime()); 228 sb.append(", completionTime="); 229 sb.append(getCompletionTimestamp()); 230 return sb.toString(); 231 } 232 233 /** 234 * Returns the status journal. This implementation of status journal is not thread-safe. Currently 235 * we use this to track various stages of flushes and compactions where we can use this/pretty 236 * print for post task analysis, by which time we are already done changing states (writing to 237 * journal) 238 */ 239 @Override 240 public List<StatusJournalEntry> getStatusJournal() { 241 if (journal == null) { 242 return Collections.emptyList(); 243 } else { 244 return Collections.unmodifiableList(journal); 245 } 246 } 247 248 /** 249 * Enables journaling of this monitored task, the first invocation will lazily initialize the 250 * journal. The journal implementation itself and this method are not thread safe 251 */ 252 @Override 253 public void enableStatusJournal(boolean includeCurrentStatus) { 254 if (journalEnabled && journal != null) { 255 return; 256 } 257 journalEnabled = true; 258 if (journal == null) { 259 journal = new ArrayList<StatusJournalEntry>(); 260 } 261 if (includeCurrentStatus && status != null) { 262 journal.add(new StatusJournalEntryImpl(status, statusTime)); 263 } 264 } 265 266 @Override 267 public void disableStatusJournal() { 268 journalEnabled = false; 269 } 270 271 @Override 272 public String prettyPrintJournal() { 273 if (!journalEnabled) { 274 return ""; 275 } 276 StringBuilder sb = new StringBuilder(); 277 for (int i = 0; i < journal.size(); i++) { 278 StatusJournalEntry je = journal.get(i); 279 sb.append(je.toString()); 280 if (i != 0) { 281 StatusJournalEntry jep = journal.get(i-1); 282 long delta = je.getTimeStamp() - jep.getTimeStamp(); 283 if (delta != 0) { 284 sb.append(" (+" + delta + " ms)"); 285 } 286 } 287 sb.append("\n"); 288 } 289 return sb.toString(); 290 } 291 292}