001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ConcurrentLinkedQueue;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.hadoop.hbase.util.GsonUtil;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
032import org.apache.hbase.thirdparty.com.google.gson.Gson;
033
034@InterfaceAudience.Private
035class MonitoredTaskImpl implements MonitoredTask {
036  private long startTime;
037  private long statusTime;
038  private long stateTime;
039  private long warnTime;
040
041  private volatile String status;
042  private volatile String description;
043
044  protected volatile State state = State.RUNNING;
045  private final ConcurrentLinkedQueue<StatusJournalEntry> journal;
046
047  private static final Gson GSON = GsonUtil.createGson().create();
048
049  public MonitoredTaskImpl(boolean enableJournal) {
050    startTime = EnvironmentEdgeManager.currentTime();
051    statusTime = startTime;
052    stateTime = startTime;
053    warnTime = startTime;
054    if (enableJournal) {
055      journal = new ConcurrentLinkedQueue<>();
056    } else {
057      journal = null;
058    }
059  }
060
061  private static final class StatusJournalEntryImpl implements StatusJournalEntry {
062    private final long statusTime;
063    private final String status;
064
065    public StatusJournalEntryImpl(String status, long statusTime) {
066      this.status = status;
067      this.statusTime = statusTime;
068    }
069
070    @Override
071    public String getStatus() {
072      return status;
073    }
074
075    @Override
076    public long getTimeStamp() {
077      return statusTime;
078    }
079
080    @Override
081    public String toString() {
082      return status + " at " + statusTime;
083    }
084  }
085
086  @Override
087  public synchronized MonitoredTaskImpl clone() {
088    try {
089      return (MonitoredTaskImpl) super.clone();
090    } catch (CloneNotSupportedException e) {
091      throw new AssertionError(); // Won't happen
092    }
093  }
094
095  @Override
096  public long getStartTime() {
097    return startTime;
098  }
099
100  @Override
101  public String getDescription() {
102    return description;
103  }
104
105  @Override
106  public String getStatus() {
107    return status;
108  }
109
110  @Override
111  public long getStatusTime() {
112    return statusTime;
113  }
114
115  @Override
116  public State getState() {
117    return state;
118  }
119
120  @Override
121  public long getStateTime() {
122    return stateTime;
123  }
124
125  @Override
126  public long getWarnTime() {
127    return warnTime;
128  }
129
130  @Override
131  public long getCompletionTimestamp() {
132    if (state == State.COMPLETE || state == State.ABORTED) {
133      return stateTime;
134    }
135    return -1;
136  }
137
138  @Override
139  public void markComplete(String status) {
140    setState(State.COMPLETE);
141    setStatus(status);
142  }
143
144  @Override
145  public void pause(String msg) {
146    setState(State.WAITING);
147    setStatus(msg);
148  }
149
150  @Override
151  public void resume(String msg) {
152    setState(State.RUNNING);
153    setStatus(msg);
154  }
155
156  @Override
157  public void abort(String msg) {
158    setStatus(msg);
159    setState(State.ABORTED);
160  }
161
162  @Override
163  public void setStatus(String status) {
164    this.status = status;
165    statusTime = EnvironmentEdgeManager.currentTime();
166    if (journal != null) {
167      journal.add(new StatusJournalEntryImpl(this.status, statusTime));
168    }
169  }
170
171  protected void setState(State state) {
172    this.state = state;
173    stateTime = EnvironmentEdgeManager.currentTime();
174  }
175
176  @Override
177  public void setDescription(String description) {
178    this.description = description;
179  }
180
181  @Override
182  public void setWarnTime(long t) {
183    this.warnTime = t;
184  }
185
186  @Override
187  public void cleanup() {
188    if (state == State.RUNNING) {
189      setState(State.ABORTED);
190    }
191  }
192
193  /**
194   * Force the completion timestamp backwards so that it expires now.
195   */
196  @Override
197  public void expireNow() {
198    stateTime -= 180 * 1000;
199  }
200
201  @Override
202  public Map<String, Object> toMap() {
203    Map<String, Object> map = new HashMap<>();
204    map.put("description", getDescription());
205    map.put("status", getStatus());
206    map.put("state", getState());
207    map.put("starttimems", getStartTime());
208    map.put("statustimems", getCompletionTimestamp());
209    map.put("statetimems", getCompletionTimestamp());
210    return map;
211  }
212
213  @Override
214  public String toJSON() throws IOException {
215    return GSON.toJson(toMap());
216  }
217
218  @Override
219  public String toString() {
220    StringBuilder sb = new StringBuilder(512);
221    sb.append(getDescription());
222    sb.append(": status=");
223    sb.append(getStatus());
224    sb.append(", state=");
225    sb.append(getState());
226    sb.append(", startTime=");
227    sb.append(getStartTime());
228    sb.append(", completionTime=");
229    sb.append(getCompletionTimestamp());
230    return sb.toString();
231  }
232
233  /**
234   * Returns the status journal. This implementation of status journal is not thread-safe. Currently
235   * we use this to track various stages of flushes and compactions where we can use this/pretty
236   * print for post task analysis, by which time we are already done changing states (writing to
237   * journal)
238   */
239  @Override
240  public List<StatusJournalEntry> getStatusJournal() {
241    if (journal == null) {
242      return Collections.emptyList();
243    } else {
244      return ImmutableList.copyOf(journal);
245    }
246  }
247
248  @Override
249  public String prettyPrintJournal() {
250    if (journal == null) {
251      return "";
252    }
253    StringBuilder sb = new StringBuilder();
254    Iterator<StatusJournalEntry> iter = journal.iterator();
255    StatusJournalEntry previousEntry = null;
256    while (iter.hasNext()) {
257      StatusJournalEntry entry = iter.next();
258      sb.append(entry);
259      if (previousEntry != null) {
260        long delta = entry.getTimeStamp() - previousEntry.getTimeStamp();
261        if (delta != 0) {
262          sb.append(" (+" + delta + " ms)");
263        }
264      }
265      previousEntry = entry;
266    }
267    return sb.toString();
268  }
269}