001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.monitoring;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import org.apache.hadoop.hbase.util.GsonUtil;
028import org.apache.yetus.audience.InterfaceAudience;
029
030import org.apache.hbase.thirdparty.com.google.gson.Gson;
031
032@InterfaceAudience.Private
033class MonitoredTaskImpl implements MonitoredTask {
034  private long startTime;
035  private long statusTime;
036  private long stateTime;
037  private long warnTime;
038
039  private volatile String status;
040  private volatile String description;
041  
042  protected volatile State state = State.RUNNING;
043
044  private boolean journalEnabled = false;
045  private List<StatusJournalEntry> journal;
046
047  private static final Gson GSON = GsonUtil.createGson().create();
048
049  public MonitoredTaskImpl() {
050    startTime = System.currentTimeMillis();
051    statusTime = startTime;
052    stateTime = startTime;
053    warnTime = startTime;
054  }
055
056  private static class StatusJournalEntryImpl implements StatusJournalEntry {
057    private long statusTime;
058    private String status;
059
060    public StatusJournalEntryImpl(String status, long statusTime) {
061      this.status = status;
062      this.statusTime = statusTime;
063    }
064
065    @Override
066    public String getStatus() {
067      return status;
068    }
069
070    @Override
071    public long getTimeStamp() {
072      return statusTime;
073    }
074
075    @Override
076    public String toString() {
077      StringBuilder sb = new StringBuilder();
078      sb.append(status);
079      sb.append(" at ");
080      sb.append(statusTime);
081      return sb.toString();
082    }
083  }
084
085  @Override
086  public synchronized MonitoredTaskImpl clone() {
087    try {
088      return (MonitoredTaskImpl) super.clone();
089    } catch (CloneNotSupportedException e) {
090      throw new AssertionError(); // Won't happen
091    }
092  }
093
094  @Override
095  public long getStartTime() {
096    return startTime;
097  }
098  
099  @Override
100  public String getDescription() {
101    return description;
102  }
103
104  @Override
105  public String getStatus() {
106    return status;
107  }
108
109  @Override
110  public long getStatusTime() {
111    return statusTime;
112  }
113  
114  @Override
115  public State getState() {
116    return state;
117  }
118  
119  @Override
120  public long getStateTime() {
121    return stateTime;
122  }
123
124  @Override
125  public long getWarnTime() {
126    return warnTime;
127  }
128
129  @Override
130  public long getCompletionTimestamp() {
131    if (state == State.COMPLETE || state == State.ABORTED) {
132      return stateTime;
133    }
134    return -1;
135  }
136
137  @Override
138  public void markComplete(String status) {
139    setState(State.COMPLETE);
140    setStatus(status);
141  }
142
143  @Override
144  public void pause(String msg) {
145    setState(State.WAITING);
146    setStatus(msg);
147  }
148
149  @Override
150  public void resume(String msg) {
151    setState(State.RUNNING);
152    setStatus(msg);
153  }
154
155  @Override
156  public void abort(String msg) {
157    setStatus(msg);
158    setState(State.ABORTED);
159  }
160  
161  @Override
162  public void setStatus(String status) {
163    this.status = status;
164    statusTime = System.currentTimeMillis();
165    if (journalEnabled) {
166      journal.add(new StatusJournalEntryImpl(this.status, statusTime));
167    }
168  }
169
170  protected void setState(State state) {
171    this.state = state;
172    stateTime = System.currentTimeMillis();
173  }
174
175  @Override
176  public void setDescription(String description) {
177    this.description = description;
178  }
179
180  @Override
181  public void setWarnTime(long t) {
182    this.warnTime = t;
183  }
184
185  @Override
186  public void cleanup() {
187    if (state == State.RUNNING) {
188      setState(State.ABORTED);
189    }
190  }
191
192  /**
193   * Force the completion timestamp backwards so that
194   * it expires now.
195   */
196  @Override
197  public void expireNow() {
198    stateTime -= 180 * 1000;
199  }
200
201  @Override
202  public Map<String, Object> toMap() {
203    Map<String, Object> map = new HashMap<>();
204    map.put("description", getDescription());
205    map.put("status", getStatus());
206    map.put("state", getState());
207    map.put("starttimems", getStartTime());
208    map.put("statustimems", getCompletionTimestamp());
209    map.put("statetimems", getCompletionTimestamp());
210    return map;
211  }
212
213  @Override
214  public String toJSON() throws IOException {
215    return GSON.toJson(toMap());
216  }
217
218  @Override
219  public String toString() {
220    StringBuilder sb = new StringBuilder(512);
221    sb.append(getDescription());
222    sb.append(": status=");
223    sb.append(getStatus());
224    sb.append(", state=");
225    sb.append(getState());
226    sb.append(", startTime=");
227    sb.append(getStartTime());
228    sb.append(", completionTime=");
229    sb.append(getCompletionTimestamp());
230    return sb.toString();
231  }
232
233  /**
234   * Returns the status journal. This implementation of status journal is not thread-safe. Currently
235   * we use this to track various stages of flushes and compactions where we can use this/pretty
236   * print for post task analysis, by which time we are already done changing states (writing to
237   * journal)
238   */
239  @Override
240  public List<StatusJournalEntry> getStatusJournal() {
241    if (journal == null) {
242      return Collections.emptyList();
243    } else {
244      return Collections.unmodifiableList(journal);
245    }
246  }
247
248  /**
249   * Enables journaling of this monitored task, the first invocation will lazily initialize the
250   * journal. The journal implementation itself and this method are not thread safe
251   */
252  @Override
253  public void enableStatusJournal(boolean includeCurrentStatus) {
254    if (journalEnabled && journal != null) {
255      return;
256    }
257    journalEnabled = true;
258    if (journal == null) {
259      journal = new ArrayList<StatusJournalEntry>();
260    }
261    if (includeCurrentStatus && status != null) {
262      journal.add(new StatusJournalEntryImpl(status, statusTime));
263    }
264  }
265
266  @Override
267  public void disableStatusJournal() {
268    journalEnabled = false;
269  }
270
271  @Override
272  public String prettyPrintJournal() {
273    if (!journalEnabled) {
274      return "";
275    }
276    StringBuilder sb = new StringBuilder();
277    for (int i = 0; i < journal.size(); i++) {
278      StatusJournalEntry je = journal.get(i);
279      sb.append(je.toString());
280      if (i != 0) {
281        StatusJournalEntry jep = journal.get(i-1);
282        long delta = je.getTimeStamp() - jep.getTimeStamp();
283        if (delta != 0) {
284          sb.append(" (+" + delta + " ms)");
285        }
286      }
287      sb.append("\n");
288    }
289    return sb.toString();
290  }
291
292}