001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ConcurrentLinkedQueue;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.hadoop.hbase.util.GsonUtil;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
032import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
033import org.apache.hbase.thirdparty.com.google.gson.Gson;
034
035@InterfaceAudience.Private
036class MonitoredTaskImpl implements MonitoredTask {
037  private long startTime;
038  private long statusTime;
039  private long stateTime;
040  private long warnTime;
041
042  private volatile String status;
043  private volatile String description;
044
045  protected volatile State state = State.RUNNING;
046  private final ConcurrentLinkedQueue<StatusJournalEntry> journal;
047
048  private static final Gson GSON = GsonUtil.createGson().create();
049
050  public MonitoredTaskImpl(boolean enableJournal, String description) {
051    startTime = EnvironmentEdgeManager.currentTime();
052    statusTime = startTime;
053    stateTime = startTime;
054    warnTime = startTime;
055    this.description = description;
056    this.status = "status unset";
057    if (enableJournal) {
058      journal = new ConcurrentLinkedQueue<>();
059    } else {
060      journal = null;
061    }
062  }
063
064  private static final class StatusJournalEntryImpl implements StatusJournalEntry {
065    private final long statusTime;
066    private final String status;
067
068    public StatusJournalEntryImpl(String status, long statusTime) {
069      this.status = status;
070      this.statusTime = statusTime;
071    }
072
073    @Override
074    public String getStatus() {
075      return status;
076    }
077
078    @Override
079    public long getTimeStamp() {
080      return statusTime;
081    }
082
083    @Override
084    public String toString() {
085      return status + " at " + statusTime;
086    }
087  }
088
089  @Override
090  public synchronized MonitoredTaskImpl clone() {
091    try {
092      return (MonitoredTaskImpl) super.clone();
093    } catch (CloneNotSupportedException e) {
094      throw new AssertionError(); // Won't happen
095    }
096  }
097
098  @Override
099  public long getStartTime() {
100    return startTime;
101  }
102
103  @Override
104  public String getDescription() {
105    return description;
106  }
107
108  @Override
109  public String getStatus() {
110    return status;
111  }
112
113  @Override
114  public long getStatusTime() {
115    return statusTime;
116  }
117
118  @Override
119  public State getState() {
120    return state;
121  }
122
123  @Override
124  public long getStateTime() {
125    return stateTime;
126  }
127
128  @Override
129  public long getWarnTime() {
130    return warnTime;
131  }
132
133  @Override
134  public long getCompletionTimestamp() {
135    if (state == State.COMPLETE || state == State.ABORTED) {
136      return stateTime;
137    }
138    return -1;
139  }
140
141  @Override
142  public void markComplete(String status) {
143    setState(State.COMPLETE);
144    setStatus(status);
145  }
146
147  @Override
148  public void pause(String msg) {
149    setState(State.WAITING);
150    setStatus(msg);
151  }
152
153  @Override
154  public void resume(String msg) {
155    setState(State.RUNNING);
156    setStatus(msg);
157  }
158
159  @Override
160  public void abort(String msg) {
161    setStatus(msg);
162    setState(State.ABORTED);
163  }
164
165  @Override
166  public void setStatus(String status) {
167    Preconditions.checkNotNull(status, "Status is null");
168    this.status = status;
169    statusTime = EnvironmentEdgeManager.currentTime();
170    if (journal != null) {
171      journal.add(new StatusJournalEntryImpl(this.status, statusTime));
172    }
173  }
174
175  protected void setState(State state) {
176    this.state = state;
177    stateTime = EnvironmentEdgeManager.currentTime();
178  }
179
180  @Override
181  public void setDescription(String description) {
182    Preconditions.checkNotNull(description, "Description is null");
183    this.description = description;
184  }
185
186  @Override
187  public void setWarnTime(long t) {
188    this.warnTime = t;
189  }
190
191  @Override
192  public void cleanup() {
193    if (state == State.RUNNING) {
194      setState(State.ABORTED);
195    }
196  }
197
198  /**
199   * Force the completion timestamp backwards so that it expires now.
200   */
201  @Override
202  public void expireNow() {
203    stateTime -= 180 * 1000;
204  }
205
206  @Override
207  public Map<String, Object> toMap() {
208    Map<String, Object> map = new HashMap<>();
209    map.put("description", getDescription());
210    map.put("status", getStatus());
211    map.put("state", getState());
212    map.put("starttimems", getStartTime());
213    map.put("statustimems", getCompletionTimestamp());
214    map.put("statetimems", getCompletionTimestamp());
215    return map;
216  }
217
218  @Override
219  public String toJSON() throws IOException {
220    return GSON.toJson(toMap());
221  }
222
223  @Override
224  public String toString() {
225    StringBuilder sb = new StringBuilder(512);
226    sb.append(getDescription());
227    sb.append(": status=");
228    sb.append(getStatus());
229    sb.append(", state=");
230    sb.append(getState());
231    sb.append(", startTime=");
232    sb.append(getStartTime());
233    sb.append(", completionTime=");
234    sb.append(getCompletionTimestamp());
235    return sb.toString();
236  }
237
238  /**
239   * Returns the status journal. This implementation of status journal is not thread-safe. Currently
240   * we use this to track various stages of flushes and compactions where we can use this/pretty
241   * print for post task analysis, by which time we are already done changing states (writing to
242   * journal)
243   */
244  @Override
245  public List<StatusJournalEntry> getStatusJournal() {
246    if (journal == null) {
247      return Collections.emptyList();
248    } else {
249      return ImmutableList.copyOf(journal);
250    }
251  }
252
253  @Override
254  public String prettyPrintJournal() {
255    if (journal == null) {
256      return "";
257    }
258    StringBuilder sb = new StringBuilder();
259    Iterator<StatusJournalEntry> iter = journal.iterator();
260    StatusJournalEntry previousEntry = null;
261    while (iter.hasNext()) {
262      StatusJournalEntry entry = iter.next();
263      sb.append(entry);
264      if (previousEntry != null) {
265        long delta = entry.getTimeStamp() - previousEntry.getTimeStamp();
266        if (delta != 0) {
267          sb.append(" (+" + delta + " ms)");
268        }
269      }
270      previousEntry = entry;
271    }
272    return sb.toString();
273  }
274}