/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.gson.Gson;

/**
 * Basic {@link MonitoredTask} implementation. It keeps a description, a free-form status string, a
 * {@link State}, the timestamps at which each of those last changed, and an optional journal of
 * status changes.
 * <p>
 * {@code status}, {@code description} and {@code state} are volatile; the timestamps and the
 * journal are plain fields and the journal is explicitly not thread-safe (see
 * {@link #getStatusJournal()}).
 */
@InterfaceAudience.Private
class MonitoredTaskImpl implements MonitoredTask {
  private long startTime;
  private long statusTime;
  private long stateTime;
  private long warnTime;

  private volatile String status;
  private volatile String description;

  protected volatile State state = State.RUNNING;

  private boolean journalEnabled = false;
  private List<StatusJournalEntry> journal;

  private static final Gson GSON = GsonUtil.createGson().create();

  /**
   * Number of milliseconds by which {@link #expireNow()} moves the state time into the past.
   * NOTE(review): presumably chosen to exceed the monitor's task expiration window; confirm
   * against the code that evicts completed tasks.
   */
  private static final long EXPIRE_NOW_OFFSET_MS = 180 * 1000L;

  /**
   * Creates a task in the {@link State#RUNNING} state with all timestamps set to the current time
   * as reported by {@link EnvironmentEdgeManager}.
   */
  public MonitoredTaskImpl() {
    startTime = EnvironmentEdgeManager.currentTime();
    statusTime = startTime;
    stateTime = startTime;
    warnTime = startTime;
  }

  /**
   * Immutable journal record: a status string and the time at which it was set.
   */
  private static class StatusJournalEntryImpl implements StatusJournalEntry {
    private final long statusTime;
    private final String status;

    public StatusJournalEntryImpl(String status, long statusTime) {
      this.status = status;
      this.statusTime = statusTime;
    }

    @Override
    public String getStatus() {
      return status;
    }

    @Override
    public long getTimeStamp() {
      return statusTime;
    }

