001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import static org.junit.Assert.*;
021
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.client.Mutation;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.Query;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.testclassification.MiscTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037
038@Category({MiscTests.class, SmallTests.class})
039public class TestTaskMonitor {
040
041  @ClassRule
042  public static final HBaseClassTestRule CLASS_RULE =
043      HBaseClassTestRule.forClass(TestTaskMonitor.class);
044
045  @Test
046  public void testTaskMonitorBasics() {
047    TaskMonitor tm = new TaskMonitor(new Configuration());
048    assertTrue("Task monitor should start empty",
049        tm.getTasks().isEmpty());
050
051    // Make a task and fetch it back out
052    MonitoredTask task = tm.createStatus("Test task");
053    MonitoredTask taskFromTm = tm.getTasks().get(0);
054
055    // Make sure the state is reasonable.
056    assertEquals(task.getDescription(), taskFromTm.getDescription());
057    assertEquals(-1, taskFromTm.getCompletionTimestamp());
058    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
059
060    // Mark it as finished
061    task.markComplete("Finished!");
062    assertEquals(MonitoredTask.State.COMPLETE, task.getState());
063
064    // It should still show up in the TaskMonitor list
065    assertEquals(1, tm.getTasks().size());
066
067    // If we mark its completion time back a few minutes, it should get gced
068    task.expireNow();
069    assertEquals(0, tm.getTasks().size());
070
071    tm.shutdown();
072  }
073
074  @Test
075  public void testTasksGetAbortedOnLeak() throws InterruptedException {
076    final TaskMonitor tm = new TaskMonitor(new Configuration());
077    assertTrue("Task monitor should start empty",
078        tm.getTasks().isEmpty());
079
080    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
081    // Make a task in some other thread and leak it
082    Thread t = new Thread() {
083      @Override
084      public void run() {
085        MonitoredTask task = tm.createStatus("Test task");
086        assertEquals(MonitoredTask.State.RUNNING, task.getState());
087        threadSuccess.set(true);
088      }
089    };
090    t.start();
091    t.join();
092    // Make sure the thread saw the correct state
093    assertTrue(threadSuccess.get());
094
095    // Make sure the leaked reference gets cleared
096    System.gc();
097    System.gc();
098    System.gc();
099
100    // Now it should be aborted
101    MonitoredTask taskFromTm = tm.getTasks().get(0);
102    assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
103
104    tm.shutdown();
105  }
106
107  @Test
108  public void testTaskLimit() throws Exception {
109    TaskMonitor tm = new TaskMonitor(new Configuration());
110    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
111      tm.createStatus("task " + i);
112    }
113    // Make sure it was limited correctly
114    assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
115    // Make sure we culled the earlier tasks, not later
116    // (i.e. tasks 0 through 9 should have been deleted)
117    assertEquals("task 10", tm.getTasks().get(0).getDescription());
118    tm.shutdown();
119  }
120
121  @Test
122  public void testDoNotPurgeRPCTask() throws Exception {
123    int RPCTaskNums = 10;
124    TaskMonitor tm = TaskMonitor.get();
125    for(int i = 0; i < RPCTaskNums; i++) {
126      tm.createRPCStatus("PRCTask" + i);
127    }
128    for(int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
129      tm.createStatus("otherTask" + i);
130    }
131    int remainRPCTask = 0;
132    for(MonitoredTask task: tm.getTasks()) {
133      if(task instanceof MonitoredRPCHandler) {
134        remainRPCTask++;
135      }
136    }
137    assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask);
138    tm.shutdown();
139  }
140
141  @Test
142  public void testWarnStuckTasks() throws Exception {
143    final int RPC_WARN_TIME = 1500;
144    final int MONITOR_INTERVAL = 500;
145    Configuration conf = new Configuration();
146    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME);
147    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL);
148    final TaskMonitor tm = new TaskMonitor(conf);
149    MonitoredRPCHandler t = tm.createRPCStatus("test task");
150    long beforeSetRPC = EnvironmentEdgeManager.currentTime();
151    assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC);
152    Thread.sleep(MONITOR_INTERVAL * 2);
153    t.setRPC("testMethod", new Object[0], beforeSetRPC);
154    long afterSetRPC = EnvironmentEdgeManager.currentTime();
155    Thread.sleep(MONITOR_INTERVAL * 2);
156    assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC);
157    Thread.sleep(MONITOR_INTERVAL * 2);
158    assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC);
159    tm.shutdown();
160  }
161
162  @Test
163  public void testGetTasksWithFilter() throws Exception {
164    TaskMonitor tm = new TaskMonitor(new Configuration());
165    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
166    // Create 5 general tasks
167    tm.createStatus("General task1");
168    tm.createStatus("General task2");
169    tm.createStatus("General task3");
170    tm.createStatus("General task4");
171    tm.createStatus("General task5");
172    // Create 5 rpc tasks, and mark 1 completed
173    int length = 5;
174    ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
175    for (int i = 0; i < length; i++) {
176      MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
177      rpcHandlers.add(rpcHandler);
178    }
179    // Create rpc opertions
180    byte[] row = new byte[] { 0x01 };
181    Mutation m = new Put(row);
182    Query q = new Scan();
183    String notOperation = "for test";
184    rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
185    rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
186    rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
187    rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
188    rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
189    MonitoredRPCHandler completed = rpcHandlers.get(4);
190    completed.markComplete("Completed!");
191    // Test get tasks with filter
192    List<MonitoredTask> generalTasks = tm.getTasks("general");
193    assertEquals(5, generalTasks.size());
194    List<MonitoredTask> handlerTasks = tm.getTasks("handler");
195    assertEquals(5, handlerTasks.size());
196    List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
197    // The last rpc handler is stopped
198    assertEquals(4, rpcTasks.size());
199    List<MonitoredTask> operationTasks = tm.getTasks("operation");
200    // Handler 3 doesn't handle Operation.
201    assertEquals(3, operationTasks.size());
202    tm.shutdown();
203  }
204
205  @Test
206  public void testStatusJournal() {
207    TaskMonitor tm = new TaskMonitor(new Configuration());
208    MonitoredTask task = tm.createStatus("Test task");
209    assertTrue(task.getStatusJournal().isEmpty());
210    task.disableStatusJournal();
211    task.setStatus("status1");
212    // journal should be empty since it is disabled
213    assertTrue(task.getStatusJournal().isEmpty());
214    task.enableStatusJournal(true);
215    // check existing status entered in journal
216    assertEquals("status1", task.getStatusJournal().get(0).getStatus());
217    assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
218    task.disableStatusJournal();
219    task.setStatus("status2");
220    // check status 2 not added since disabled
221    assertEquals(1, task.getStatusJournal().size());
222    task.enableStatusJournal(false);
223    // size should still be 1 since we didn't include current status
224    assertEquals(1, task.getStatusJournal().size());
225    task.setStatus("status3");
226    assertEquals("status3", task.getStatusJournal().get(1).getStatus());
227    tm.shutdown();
228  }
229}
230