001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.atomic.AtomicBoolean;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.client.Mutation;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Query;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.testclassification.MiscTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044@Category({ MiscTests.class, SmallTests.class })
045public class TestTaskMonitor {
046  private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class);
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050    HBaseClassTestRule.forClass(TestTaskMonitor.class);
051
052  @Test
053  public void testTaskMonitorBasics() {
054    TaskMonitor tm = new TaskMonitor(new Configuration());
055    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
056
057    // Make a task and fetch it back out
058    MonitoredTask task = tm.createStatus("Test task");
059    MonitoredTask taskFromTm = tm.getTasks().get(0);
060
061    // Make sure the state is reasonable.
062    assertEquals(task.getDescription(), taskFromTm.getDescription());
063    assertEquals(-1, taskFromTm.getCompletionTimestamp());
064    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
065
066    // Mark it as finished
067    task.markComplete("Finished!");
068    assertEquals(MonitoredTask.State.COMPLETE, task.getState());
069
070    // It should still show up in the TaskMonitor list
071    assertEquals(1, tm.getTasks().size());
072
073    // If we mark its completion time back a few minutes, it should get gced
074    task.expireNow();
075    assertEquals(0, tm.getTasks().size());
076
077    tm.shutdown();
078  }
079
080  @Test
081  public void testTasksGetAbortedOnLeak() throws InterruptedException {
082    final TaskMonitor tm = new TaskMonitor(new Configuration());
083    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
084
085    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
086    // Make a task in some other thread and leak it
087    Thread t = new Thread() {
088      @Override
089      public void run() {
090        MonitoredTask task = tm.createStatus("Test task");
091        assertEquals(MonitoredTask.State.RUNNING, task.getState());
092        threadSuccess.set(true);
093      }
094    };
095    t.start();
096    t.join();
097    // Make sure the thread saw the correct state
098    assertTrue(threadSuccess.get());
099
100    // Make sure the leaked reference gets cleared
101    System.gc();
102    System.gc();
103    System.gc();
104
105    // Now it should be aborted
106    MonitoredTask taskFromTm = tm.getTasks().get(0);
107    assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
108
109    tm.shutdown();
110  }
111
112  @Test
113  public void testTaskLimit() throws Exception {
114    TaskMonitor tm = new TaskMonitor(new Configuration());
115    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
116      tm.createStatus("task " + i);
117    }
118    // Make sure it was limited correctly
119    assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
120    // Make sure we culled the earlier tasks, not later
121    // (i.e. tasks 0 through 9 should have been deleted)
122    assertEquals("task 10", tm.getTasks().get(0).getDescription());
123    tm.shutdown();
124  }
125
126  @Test
127  public void testDoNotPurgeRPCTask() throws Exception {
128    int RPCTaskNums = 10;
129    TaskMonitor tm = TaskMonitor.get();
130    for (int i = 0; i < RPCTaskNums; i++) {
131      tm.createRPCStatus("PRCTask" + i);
132    }
133    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
134      tm.createStatus("otherTask" + i);
135    }
136    int remainRPCTask = 0;
137    for (MonitoredTask task : tm.getTasks()) {
138      if (task instanceof MonitoredRPCHandler) {
139        remainRPCTask++;
140      }
141    }
142    assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask);
143    tm.shutdown();
144  }
145
146  @Test
147  public void testWarnStuckTasks() throws Exception {
148    final int RPC_WARN_TIME = 1500;
149    final int MONITOR_INTERVAL = 500;
150    Configuration conf = new Configuration();
151    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME);
152    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL);
153    final TaskMonitor tm = new TaskMonitor(conf);
154    MonitoredRPCHandler t = tm.createRPCStatus("test task");
155    long beforeSetRPC = EnvironmentEdgeManager.currentTime();
156    assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC);
157    Thread.sleep(MONITOR_INTERVAL * 2);
158    t.setRPC("testMethod", new Object[0], beforeSetRPC);
159    long afterSetRPC = EnvironmentEdgeManager.currentTime();
160    Thread.sleep(MONITOR_INTERVAL * 2);
161    assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC);
162    Thread.sleep(MONITOR_INTERVAL * 2);
163    assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC);
164    tm.shutdown();
165  }
166
167  @Test
168  public void testGetTasksWithFilter() throws Exception {
169    TaskMonitor tm = new TaskMonitor(new Configuration());
170    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
171    // Create 5 general tasks
172    tm.createStatus("General task1");
173    tm.createStatus("General task2");
174    tm.createStatus("General task3");
175    tm.createStatus("General task4");
176    tm.createStatus("General task5");
177    // Create 5 rpc tasks, and mark 1 completed
178    int length = 5;
179    ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
180    for (int i = 0; i < length; i++) {
181      MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
182      rpcHandlers.add(rpcHandler);
183    }
184    // Create rpc opertions
185    byte[] row = new byte[] { 0x01 };
186    Mutation m = new Put(row);
187    Query q = new Scan();
188    String notOperation = "for test";
189    rpcHandlers.get(0).setRPC("operations", new Object[] { m, q }, 3000);
190    rpcHandlers.get(1).setRPC("operations", new Object[] { m, q }, 3000);
191    rpcHandlers.get(2).setRPC("operations", new Object[] { m, q }, 3000);
192    rpcHandlers.get(3).setRPC("operations", new Object[] { notOperation }, 3000);
193    rpcHandlers.get(4).setRPC("operations", new Object[] { m, q }, 3000);
194    MonitoredRPCHandler completed = rpcHandlers.get(4);
195    completed.markComplete("Completed!");
196    // Test get tasks with filter
197    List<MonitoredTask> generalTasks = tm.getTasks("general");
198    assertEquals(5, generalTasks.size());
199    List<MonitoredTask> handlerTasks = tm.getTasks("handler");
200    assertEquals(5, handlerTasks.size());
201    List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
202    // The last rpc handler is stopped
203    assertEquals(4, rpcTasks.size());
204    List<MonitoredTask> operationTasks = tm.getTasks("operation");
205    // Handler 3 doesn't handle Operation.
206    assertEquals(3, operationTasks.size());
207    tm.shutdown();
208  }
209
210  @Test
211  public void testStatusJournal() {
212    TaskMonitor tm = new TaskMonitor(new Configuration());
213    MonitoredTask task = tm.createStatus("Test task");
214    assertTrue(task.getStatusJournal().isEmpty());
215    task.setStatus("status1");
216    // journal should be empty since it is disabled
217    assertTrue(task.getStatusJournal().isEmpty());
218    task = tm.createStatus("Test task with journal", true);
219    task.setStatus("status2");
220    assertEquals(1, task.getStatusJournal().size());
221    assertEquals("status2", task.getStatusJournal().get(0).getStatus());
222    task.setStatus("status3");
223    assertEquals(2, task.getStatusJournal().size());
224    assertEquals("status3", task.getStatusJournal().get(1).getStatus());
225    task.prettyPrintJournal();
226    tm.shutdown();
227  }
228
229  @Test
230  public void testClone() throws Exception {
231    MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl();
232    monitor.abort("abort RPC");
233    TestParam testParam = new TestParam("param1");
234    monitor.setRPC("method1", new Object[] { testParam }, 0);
235    MonitoredRPCHandlerImpl clone = monitor.clone();
236    assertEquals(clone.getDescription(), monitor.getDescription());
237    assertEquals(clone.getState(), monitor.getState());
238    assertEquals(clone.getStatus(), monitor.getStatus());
239    assertEquals(clone.toString(), monitor.toString());
240    assertEquals(clone.toMap(), monitor.toMap());
241    assertEquals(clone.toJSON(), monitor.toJSON());
242
243    // mark complete and make param dirty
244    monitor.markComplete("complete RPC");
245    testParam.setParam("dirtyParam");
246    assertEquals(clone.getDescription(), monitor.getDescription());
247    assertNotEquals(clone.getState(), monitor.getState());
248    assertNotEquals(clone.getStatus(), monitor.getStatus());
249    monitor.setState(MonitoredTask.State.RUNNING);
250    try {
251      // when markComplete, the param in monitor is set null, so toMap should fail here
252      monitor.toMap();
253      fail("Should not call toMap successfully, because param=null");
254    } catch (Exception e) {
255    }
256    // the param of clone monitor should not be dirty
257    assertNotEquals("[dirtyString]",
258      String.valueOf(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params")));
259
260    monitor.resume("resume");
261    monitor.setRPC("method2", new Object[] { new TestParam("param2") }, 1);
262    assertNotEquals(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"),
263      ((Map<String, Object>) monitor.toMap().get("rpcCall")).get("params"));
264    LOG.info(String.valueOf(clone.toMap()));
265    LOG.info(String.valueOf(monitor.toMap()));
266    assertNotEquals(clone.toString(), monitor.toString());
267    assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime());
268    assertNotEquals(clone.toMap(), monitor.toMap());
269    assertNotEquals(clone.toJSON(), monitor.toJSON());
270  }
271
272  private class TestParam {
273    public String param = null;
274
275    public TestParam(String param) {
276      this.param = param;
277    }
278
279    public void setParam(String param) {
280      this.param = param;
281    }
282
283    @Override
284    public String toString() {
285      return param;
286    }
287  }
288}