001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.monitoring;
020
021import com.fasterxml.jackson.databind.ObjectMapper;
022
023import org.apache.hadoop.util.StringUtils;
024import org.apache.yetus.audience.InterfaceAudience;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032
033@InterfaceAudience.Private
034class MonitoredTaskImpl implements MonitoredTask {
035  private long startTime;
036  private long statusTime;
037  private long stateTime;
038  private long warnTime;
039
040  private volatile String status;
041  private volatile String description;
042  
043  protected volatile State state = State.RUNNING;
044
045  private boolean journalEnabled = false;
046  private List<StatusJournalEntry> journal;
047
048  private static final ObjectMapper MAPPER = new ObjectMapper();
049
050  public MonitoredTaskImpl() {
051    startTime = System.currentTimeMillis();
052    statusTime = startTime;
053    stateTime = startTime;
054    warnTime = startTime;
055  }
056
057  private static class StatusJournalEntryImpl implements StatusJournalEntry {
058    private long statusTime;
059    private String status;
060
061    public StatusJournalEntryImpl(String status, long statusTime) {
062      this.status = status;
063      this.statusTime = statusTime;
064    }
065
066    @Override
067    public String getStatus() {
068      return status;
069    }
070
071    @Override
072    public long getTimeStamp() {
073      return statusTime;
074    }
075
076    @Override
077    public String toString() {
078      StringBuilder sb = new StringBuilder();
079      sb.append(status);
080      sb.append(" at ");
081      sb.append(statusTime);
082      return sb.toString();
083    }
084  }
085
086  @Override
087  public synchronized MonitoredTaskImpl clone() {
088    try {
089      return (MonitoredTaskImpl) super.clone();
090    } catch (CloneNotSupportedException e) {
091      throw new AssertionError(); // Won't happen
092    }
093  }
094
095  @Override
096  public long getStartTime() {
097    return startTime;
098  }
099  
100  @Override
101  public String getDescription() {
102    return description;
103  }
104
105  @Override
106  public String getStatus() {
107    return status;
108  }
109
110  @Override
111  public long getStatusTime() {
112    return statusTime;
113  }
114  
115  @Override
116  public State getState() {
117    return state;
118  }
119  
120  @Override
121  public long getStateTime() {
122    return stateTime;
123  }
124
125  @Override
126  public long getWarnTime() {
127    return warnTime;
128  }
129
130  @Override
131  public long getCompletionTimestamp() {
132    if (state == State.COMPLETE || state == State.ABORTED) {
133      return stateTime;
134    }
135    return -1;
136  }
137
138  @Override
139  public void markComplete(String status) {
140    setState(State.COMPLETE);
141    setStatus(status);
142  }
143
144  @Override
145  public void pause(String msg) {
146    setState(State.WAITING);
147    setStatus(msg);
148  }
149
150  @Override
151  public void resume(String msg) {
152    setState(State.RUNNING);
153    setStatus(msg);
154  }
155
156  @Override
157  public void abort(String msg) {
158    setStatus(msg);
159    setState(State.ABORTED);
160  }
161  
162  @Override
163  public void setStatus(String status) {
164    this.status = status;
165    statusTime = System.currentTimeMillis();
166    if (journalEnabled) {
167      journal.add(new StatusJournalEntryImpl(this.status, statusTime));
168    }
169  }
170
171  protected void setState(State state) {
172    this.state = state;
173    stateTime = System.currentTimeMillis();
174  }
175
176  @Override
177  public void setDescription(String description) {
178    this.description = description;
179  }
180
181  @Override
182  public void setWarnTime(long t) {
183    this.warnTime = t;
184  }
185
186  @Override
187  public void cleanup() {
188    if (state == State.RUNNING) {
189      setState(State.ABORTED);
190    }
191  }
192
193  /**
194   * Force the completion timestamp backwards so that
195   * it expires now.
196   */
197  @Override
198  public void expireNow() {
199    stateTime -= 180 * 1000;
200  }
201
202  @Override
203  public Map<String, Object> toMap() {
204    Map<String, Object> map = new HashMap<>();
205    map.put("description", getDescription());
206    map.put("status", getStatus());
207    map.put("state", getState());
208    map.put("starttimems", getStartTime());
209    map.put("statustimems", getCompletionTimestamp());
210    map.put("statetimems", getCompletionTimestamp());
211    return map;
212  }
213
214  @Override
215  public String toJSON() throws IOException {
216    return MAPPER.writeValueAsString(toMap());
217  }
218
219  @Override
220  public String toString() {
221    StringBuilder sb = new StringBuilder(512);
222    sb.append(getDescription());
223    sb.append(": status=");
224    sb.append(getStatus());
225    sb.append(", state=");
226    sb.append(getState());
227    sb.append(", startTime=");
228    sb.append(getStartTime());
229    sb.append(", completionTime=");
230    sb.append(getCompletionTimestamp());
231    return sb.toString();
232  }
233
234  /**
235   * Returns the status journal. This implementation of status journal is not thread-safe. Currently
236   * we use this to track various stages of flushes and compactions where we can use this/pretty
237   * print for post task analysis, by which time we are already done changing states (writing to
238   * journal)
239   */
240  @Override
241  public List<StatusJournalEntry> getStatusJournal() {
242    if (journal == null) {
243      return Collections.emptyList();
244    } else {
245      return Collections.unmodifiableList(journal);
246    }
247  }
248
249  /**
250   * Enables journaling of this monitored task, the first invocation will lazily initialize the
251   * journal. The journal implementation itself and this method are not thread safe
252   */
253  @Override
254  public void enableStatusJournal(boolean includeCurrentStatus) {
255    if (journalEnabled && journal != null) {
256      return;
257    }
258    journalEnabled = true;
259    if (journal == null) {
260      journal = new ArrayList<StatusJournalEntry>();
261    }
262    if (includeCurrentStatus) {
263      journal.add(new StatusJournalEntryImpl(status, statusTime));
264    }
265  }
266
267  @Override
268  public void disableStatusJournal() {
269    journalEnabled = false;
270  }
271
272  @Override
273  public String prettyPrintJournal() {
274    return StringUtils.join("\n\t", getStatusJournal());
275  }
276
277}