001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.client.Mutation;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Query;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.testclassification.MiscTests;
038import org.apache.hadoop.hbase.testclassification.SmallTests;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.skyscreamer.jsonassert.JSONAssert;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047@Category({ MiscTests.class, SmallTests.class })
048public class TestTaskMonitor {
049  private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class);
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestTaskMonitor.class);
054
055  @Test
056  public void testTaskMonitorBasics() {
057    TaskMonitor tm = new TaskMonitor(new Configuration());
058    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
059
060    // Make a task and fetch it back out
061    MonitoredTask task = tm.createStatus("Test task");
062    MonitoredTask taskFromTm = tm.getTasks().get(0);
063
064    // Make sure the state is reasonable.
065    assertEquals(task.getDescription(), taskFromTm.getDescription());
066    assertEquals(-1, taskFromTm.getCompletionTimestamp());
067    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
068    assertEquals(task.getStatus(), taskFromTm.getStatus());
069    assertEquals("status unset", taskFromTm.getStatus());
070
071    // Mark it as finished
072    task.markComplete("Finished!");
073    assertEquals(MonitoredTask.State.COMPLETE, task.getState());
074
075    // It should still show up in the TaskMonitor list
076    assertEquals(1, tm.getTasks().size());
077
078    // If we mark its completion time back a few minutes, it should get gced
079    task.expireNow();
080    assertEquals(0, tm.getTasks().size());
081
082    tm.shutdown();
083  }
084
085  @Test
086  public void testTasksGetAbortedOnLeak() throws InterruptedException {
087    final TaskMonitor tm = new TaskMonitor(new Configuration());
088    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
089
090    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
091    // Make a task in some other thread and leak it
092    Thread t = new Thread() {
093      @Override
094      public void run() {
095        MonitoredTask task = tm.createStatus("Test task");
096        assertEquals(MonitoredTask.State.RUNNING, task.getState());
097        threadSuccess.set(true);
098      }
099    };
100    t.start();
101    t.join();
102    // Make sure the thread saw the correct state
103    assertTrue(threadSuccess.get());
104
105    // Make sure the leaked reference gets cleared
106    System.gc();
107    System.gc();
108    System.gc();
109
110    // Now it should be aborted
111    MonitoredTask taskFromTm = tm.getTasks().get(0);
112    assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
113
114    tm.shutdown();
115  }
116
117  @Test
118  public void testTaskLimit() throws Exception {
119    TaskMonitor tm = new TaskMonitor(new Configuration());
120    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
121      tm.createStatus("task " + i);
122    }
123    // Make sure it was limited correctly
124    assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
125    // Make sure we culled the earlier tasks, not later
126    // (i.e. tasks 0 through 9 should have been deleted)
127    assertEquals("task 10", tm.getTasks().get(0).getDescription());
128    tm.shutdown();
129  }
130
131  @Test
132  public void testDoNotPurgeRPCTask() throws Exception {
133    int RPCTaskNums = 10;
134    TaskMonitor tm = TaskMonitor.get();
135    for (int i = 0; i < RPCTaskNums; i++) {
136      tm.createRPCStatus("PRCTask" + i);
137    }
138    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
139      tm.createStatus("otherTask" + i);
140    }
141    int remainRPCTask = 0;
142    for (MonitoredTask task : tm.getTasks()) {
143      if (task instanceof MonitoredRPCHandler) {
144        remainRPCTask++;
145      }
146    }
147    assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask);
148    tm.shutdown();
149  }
150
151  @Test
152  public void testWarnStuckTasks() throws Exception {
153    final int RPC_WARN_TIME = 1500;
154    final int MONITOR_INTERVAL = 500;
155    Configuration conf = new Configuration();
156    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME);
157    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL);
158    final TaskMonitor tm = new TaskMonitor(conf);
159    MonitoredRPCHandler t = tm.createRPCStatus("test task");
160    long beforeSetRPC = EnvironmentEdgeManager.currentTime();
161    assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC);
162    Thread.sleep(MONITOR_INTERVAL * 2);
163    t.setRPC("testMethod", new Object[0], beforeSetRPC);
164    long afterSetRPC = EnvironmentEdgeManager.currentTime();
165    Thread.sleep(MONITOR_INTERVAL * 2);
166    assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC);
167    Thread.sleep(MONITOR_INTERVAL * 2);
168    assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC);
169    tm.shutdown();
170  }
171
172  @Test
173  public void testGetTasksWithFilter() throws Exception {
174    TaskMonitor tm = new TaskMonitor(new Configuration());
175    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
176    // Create 5 general tasks
177    tm.createStatus("General task1");
178    tm.createStatus("General task2");
179    tm.createStatus("General task3");
180    tm.createStatus("General task4");
181    tm.createStatus("General task5");
182    // Create 5 rpc tasks, and mark 1 completed
183    int length = 5;
184    ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
185    for (int i = 0; i < length; i++) {
186      MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
187      rpcHandlers.add(rpcHandler);
188    }
189    // Create rpc opertions
190    byte[] row = new byte[] { 0x01 };
191    Mutation m = new Put(row);
192    Query q = new Scan();
193    String notOperation = "for test";
194    rpcHandlers.get(0).setRPC("operations", new Object[] { m, q }, 3000);
195    rpcHandlers.get(1).setRPC("operations", new Object[] { m, q }, 3000);
196    rpcHandlers.get(2).setRPC("operations", new Object[] { m, q }, 3000);
197    rpcHandlers.get(3).setRPC("operations", new Object[] { notOperation }, 3000);
198    rpcHandlers.get(4).setRPC("operations", new Object[] { m, q }, 3000);
199    MonitoredRPCHandler completed = rpcHandlers.get(4);
200    completed.markComplete("Completed!");
201    // Test get tasks with filter
202    List<MonitoredTask> generalTasks = tm.getTasks("general");
203    assertEquals(5, generalTasks.size());
204    List<MonitoredTask> handlerTasks = tm.getTasks("handler");
205    assertEquals(5, handlerTasks.size());
206    List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
207    // The last rpc handler is stopped
208    assertEquals(4, rpcTasks.size());
209    List<MonitoredTask> operationTasks = tm.getTasks("operation");
210    // Handler 3 doesn't handle Operation.
211    assertEquals(3, operationTasks.size());
212    tm.shutdown();
213  }
214
215  @Test
216  public void testStatusJournal() {
217    TaskMonitor tm = new TaskMonitor(new Configuration());
218    MonitoredTask task = tm.createStatus("Test task");
219    assertTrue(task.getStatusJournal().isEmpty());
220    task.setStatus("status1");
221    // journal should be empty since it is disabled
222    assertTrue(task.getStatusJournal().isEmpty());
223    task = tm.createStatus("Test task with journal", false, true);
224    task.setStatus("status2");
225    assertEquals(1, task.getStatusJournal().size());
226    assertEquals("status2", task.getStatusJournal().get(0).getStatus());
227    task.setStatus("status3");
228    assertEquals(2, task.getStatusJournal().size());
229    assertEquals("status3", task.getStatusJournal().get(1).getStatus());
230    task.prettyPrintJournal();
231    tm.shutdown();
232  }
233
234  @Test
235  public void testTaskGroup() {
236    TaskGroup group = TaskMonitor.createTaskGroup(true, "test task group");
237    group.addTask("task1");
238    MonitoredTask task2 = group.addTask("task2");
239    task2.setStatus("task2 status2");
240    task2.setStatus("task2 status3");
241    group.addTask("task3");
242    group.markComplete("group complete");
243    Collection<MonitoredTask> tasks = group.getTasks();
244    assertNotNull(tasks);
245    assertEquals(tasks.size(), 3);
246    for (MonitoredTask task : tasks) {
247      if (task.getDescription().equals("task2")) {
248        assertEquals(task.getStatusJournal().size(), 3);
249        task.prettyPrintJournal();
250      }
251    }
252  }
253
254  @Test
255  public void testClone() throws Exception {
256    MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl("test");
257    monitor.abort("abort RPC");
258    TestParam testParam = new TestParam("param1");
259    monitor.setRPC("method1", new Object[] { testParam }, 0);
260    MonitoredRPCHandlerImpl clone = monitor.clone();
261    assertEquals(clone.getDescription(), monitor.getDescription());
262    assertEquals(clone.getState(), monitor.getState());
263    assertEquals(clone.getStatus(), monitor.getStatus());
264    assertEquals(clone.toString(), monitor.toString());
265    assertEquals(clone.toMap(), monitor.toMap());
266    JSONAssert.assertEquals(clone.toJSON(), monitor.toJSON(), true);
267
268    // mark complete and make param dirty
269    monitor.markComplete("complete RPC");
270    testParam.setParam("dirtyParam");
271    assertEquals(clone.getDescription(), monitor.getDescription());
272    assertNotEquals(clone.getState(), monitor.getState());
273    assertNotEquals(clone.getStatus(), monitor.getStatus());
274    monitor.setState(MonitoredTask.State.RUNNING);
275    try {
276      // when markComplete, the param in monitor is set null, so toMap should fail here
277      monitor.toMap();
278      fail("Should not call toMap successfully, because param=null");
279    } catch (Exception e) {
280    }
281    // the param of clone monitor should not be dirty
282    assertNotEquals("[dirtyString]",
283      String.valueOf(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params")));
284
285    monitor.resume("resume");
286    monitor.setRPC("method2", new Object[] { new TestParam("param2") }, 1);
287    assertNotEquals(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"),
288      ((Map<String, Object>) monitor.toMap().get("rpcCall")).get("params"));
289    LOG.info(String.valueOf(clone.toMap()));
290    LOG.info(String.valueOf(monitor.toMap()));
291    assertNotEquals(clone.toString(), monitor.toString());
292    assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime());
293    assertNotEquals(clone.toMap(), monitor.toMap());
294    assertNotEquals(clone.toJSON(), monitor.toJSON());
295  }
296
297  private class TestParam {
298    public String param = null;
299
300    public TestParam(String param) {
301      this.param = param;
302    }
303
304    public void setParam(String param) {
305      this.param = param;
306    }
307
308    @Override
309    public String toString() {
310      return param;
311    }
312  }
313}