001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.ConcurrentLinkedQueue; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.hadoop.hbase.util.GsonUtil; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 032import org.apache.hbase.thirdparty.com.google.gson.Gson; 033 034@InterfaceAudience.Private 035class MonitoredTaskImpl implements MonitoredTask { 036 private long startTime; 037 private long statusTime; 038 private long stateTime; 039 private long warnTime; 040 041 private volatile String status; 042 private volatile String description; 043 044 protected volatile State state = State.RUNNING; 045 private final ConcurrentLinkedQueue<StatusJournalEntry> journal; 046 047 private static final Gson GSON = GsonUtil.createGson().create(); 048 049 public MonitoredTaskImpl(boolean enableJournal) { 050 startTime = EnvironmentEdgeManager.currentTime(); 051 statusTime = startTime; 052 stateTime = startTime; 053 warnTime = startTime; 054 if (enableJournal) { 055 journal = new ConcurrentLinkedQueue<>(); 056 } else { 057 journal = null; 058 } 059 } 060 061 private static final class StatusJournalEntryImpl implements StatusJournalEntry { 062 private final long statusTime; 063 private final String status; 064 065 public StatusJournalEntryImpl(String status, long statusTime) { 066 this.status = status; 067 this.statusTime = statusTime; 068 } 069 070 @Override 071 public String getStatus() { 072 return status; 073 } 074 075 @Override 076 public long getTimeStamp() { 077 return statusTime; 078 } 079 080 @Override 081 public String toString() { 082 return status + " at " + statusTime; 083 } 084 } 085 086 @Override 087 public synchronized MonitoredTaskImpl clone() { 088 try { 089 return (MonitoredTaskImpl) super.clone(); 090 } catch (CloneNotSupportedException e) { 091 throw new AssertionError(); // Won't happen 092 } 093 } 094 095 @Override 096 public long getStartTime() { 097 return startTime; 098 } 099 100 @Override 101 public String getDescription() { 102 return description; 103 } 104 105 @Override 106 public String getStatus() { 107 return status; 108 } 109 110 @Override 111 public long getStatusTime() { 112 return statusTime; 113 } 114 115 @Override 116 public State getState() { 117 return state; 118 } 119 120 @Override 121 public long getStateTime() { 122 return stateTime; 123 } 124 125 @Override 126 public long getWarnTime() { 127 return warnTime; 128 } 129 130 @Override 131 public long getCompletionTimestamp() { 132 if (state == State.COMPLETE || state == State.ABORTED) { 133 return stateTime; 134 } 135 return -1; 136 } 137 138 @Override 139 public void markComplete(String status) { 140 setState(State.COMPLETE); 141 setStatus(status); 142 } 143 144 @Override 145 public void pause(String msg) { 146 setState(State.WAITING); 147 setStatus(msg); 148 } 149 150 @Override 151 public void resume(String msg) { 152 setState(State.RUNNING); 153 setStatus(msg); 154 } 155 156 @Override 157 public void abort(String msg) { 158 setStatus(msg); 159 setState(State.ABORTED); 160 } 161 162 @Override 163 public void setStatus(String status) { 164 this.status = status; 165 statusTime = EnvironmentEdgeManager.currentTime(); 166 if (journal != null) { 167 journal.add(new StatusJournalEntryImpl(this.status, statusTime)); 168 } 169 } 170 171 protected void setState(State state) { 172 this.state = state; 173 stateTime = EnvironmentEdgeManager.currentTime(); 174 } 175 176 @Override 177 public void setDescription(String description) { 178 this.description = description; 179 } 180 181 @Override 182 public void setWarnTime(long t) { 183 this.warnTime = t; 184 } 185 186 @Override 187 public void cleanup() { 188 if (state == State.RUNNING) { 189 setState(State.ABORTED); 190 } 191 } 192 193 /** 194 * Force the completion timestamp backwards so that it expires now. 195 */ 196 @Override 197 public void expireNow() { 198 stateTime -= 180 * 1000; 199 } 200 201 @Override 202 public Map<String, Object> toMap() { 203 Map<String, Object> map = new HashMap<>(); 204 map.put("description", getDescription()); 205 map.put("status", getStatus()); 206 map.put("state", getState()); 207 map.put("starttimems", getStartTime()); 208 map.put("statustimems", getCompletionTimestamp()); 209 map.put("statetimems", getCompletionTimestamp()); 210 return map; 211 } 212 213 @Override 214 public String toJSON() throws IOException { 215 return GSON.toJson(toMap()); 216 } 217 218 @Override 219 public String toString() { 220 StringBuilder sb = new StringBuilder(512); 221 sb.append(getDescription()); 222 sb.append(": status="); 223 sb.append(getStatus()); 224 sb.append(", state="); 225 sb.append(getState()); 226 sb.append(", startTime="); 227 sb.append(getStartTime()); 228 sb.append(", completionTime="); 229 sb.append(getCompletionTimestamp()); 230 return sb.toString(); 231 } 232 233 /** 234 * Returns the status journal. This implementation of status journal is not thread-safe. Currently 235 * we use this to track various stages of flushes and compactions where we can use this/pretty 236 * print for post task analysis, by which time we are already done changing states (writing to 237 * journal) 238 */ 239 @Override 240 public List<StatusJournalEntry> getStatusJournal() { 241 if (journal == null) { 242 return Collections.emptyList(); 243 } else { 244 return ImmutableList.copyOf(journal); 245 } 246 } 247 248 @Override 249 public String prettyPrintJournal() { 250 if (journal == null) { 251 return ""; 252 } 253 StringBuilder sb = new StringBuilder(); 254 Iterator<StatusJournalEntry> iter = journal.iterator(); 255 StatusJournalEntry previousEntry = null; 256 while (iter.hasNext()) { 257 StatusJournalEntry entry = iter.next(); 258 sb.append(entry); 259 if (previousEntry != null) { 260 long delta = entry.getTimeStamp() - previousEntry.getTimeStamp(); 261 if (delta != 0) { 262 sb.append(" (+" + delta + " ms)"); 263 } 264 } 265 previousEntry = entry; 266 } 267 return sb.toString(); 268 } 269}