001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicBoolean;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.client.Mutation;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.Query;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.testclassification.MiscTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Category({MiscTests.class, SmallTests.class})
044public class TestTaskMonitor {
045  private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class);
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049      HBaseClassTestRule.forClass(TestTaskMonitor.class);
050
051  @Test
052  public void testTaskMonitorBasics() {
053    TaskMonitor tm = new TaskMonitor(new Configuration());
054    assertTrue("Task monitor should start empty",
055        tm.getTasks().isEmpty());
056
057    // Make a task and fetch it back out
058    MonitoredTask task = tm.createStatus("Test task");
059    MonitoredTask taskFromTm = tm.getTasks().get(0);
060
061    // Make sure the state is reasonable.
062    assertEquals(task.getDescription(), taskFromTm.getDescription());
063    assertEquals(-1, taskFromTm.getCompletionTimestamp());
064    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
065
066    // Mark it as finished
067    task.markComplete("Finished!");
068    assertEquals(MonitoredTask.State.COMPLETE, task.getState());
069
070    // It should still show up in the TaskMonitor list
071    assertEquals(1, tm.getTasks().size());
072
073    // If we mark its completion time back a few minutes, it should get gced
074    task.expireNow();
075    assertEquals(0, tm.getTasks().size());
076
077    tm.shutdown();
078  }
079
080  @Test
081  public void testTasksGetAbortedOnLeak() throws InterruptedException {
082    final TaskMonitor tm = new TaskMonitor(new Configuration());
083    assertTrue("Task monitor should start empty",
084        tm.getTasks().isEmpty());
085
086    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
087    // Make a task in some other thread and leak it
088    Thread t = new Thread() {
089      @Override
090      public void run() {
091        MonitoredTask task = tm.createStatus("Test task");
092        assertEquals(MonitoredTask.State.RUNNING, task.getState());
093        threadSuccess.set(true);
094      }
095    };
096    t.start();
097    t.join();
098    // Make sure the thread saw the correct state
099    assertTrue(threadSuccess.get());
100
101    // Make sure the leaked reference gets cleared
102    System.gc();
103    System.gc();
104    System.gc();
105
106    // Now it should be aborted
107    MonitoredTask taskFromTm = tm.getTasks().get(0);
108    assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
109
110    tm.shutdown();
111  }
112
113  @Test
114  public void testTaskLimit() throws Exception {
115    TaskMonitor tm = new TaskMonitor(new Configuration());
116    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
117      tm.createStatus("task " + i);
118    }
119    // Make sure it was limited correctly
120    assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
121    // Make sure we culled the earlier tasks, not later
122    // (i.e. tasks 0 through 9 should have been deleted)
123    assertEquals("task 10", tm.getTasks().get(0).getDescription());
124    tm.shutdown();
125  }
126
127  @Test
128  public void testDoNotPurgeRPCTask() throws Exception {
129    int RPCTaskNums = 10;
130    TaskMonitor tm = TaskMonitor.get();
131    for(int i = 0; i < RPCTaskNums; i++) {
132      tm.createRPCStatus("PRCTask" + i);
133    }
134    for(int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
135      tm.createStatus("otherTask" + i);
136    }
137    int remainRPCTask = 0;
138    for(MonitoredTask task: tm.getTasks()) {
139      if(task instanceof MonitoredRPCHandler) {
140        remainRPCTask++;
141      }
142    }
143    assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask);
144    tm.shutdown();
145  }
146
147  @Test
148  public void testWarnStuckTasks() throws Exception {
149    final int RPC_WARN_TIME = 1500;
150    final int MONITOR_INTERVAL = 500;
151    Configuration conf = new Configuration();
152    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME);
153    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL);
154    final TaskMonitor tm = new TaskMonitor(conf);
155    MonitoredRPCHandler t = tm.createRPCStatus("test task");
156    long beforeSetRPC = EnvironmentEdgeManager.currentTime();
157    assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC);
158    Thread.sleep(MONITOR_INTERVAL * 2);
159    t.setRPC("testMethod", new Object[0], beforeSetRPC);
160    long afterSetRPC = EnvironmentEdgeManager.currentTime();
161    Thread.sleep(MONITOR_INTERVAL * 2);
162    assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC);
163    Thread.sleep(MONITOR_INTERVAL * 2);
164    assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC);
165    tm.shutdown();
166  }
167
168  @Test
169  public void testGetTasksWithFilter() throws Exception {
170    TaskMonitor tm = new TaskMonitor(new Configuration());
171    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
172    // Create 5 general tasks
173    tm.createStatus("General task1");
174    tm.createStatus("General task2");
175    tm.createStatus("General task3");
176    tm.createStatus("General task4");
177    tm.createStatus("General task5");
178    // Create 5 rpc tasks, and mark 1 completed
179    int length = 5;
180    ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
181    for (int i = 0; i < length; i++) {
182      MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
183      rpcHandlers.add(rpcHandler);
184    }
185    // Create rpc opertions
186    byte[] row = new byte[] { 0x01 };
187    Mutation m = new Put(row);
188    Query q = new Scan();
189    String notOperation = "for test";
190    rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
191    rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
192    rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
193    rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
194    rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
195    MonitoredRPCHandler completed = rpcHandlers.get(4);
196    completed.markComplete("Completed!");
197    // Test get tasks with filter
198    List<MonitoredTask> generalTasks = tm.getTasks("general");
199    assertEquals(5, generalTasks.size());
200    List<MonitoredTask> handlerTasks = tm.getTasks("handler");
201    assertEquals(5, handlerTasks.size());
202    List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
203    // The last rpc handler is stopped
204    assertEquals(4, rpcTasks.size());
205    List<MonitoredTask> operationTasks = tm.getTasks("operation");
206    // Handler 3 doesn't handle Operation.
207    assertEquals(3, operationTasks.size());
208    tm.shutdown();
209  }
210
211  @Test
212  public void testStatusJournal() {
213    TaskMonitor tm = new TaskMonitor(new Configuration());
214    MonitoredTask task = tm.createStatus("Test task");
215    assertTrue(task.getStatusJournal().isEmpty());
216    task.disableStatusJournal();
217    task.setStatus("status1");
218    // journal should be empty since it is disabled
219    assertTrue(task.getStatusJournal().isEmpty());
220    task.enableStatusJournal(true);
221    // check existing status entered in journal
222    assertEquals("status1", task.getStatusJournal().get(0).getStatus());
223    assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
224    task.disableStatusJournal();
225    task.setStatus("status2");
226    // check status 2 not added since disabled
227    assertEquals(1, task.getStatusJournal().size());
228    task.enableStatusJournal(false);
229    // size should still be 1 since we didn't include current status
230    assertEquals(1, task.getStatusJournal().size());
231    task.setStatus("status3");
232    assertEquals("status3", task.getStatusJournal().get(1).getStatus());
233    tm.shutdown();
234  }
235
236  @Test
237  public void testClone() throws Exception {
238    MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl();
239    monitor.abort("abort RPC");
240    TestParam testParam = new TestParam("param1");
241    monitor.setRPC("method1", new Object[]{ testParam }, 0);
242    MonitoredRPCHandlerImpl clone = monitor.clone();
243    assertEquals(clone.getDescription(), monitor.getDescription());
244    assertEquals(clone.getState(), monitor.getState());
245    assertEquals(clone.getStatus(), monitor.getStatus());
246    assertEquals(clone.toString(), monitor.toString());
247    assertEquals(clone.toMap(), monitor.toMap());
248    assertEquals(clone.toJSON(), monitor.toJSON());
249
250    // mark complete and make param dirty
251    monitor.markComplete("complete RPC");
252    testParam.setParam("dirtyParam");
253    assertEquals(clone.getDescription(), monitor.getDescription());
254    assertNotEquals(clone.getState(), monitor.getState());
255    assertNotEquals(clone.getStatus(), monitor.getStatus());
256    monitor.setState(MonitoredTask.State.RUNNING);
257    try {
258      // when markComplete, the param in monitor is set null, so toMap should fail here
259      monitor.toMap();
260      fail("Should not call toMap successfully, because param=null");
261    } catch (Exception e) {
262    }
263    // the param of clone monitor should not be dirty
264    assertNotEquals("[dirtyString]",
265      String.valueOf(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params")));
266
267    monitor.resume("resume");
268    monitor.setRPC("method2", new Object[]{new TestParam("param2")}, 1);
269    assertNotEquals(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"),
270      ((Map<String, Object>) monitor.toMap().get("rpcCall")).get(
271        "params"));
272    LOG.info(String.valueOf(clone.toMap()));
273    LOG.info(String.valueOf(monitor.toMap()));
274    assertNotEquals(clone.toString(), monitor.toString());
275    assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime());
276    assertNotEquals(clone.toMap(), monitor.toMap());
277    assertNotEquals(clone.toJSON(), monitor.toJSON());
278  }
279
280  private class TestParam {
281    public String param = null;
282
283    public TestParam(String param) {
284      this.param = param;
285    }
286
287    public void setParam(String param) {
288      this.param = param;
289    }
290
291    @Override
292    public String toString() {
293      return param;
294    }
295  }
296}
297