/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Unit tests for {@link TaskMonitor}: task creation and expiry, handling of leaked tasks,
 * the task-count limit, retention of RPC tasks, stuck-RPC warnings, filtered task listing
 * and the per-task status journal.
 * <p>
 * Every test shuts its monitor down in a {@code finally} block so that a failed assertion
 * does not skip the {@code shutdown()} call.
 */
@Category({MiscTests.class, SmallTests.class})
public class TestTaskMonitor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTaskMonitor.class);

  /**
   * Walks a single task through its life cycle: created (RUNNING, no completion timestamp),
   * marked complete (still listed), then expired (no longer listed).
   */
  @Test
  public void testTaskMonitorBasics() {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      assertTrue("Task monitor should start empty",
          tm.getTasks().isEmpty());

      // Make a task and fetch it back out
      MonitoredTask task = tm.createStatus("Test task");
      MonitoredTask taskFromTm = tm.getTasks().get(0);

      // Make sure the state is reasonable.
      assertEquals(task.getDescription(), taskFromTm.getDescription());
      assertEquals(-1, taskFromTm.getCompletionTimestamp());
      assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());

      // Mark it as finished
      task.markComplete("Finished!");
      assertEquals(MonitoredTask.State.COMPLETE, task.getState());

      // It should still show up in the TaskMonitor list
      assertEquals(1, tm.getTasks().size());

      // Once the completed task is expired it should no longer be listed
      task.expireNow();
      assertEquals(0, tm.getTasks().size());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Creates a task on another thread and drops the only reference to it, then checks that
   * the monitor reports the leaked task as ABORTED after garbage collection was requested.
   */
  @Test
  public void testTasksGetAbortedOnLeak() throws InterruptedException {
    final TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      assertTrue("Task monitor should start empty",
          tm.getTasks().isEmpty());

      final AtomicBoolean threadSuccess = new AtomicBoolean(false);
      // Make a task in some other thread and leak it
      Thread t = new Thread() {
        @Override
        public void run() {
          MonitoredTask task = tm.createStatus("Test task");
          assertEquals(MonitoredTask.State.RUNNING, task.getState());
          // Only reached when the assertion above passed; a failure in this thread would
          // otherwise go unnoticed by the test thread.
          threadSuccess.set(true);
        }
      };
      t.start();
      t.join();
      // Make sure the thread saw the correct state
      assertTrue(threadSuccess.get());

      // Make sure the leaked reference gets cleared
      System.gc();
      System.gc();
      System.gc();

      // Now it should be aborted
      MonitoredTask taskFromTm = tm.getTasks().get(0);
      assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Creates ten more tasks than {@link TaskMonitor#DEFAULT_MAX_TASKS} and checks that the
   * list is capped at the limit and that the oldest tasks were the ones dropped.
   */
  @Test
  public void testTaskLimit() throws Exception {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
        tm.createStatus("task " + i);
      }
      // Make sure it was limited correctly
      assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
      // Make sure we culled the earlier tasks, not later
      // (i.e. tasks 0 through 9 should have been deleted)
      assertEquals("task 10", tm.getTasks().get(0).getDescription());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Registers ten RPC tasks followed by {@link TaskMonitor#DEFAULT_MAX_TASKS} general tasks
   * and checks that all ten RPC tasks are still listed afterwards.
   */
  @Test
  public void testDoNotPurgeRPCTask() throws Exception {
    int rpcTaskCount = 10;
    // Use a private monitor, like the other tests in this class, rather than the instance
    // returned by TaskMonitor.get(): the count below then cannot be affected by tasks
    // registered elsewhere, and this test does not shut down an instance it does not own.
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      for (int i = 0; i < rpcTaskCount; i++) {
        tm.createRPCStatus("PRCTask" + i);
      }
      for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
        tm.createStatus("otherTask" + i);
      }
      int remainRPCTask = 0;
      for (MonitoredTask task : tm.getTasks()) {
        if (task instanceof MonitoredRPCHandler) {
          remainRPCTask++;
        }
      }
      assertEquals("RPC Tasks have been purged!", rpcTaskCount, remainRPCTask);
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Configures {@link TaskMonitor#RPC_WARN_TIME_KEY} and
   * {@link TaskMonitor#MONITOR_INTERVAL_KEY} to the same short interval, starts an RPC on a
   * handler, waits two intervals and checks that the handler's warn time has moved past the
   * time the RPC was set.
   */
  @Test
  public void testWarnStuckTasks() throws Exception {
    final int interval = 1000;
    Configuration conf = new Configuration();
    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, interval);
    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, interval);
    final TaskMonitor tm = new TaskMonitor(conf);
    try {
      MonitoredRPCHandler t = tm.createRPCStatus("test task");
      long then = EnvironmentEdgeManager.currentTime();
      t.setRPC("testMethod", new Object[0], then);
      Thread.sleep(interval * 2);
      assertTrue("We did not warn", t.getWarnTime() > then);
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Creates five general tasks and five RPC handler tasks (one of them completed, one of
   * them carrying a non-operation parameter) and checks the sizes returned by
   * {@code getTasks(filter)} for the "general", "handler", "rpc" and "operation" filters.
   */
  @Test
  public void testGetTasksWithFilter() throws Exception {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
      // Create 5 general tasks
      tm.createStatus("General task1");
      tm.createStatus("General task2");
      tm.createStatus("General task3");
      tm.createStatus("General task4");
      tm.createStatus("General task5");
      // Create 5 rpc tasks, and mark 1 completed
      int length = 5;
      List<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
      for (int i = 0; i < length; i++) {
        MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
        rpcHandlers.add(rpcHandler);
      }
      // Create rpc operations
      byte[] row = new byte[] { 0x01 };
      Mutation m = new Put(row);
      Query q = new Scan();
      String notOperation = "for test";
      rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
      rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
      rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
      rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
      rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
      MonitoredRPCHandler completed = rpcHandlers.get(4);
      completed.markComplete("Completed!");
      // Test get tasks with filter
      List<MonitoredTask> generalTasks = tm.getTasks("general");
      assertEquals(5, generalTasks.size());
      List<MonitoredTask> handlerTasks = tm.getTasks("handler");
      assertEquals(5, handlerTasks.size());
      List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
      // The last rpc handler is stopped
      assertEquals(4, rpcTasks.size());
      List<MonitoredTask> operationTasks = tm.getTasks("operation");
      // Handler 3 doesn't handle Operation.
      assertEquals(3, operationTasks.size());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Checks that status changes are recorded in a task's journal only while the journal is
   * enabled, and that enabling it can optionally record the status that is current at that
   * moment.
   */
  @Test
  public void testStatusJournal() {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      MonitoredTask task = tm.createStatus("Test task");
      assertTrue(task.getStatusJournal().isEmpty());
      task.disableStatusJournal();
      task.setStatus("status1");
      // journal should be empty since it is disabled
      assertTrue(task.getStatusJournal().isEmpty());
      task.enableStatusJournal(true);
      // check existing status entered in journal
      assertEquals("status1", task.getStatusJournal().get(0).getStatus());
      assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
      task.disableStatusJournal();
      task.setStatus("status2");
      // check status 2 not added since disabled
      assertEquals(1, task.getStatusJournal().size());
      task.enableStatusJournal(false);
      // size should still be 1 since we didn't include current status
      assertEquals(1, task.getStatusJournal().size());
      task.setStatus("status3");
      assertEquals("status3", task.getStatusJournal().get(1).getStatus());
    } finally {
      tm.shutdown();
    }
  }
}