View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.monitoring;
20  
21  import org.apache.hadoop.hbase.classification.InterfaceAudience;
22  import org.apache.hadoop.util.StringUtils;
23  import org.codehaus.jackson.map.ObjectMapper;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.List;
30  import java.util.Map;
31  
32  @InterfaceAudience.Private
33  class MonitoredTaskImpl implements MonitoredTask {
34    private long startTime;
35    private long statusTime;
36    private long stateTime;
37    
38    private volatile String status;
39    private volatile String description;
40    
41    protected volatile State state = State.RUNNING;
42  
43    private boolean journalEnabled = false;
44    private List<StatusJournalEntry> journal;
45  
46    private static final ObjectMapper MAPPER = new ObjectMapper();
47  
48    public MonitoredTaskImpl() {
49      startTime = System.currentTimeMillis();
50      statusTime = startTime;
51      stateTime = startTime;
52    }
53  
54    private static class StatusJournalEntryImpl implements StatusJournalEntry {
55      private long statusTime;
56      private String status;
57  
58      public StatusJournalEntryImpl(String status, long statusTime) {
59        this.status = status;
60        this.statusTime = statusTime;
61      }
62  
63      @Override
64      public String getStatus() {
65        return status;
66      }
67  
68      @Override
69      public long getTimeStamp() {
70        return statusTime;
71      }
72  
73      @Override
74      public String toString() {
75        StringBuilder sb = new StringBuilder();
76        sb.append(status);
77        sb.append(" at ");
78        sb.append(statusTime);
79        return sb.toString();
80      }
81    }
82  
83    @Override
84    public synchronized MonitoredTaskImpl clone() {
85      try {
86        return (MonitoredTaskImpl) super.clone();
87      } catch (CloneNotSupportedException e) {
88        throw new AssertionError(); // Won't happen
89      }
90    }
91  
92    @Override
93    public long getStartTime() {
94      return startTime;
95    }
96    
97    @Override
98    public String getDescription() {
99      return description;
100   }
101 
102   @Override
103   public String getStatus() {
104     return status;
105   }
106 
107   @Override
108   public long getStatusTime() {
109     return statusTime;
110   }
111   
112   @Override
113   public State getState() {
114     return state;
115   }
116   
117   @Override
118   public long getStateTime() {
119     return stateTime;
120   }
121   
122   @Override
123   public long getCompletionTimestamp() {
124     if (state == State.COMPLETE || state == State.ABORTED) {
125       return stateTime;
126     }
127     return -1;
128   }
129 
130   @Override
131   public void markComplete(String status) {
132     setState(State.COMPLETE);
133     setStatus(status);
134   }
135 
136   @Override
137   public void pause(String msg) {
138     setState(State.WAITING);
139     setStatus(msg);
140   }
141 
142   @Override
143   public void resume(String msg) {
144     setState(State.RUNNING);
145     setStatus(msg);
146   }
147 
148   @Override
149   public void abort(String msg) {
150     setStatus(msg);
151     setState(State.ABORTED);
152   }
153   
154   @Override
155   public void setStatus(String status) {
156     this.status = status;
157     statusTime = System.currentTimeMillis();
158     if (journalEnabled) {
159       journal.add(new StatusJournalEntryImpl(this.status, statusTime));
160     }
161   }
162 
163   protected void setState(State state) {
164     this.state = state;
165     stateTime = System.currentTimeMillis();
166   }
167 
168   @Override
169   public void setDescription(String description) {
170     this.description = description;
171   }
172 
173   @Override
174   public void cleanup() {
175     if (state == State.RUNNING) {
176       setState(State.ABORTED);
177     }
178   }
179 
180   /**
181    * Force the completion timestamp backwards so that
182    * it expires now.
183    */
184   public void expireNow() {
185     stateTime -= 180 * 1000;
186   }
187 
188   @Override
189   public Map<String, Object> toMap() {
190     Map<String, Object> map = new HashMap<String, Object>();
191     map.put("description", getDescription());
192     map.put("status", getStatus());
193     map.put("state", getState());
194     map.put("starttimems", getStartTime());
195     map.put("statustimems", getCompletionTimestamp());
196     map.put("statetimems", getCompletionTimestamp());
197     return map;
198   }
199 
200   @Override
201   public String toJSON() throws IOException {
202     return MAPPER.writeValueAsString(toMap());
203   }
204 
205   @Override
206   public String toString() {
207     StringBuilder sb = new StringBuilder(512);
208     sb.append(getDescription());
209     sb.append(": status=");
210     sb.append(getStatus());
211     sb.append(", state=");
212     sb.append(getState());
213     sb.append(", startTime=");
214     sb.append(getStartTime());
215     sb.append(", completionTime=");
216     sb.append(getCompletionTimestamp());
217     return sb.toString();
218   }
219 
220   /**
221    * Returns the status journal. This implementation of status journal is not thread-safe. Currently
222    * we use this to track various stages of flushes and compactions where we can use this/pretty
223    * print for post task analysis, by which time we are already done changing states (writing to
224    * journal)
225    */
226   @Override
227   public List<StatusJournalEntry> getStatusJournal() {
228     if (journal == null) {
229       return Collections.emptyList();
230     } else {
231       return Collections.unmodifiableList(journal);
232     }
233   }
234 
235   /**
236    * Enables journaling of this monitored task, the first invocation will lazily initialize the
237    * journal. The journal implementation itself and this method are not thread safe
238    */
239   @Override
240   public void enableStatusJournal(boolean includeCurrentStatus) {
241     if (journalEnabled && journal != null) {
242       return;
243     }
244     journalEnabled = true;
245     if (journal == null) {
246       journal = new ArrayList<StatusJournalEntry>();
247     }
248     if (includeCurrentStatus) {
249       journal.add(new StatusJournalEntryImpl(status, statusTime));
250     }
251   }
252 
253   @Override
254   public void disableStatusJournal() {
255     journalEnabled = false;
256   }
257 
258   @Override
259   public String prettyPrintJournal() {
260     return StringUtils.join("\n\t", getStatusJournal());
261   }
262 
263 }