001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.monitoring; 020 021import com.fasterxml.jackson.databind.ObjectMapper; 022 023import org.apache.hadoop.util.StringUtils; 024import org.apache.yetus.audience.InterfaceAudience; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032 033@InterfaceAudience.Private 034class MonitoredTaskImpl implements MonitoredTask { 035 private long startTime; 036 private long statusTime; 037 private long stateTime; 038 private long warnTime; 039 040 private volatile String status; 041 private volatile String description; 042 043 protected volatile State state = State.RUNNING; 044 045 private boolean journalEnabled = false; 046 private List<StatusJournalEntry> journal; 047 048 private static final ObjectMapper MAPPER = new ObjectMapper(); 049 050 public MonitoredTaskImpl() { 051 startTime = System.currentTimeMillis(); 052 statusTime = startTime; 053 stateTime = startTime; 054 warnTime = startTime; 055 } 056 057 private static class StatusJournalEntryImpl implements StatusJournalEntry { 058 private long statusTime; 059 private String status; 060 061 public StatusJournalEntryImpl(String status, long statusTime) { 062 this.status = status; 063 this.statusTime = statusTime; 064 } 065 066 @Override 067 public String getStatus() { 068 return status; 069 } 070 071 @Override 072 public long getTimeStamp() { 073 return statusTime; 074 } 075 076 @Override 077 public String toString() { 078 StringBuilder sb = new StringBuilder(); 079 sb.append(status); 080 sb.append(" at "); 081 sb.append(statusTime); 082 return sb.toString(); 083 } 084 } 085 086 @Override 087 public synchronized MonitoredTaskImpl clone() { 088 try { 089 return (MonitoredTaskImpl) super.clone(); 090 } catch (CloneNotSupportedException e) { 091 throw new AssertionError(); // Won't happen 092 } 093 } 094 095 @Override 096 public long getStartTime() { 097 return startTime; 098 } 099 100 @Override 101 public String getDescription() { 102 return description; 103 } 104 105 @Override 106 public String getStatus() { 107 return status; 108 } 109 110 @Override 111 public long getStatusTime() { 112 return statusTime; 113 } 114 115 @Override 116 public State getState() { 117 return state; 118 } 119 120 @Override 121 public long getStateTime() { 122 return stateTime; 123 } 124 125 @Override 126 public long getWarnTime() { 127 return warnTime; 128 } 129 130 @Override 131 public long getCompletionTimestamp() { 132 if (state == State.COMPLETE || state == State.ABORTED) { 133 return stateTime; 134 } 135 return -1; 136 } 137 138 @Override 139 public void markComplete(String status) { 140 setState(State.COMPLETE); 141 setStatus(status); 142 } 143 144 @Override 145 public void pause(String msg) { 146 setState(State.WAITING); 147 setStatus(msg); 148 } 149 150 @Override 151 public void resume(String msg) { 152 setState(State.RUNNING); 153 setStatus(msg); 154 } 155 156 @Override 157 public void abort(String msg) { 158 setStatus(msg); 159 setState(State.ABORTED); 160 } 161 162 @Override 163 public void setStatus(String status) { 164 this.status = status; 165 statusTime = System.currentTimeMillis(); 166 if (journalEnabled) { 167 journal.add(new StatusJournalEntryImpl(this.status, statusTime)); 168 } 169 } 170 171 protected void setState(State state) { 172 this.state = state; 173 stateTime = System.currentTimeMillis(); 174 } 175 176 @Override 177 public void setDescription(String description) { 178 this.description = description; 179 } 180 181 @Override 182 public void setWarnTime(long t) { 183 this.warnTime = t; 184 } 185 186 @Override 187 public void cleanup() { 188 if (state == State.RUNNING) { 189 setState(State.ABORTED); 190 } 191 } 192 193 /** 194 * Force the completion timestamp backwards so that 195 * it expires now. 196 */ 197 @Override 198 public void expireNow() { 199 stateTime -= 180 * 1000; 200 } 201 202 @Override 203 public Map<String, Object> toMap() { 204 Map<String, Object> map = new HashMap<>(); 205 map.put("description", getDescription()); 206 map.put("status", getStatus()); 207 map.put("state", getState()); 208 map.put("starttimems", getStartTime()); 209 map.put("statustimems", getCompletionTimestamp()); 210 map.put("statetimems", getCompletionTimestamp()); 211 return map; 212 } 213 214 @Override 215 public String toJSON() throws IOException { 216 return MAPPER.writeValueAsString(toMap()); 217 } 218 219 @Override 220 public String toString() { 221 StringBuilder sb = new StringBuilder(512); 222 sb.append(getDescription()); 223 sb.append(": status="); 224 sb.append(getStatus()); 225 sb.append(", state="); 226 sb.append(getState()); 227 sb.append(", startTime="); 228 sb.append(getStartTime()); 229 sb.append(", completionTime="); 230 sb.append(getCompletionTimestamp()); 231 return sb.toString(); 232 } 233 234 /** 235 * Returns the status journal. This implementation of status journal is not thread-safe. Currently 236 * we use this to track various stages of flushes and compactions where we can use this/pretty 237 * print for post task analysis, by which time we are already done changing states (writing to 238 * journal) 239 */ 240 @Override 241 public List<StatusJournalEntry> getStatusJournal() { 242 if (journal == null) { 243 return Collections.emptyList(); 244 } else { 245 return Collections.unmodifiableList(journal); 246 } 247 } 248 249 /** 250 * Enables journaling of this monitored task, the first invocation will lazily initialize the 251 * journal. The journal implementation itself and this method are not thread safe 252 */ 253 @Override 254 public void enableStatusJournal(boolean includeCurrentStatus) { 255 if (journalEnabled && journal != null) { 256 return; 257 } 258 journalEnabled = true; 259 if (journal == null) { 260 journal = new ArrayList<StatusJournalEntry>(); 261 } 262 if (includeCurrentStatus) { 263 journal.add(new StatusJournalEntryImpl(status, statusTime)); 264 } 265 } 266 267 @Override 268 public void disableStatusJournal() { 269 journalEnabled = false; 270 } 271 272 @Override 273 public String prettyPrintJournal() { 274 return StringUtils.join("\n\t", getStatusJournal()); 275 } 276 277}