    /** Returns {@code "<status> at <timestamp>"}. */
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append(status);
      sb.append(" at ");
      sb.append(statusTime);
      return sb.toString();
    }
  }

  /**
   * Returns a copy of this task. The copy receives its own journal list (when a journal exists) so
   * that later status changes on either object do not show up in the other's journal. The journal
   * entries themselves are immutable and are shared.
   */
  @Override
  public synchronized MonitoredTaskImpl clone() {
    try {
      MonitoredTaskImpl copy = (MonitoredTaskImpl) super.clone();
      if (journal != null) {
        // Object.clone() is shallow; without this the clone would alias our mutable list.
        copy.journal = new ArrayList<>(journal);
      }
      return copy;
    } catch (CloneNotSupportedException e) {
      // Won't happen; keep the cause anyway so it is diagnosable if it ever does.
      throw new AssertionError(e);
    }
  }

  @Override
  public long getStartTime() {
    return startTime;
  }

  @Override
  public String getDescription() {
    return description;
  }

  @Override
  public String getStatus() {
    return status;
  }

  @Override
  public long getStatusTime() {
    return statusTime;
  }

  @Override
  public State getState() {
    return state;
  }

  @Override
  public long getStateTime() {
    return stateTime;
  }

  @Override
  public long getWarnTime() {
    return warnTime;
  }

  /**
   * Returns the time of the last state change if the task is {@link State#COMPLETE} or
   * {@link State#ABORTED}, otherwise {@code -1}.
   */
  @Override
  public long getCompletionTimestamp() {
    // Read the volatile field once so both comparisons see the same value.
    State current = state;
    if (current == State.COMPLETE || current == State.ABORTED) {
      return stateTime;
    }
    return -1;
  }

  /** Moves the task to {@link State#COMPLETE} and then records {@code status}. */
  @Override
  public void markComplete(String status) {
    setState(State.COMPLETE);
    setStatus(status);
  }

  /** Moves the task to {@link State#WAITING} and then records {@code msg} as the status. */
  @Override
  public void pause(String msg) {
    setState(State.WAITING);
    setStatus(msg);
  }

  /** Moves the task to {@link State#RUNNING} and then records {@code msg} as the status. */
  @Override
  public void resume(String msg) {
    setState(State.RUNNING);
    setStatus(msg);
  }

  /** Records {@code msg} as the status and then moves the task to {@link State#ABORTED}. */
  @Override
  public void abort(String msg) {
    setStatus(msg);
    setState(State.ABORTED);
  }

  /**
   * Sets the status string, stamps the status time with the current time and, when journaling is
   * enabled, appends a journal entry for the new status.
   */
  @Override
  public void setStatus(String status) {
    this.status = status;
    statusTime = EnvironmentEdgeManager.currentTime();
    if (journalEnabled) {
      journal.add(new StatusJournalEntryImpl(this.status, statusTime));
    }
  }

  /** Sets the state and stamps the state time with the current time. */
  protected void setState(State state) {
    this.state = state;
    stateTime = EnvironmentEdgeManager.currentTime();
  }

  @Override
  public void setDescription(String description) {
    this.description = description;
  }

  @Override
  public void setWarnTime(long t) {
    this.warnTime = t;
  }

  /** Marks the task {@link State#ABORTED} if it is still {@link State#RUNNING}. */
  @Override
  public void cleanup() {
    if (state == State.RUNNING) {
      setState(State.ABORTED);
    }
  }

  /**
   * Force the completion timestamp backwards so that it expires now.
   */
  @Override
  public void expireNow() {
    stateTime -= EXPIRE_NOW_OFFSET_MS;
  }

  /**
   * Returns a new mutable map describing this task, with the keys {@code description},
   * {@code status}, {@code state}, {@code starttimems}, {@code statustimems} and
   * {@code statetimems}.
   */
  @Override
  public Map<String, Object> toMap() {
    Map<String, Object> map = new HashMap<>();
    map.put("description", getDescription());
    map.put("status", getStatus());
    map.put("state", getState());
    map.put("starttimems", getStartTime());
    // Report the time the status was last set; this key previously carried the completion
    // timestamp, duplicating "statetimems" below.
    map.put("statustimems", getStatusTime());
    // NOTE(review): this reports -1 until the task is COMPLETE/ABORTED. Left unchanged because
    // that may be intentional; confirm whether getStateTime() was meant here.
    map.put("statetimems", getCompletionTimestamp());
    return map;
  }

  /** Serializes {@link #toMap()} to JSON. */
  @Override
  public String toJSON() throws IOException {
    return GSON.toJson(toMap());
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder(512);
    sb.append(getDescription());
    sb.append(": status=");
    sb.append(getStatus());
    sb.append(", state=");
    sb.append(getState());
    sb.append(", startTime=");
    sb.append(getStartTime());
    sb.append(", completionTime=");
    sb.append(getCompletionTimestamp());
    return sb.toString();
  }

  /**
   * Returns the status journal. This implementation of status journal is not thread-safe. Currently
   * we use this to track various stages of flushes and compactions where we can use this/pretty
   * print for post task analysis, by which time we are already done changing states (writing to
   * journal).
   * @return an unmodifiable view of the journal, or an empty list if the journal was never enabled
   */
  @Override
  public List<StatusJournalEntry> getStatusJournal() {
    if (journal == null) {
      return Collections.emptyList();
    } else {
      return Collections.unmodifiableList(journal);
    }
  }

  /**
   * Enables journaling of this monitored task, the first invocation will lazily initialize the
   * journal. The journal implementation itself and this method are not thread safe.
   * @param includeCurrentStatus whether to seed the journal with the current status (if one is
   *                             set) when journaling is switched on by this call
   */
  @Override
  public void enableStatusJournal(boolean includeCurrentStatus) {
    if (journalEnabled && journal != null) {
      // Already journaling; nothing to do.
      return;
    }
    journalEnabled = true;
    if (journal == null) {
      journal = new ArrayList<>();
    }
    if (includeCurrentStatus && status != null) {
      journal.add(new StatusJournalEntryImpl(status, statusTime));
    }
  }

  /** Stops journaling. Entries already recorded are kept. */
  @Override
  public void disableStatusJournal() {
    journalEnabled = false;
  }

  /**
   * Renders the journal one entry per line. Every entry after the first is followed by the elapsed
   * time since the previous entry when that time is non-zero.
   * @return the formatted journal, or an empty string if journaling is currently disabled
   */
  @Override
  public String prettyPrintJournal() {
    if (!journalEnabled) {
      return "";
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < journal.size(); i++) {
      StatusJournalEntry je = journal.get(i);
      sb.append(je.toString());
      if (i != 0) {
        StatusJournalEntry jep = journal.get(i - 1);
        long delta = je.getTimeStamp() - jep.getTimeStamp();
        if (delta != 0) {
          sb.append(" (+").append(delta).append(" ms)");
        }
      }
      sb.append("\n");
    }
    return sb.toString();
  }

}