001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.GsonUtil;
028import org.apache.yetus.audience.InterfaceAudience;
029
030import org.apache.hbase.thirdparty.com.google.gson.Gson;
031
032@InterfaceAudience.Private
033class MonitoredTaskImpl implements MonitoredTask {
034  private long startTime;
035  private long statusTime;
036  private long stateTime;
037  private long warnTime;
038
039  private volatile String status;
040  private volatile String description;
041
042  protected volatile State state = State.RUNNING;
043
044  private boolean journalEnabled = false;
045  private List<StatusJournalEntry> journal;
046
047  private static final Gson GSON = GsonUtil.createGson().create();
048
049  public MonitoredTaskImpl() {
050    startTime = EnvironmentEdgeManager.currentTime();
051    statusTime = startTime;
052    stateTime = startTime;
053    warnTime = startTime;
054  }
055
056  private static class StatusJournalEntryImpl implements StatusJournalEntry {
057    private long statusTime;
058    private String status;
059
060    public StatusJournalEntryImpl(String status, long statusTime) {
061      this.status = status;
062      this.statusTime = statusTime;
063    }
064
065    @Override
066    public String getStatus() {
067      return status;
068    }
069
070    @Override
071    public long getTimeStamp() {
072      return statusTime;
073    }
074
075    @Override
076    public String toString() {
077      StringBuilder sb = new StringBuilder();
078      sb.append(status);
079      sb.append(" at ");
080      sb.append(statusTime);
081      return sb.toString();
082    }
083  }
084
085  @Override
086  public synchronized MonitoredTaskImpl clone() {
087    try {
088      return (MonitoredTaskImpl) super.clone();
089    } catch (CloneNotSupportedException e) {
090      throw new AssertionError(); // Won't happen
091    }
092  }
093
094  @Override
095  public long getStartTime() {
096    return startTime;
097  }
098
099  @Override
100  public String getDescription() {
101    return description;
102  }
103
104  @Override
105  public String getStatus() {
106    return status;
107  }
108
109  @Override
110  public long getStatusTime() {
111    return statusTime;
112  }
113
114  @Override
115  public State getState() {
116    return state;
117  }
118
119  @Override
120  public long getStateTime() {
121    return stateTime;
122  }
123
124  @Override
125  public long getWarnTime() {
126    return warnTime;
127  }
128
129  @Override
130  public long getCompletionTimestamp() {
131    if (state == State.COMPLETE || state == State.ABORTED) {
132      return stateTime;
133    }
134    return -1;
135  }
136
137  @Override
138  public void markComplete(String status) {
139    setState(State.COMPLETE);
140    setStatus(status);
141  }
142
143  @Override
144  public void pause(String msg) {
145    setState(State.WAITING);
146    setStatus(msg);
147  }
148
149  @Override
150  public void resume(String msg) {
151    setState(State.RUNNING);
152    setStatus(msg);
153  }
154
155  @Override
156  public void abort(String msg) {
157    setStatus(msg);
158    setState(State.ABORTED);
159  }
160
161  @Override
162  public void setStatus(String status) {
163    this.status = status;
164    statusTime = EnvironmentEdgeManager.currentTime();
165    if (journalEnabled) {
166      journal.add(new StatusJournalEntryImpl(this.status, statusTime));
167    }
168  }
169
170  protected void setState(State state) {
171    this.state = state;
172    stateTime = EnvironmentEdgeManager.currentTime();
173  }
174
175  @Override
176  public void setDescription(String description) {
177    this.description = description;
178  }
179
180  @Override
181  public void setWarnTime(long t) {
182    this.warnTime = t;
183  }
184
185  @Override
186  public void cleanup() {
187    if (state == State.RUNNING) {
188      setState(State.ABORTED);
189    }
190  }
191
192  /**
193   * Force the completion timestamp backwards so that it expires now.
194   */
195  @Override
196  public void expireNow() {
197    stateTime -= 180 * 1000;
198  }
199
200  @Override
201  public Map<String, Object> toMap() {
202    Map<String, Object> map = new HashMap<>();
203    map.put("description", getDescription());
204    map.put("status", getStatus());
205    map.put("state", getState());
206    map.put("starttimems", getStartTime());
207    map.put("statustimems", getCompletionTimestamp());
208    map.put("statetimems", getCompletionTimestamp());
209    return map;
210  }
211
212  @Override
213  public String toJSON() throws IOException {
214    return GSON.toJson(toMap());
215  }
216
217  @Override
218  public String toString() {
219    StringBuilder sb = new StringBuilder(512);
220    sb.append(getDescription());
221    sb.append(": status=");
222    sb.append(getStatus());
223    sb.append(", state=");
224    sb.append(getState());
225    sb.append(", startTime=");
226    sb.append(getStartTime());
227    sb.append(", completionTime=");
228    sb.append(getCompletionTimestamp());
229    return sb.toString();
230  }
231
232  /**
233   * Returns the status journal. This implementation of status journal is not thread-safe. Currently
234   * we use this to track various stages of flushes and compactions where we can use this/pretty
235   * print for post task analysis, by which time we are already done changing states (writing to
236   * journal)
237   */
238  @Override
239  public List<StatusJournalEntry> getStatusJournal() {
240    if (journal == null) {
241      return Collections.emptyList();
242    } else {
243      return Collections.unmodifiableList(journal);
244    }
245  }
246
247  /**
248   * Enables journaling of this monitored task, the first invocation will lazily initialize the
249   * journal. The journal implementation itself and this method are not thread safe
250   */
251  @Override
252  public void enableStatusJournal(boolean includeCurrentStatus) {
253    if (journalEnabled && journal != null) {
254      return;
255    }
256    journalEnabled = true;
257    if (journal == null) {
258      journal = new ArrayList<StatusJournalEntry>();
259    }
260    if (includeCurrentStatus && status != null) {
261      journal.add(new StatusJournalEntryImpl(status, statusTime));
262    }
263  }
264
265  @Override
266  public void disableStatusJournal() {
267    journalEnabled = false;
268  }
269
270  @Override
271  public String prettyPrintJournal() {
272    if (!journalEnabled) {
273      return "";
274    }
275    StringBuilder sb = new StringBuilder();
276    for (int i = 0; i < journal.size(); i++) {
277      StatusJournalEntry je = journal.get(i);
278      sb.append(je.toString());
279      if (i != 0) {
280        StatusJournalEntry jep = journal.get(i - 1);
281        long delta = je.getTimeStamp() - jep.getTimeStamp();
282        if (delta != 0) {
283          sb.append(" (+" + delta + " ms)");
284        }
285      }
286      sb.append("\n");
287    }
288    return sb.toString();
289  }
290
291}