/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link TaskMonitor}: task registration and expiry, the task-count limit,
 * retention of RPC handler tasks, stuck-RPC warn times, filtered task listing and the
 * per-task status journal.
 * <p>
 * Every test calls {@link TaskMonitor#shutdown()} from a {@code finally} block so that a
 * failing assertion does not skip the shutdown call.
 */
@Category({MiscTests.class, SmallTests.class})
public class TestTaskMonitor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTaskMonitor.class);

  /**
   * Creates one task and follows it through RUNNING, COMPLETE and expiry, checking what
   * {@link TaskMonitor#getTasks()} reports at each step.
   */
  @Test
  public void testTaskMonitorBasics() {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      assertTrue("Task monitor should start empty",
          tm.getTasks().isEmpty());

      // Make a task and fetch it back out
      MonitoredTask task = tm.createStatus("Test task");
      MonitoredTask taskFromTm = tm.getTasks().get(0);

      // Make sure the state is reasonable.
      assertEquals(task.getDescription(), taskFromTm.getDescription());
      assertEquals(-1, taskFromTm.getCompletionTimestamp());
      assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());

      // Mark it as finished
      task.markComplete("Finished!");
      assertEquals(MonitoredTask.State.COMPLETE, task.getState());

      // It should still show up in the TaskMonitor list
      assertEquals(1, tm.getTasks().size());

      // Once the task is expired it should no longer be listed
      task.expireNow();
      assertEquals(0, tm.getTasks().size());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Creates a task on another thread without keeping a reference to it, requests garbage
   * collection, and expects the monitor to report that task as ABORTED.
   * <p>
   * NOTE(review): {@link System#gc()} is only a request to the JVM, so this test depends on
   * the collector actually clearing the leaked reference.
   *
   * @throws InterruptedException if interrupted while joining the helper thread
   */
  @Test
  public void testTasksGetAbortedOnLeak() throws InterruptedException {
    final TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      assertTrue("Task monitor should start empty",
          tm.getTasks().isEmpty());

      final AtomicBoolean threadSuccess = new AtomicBoolean(false);
      // Make a task in some other thread and leak it
      Thread t = new Thread() {
        @Override
        public void run() {
          MonitoredTask task = tm.createStatus("Test task");
          assertEquals(MonitoredTask.State.RUNNING, task.getState());
          // Only reached when the assertion above passed.
          threadSuccess.set(true);
        }
      };
      t.start();
      t.join();
      // Make sure the thread saw the correct state
      assertTrue(threadSuccess.get());

      // Make sure the leaked reference gets cleared
      System.gc();
      System.gc();
      System.gc();

      // Now it should be aborted
      MonitoredTask taskFromTm = tm.getTasks().get(0);
      assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Creates ten more tasks than {@link TaskMonitor#DEFAULT_MAX_TASKS} and checks that the
   * monitor keeps exactly {@code DEFAULT_MAX_TASKS} of them, dropping the earliest ones.
   */
  @Test
  public void testTaskLimit() throws Exception {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
        tm.createStatus("task " + i);
      }
      // Make sure it was limited correctly
      assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
      // Make sure we culled the earlier tasks, not later
      // (i.e. tasks 0 through 9 should have been deleted)
      assertEquals("task 10", tm.getTasks().get(0).getDescription());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Registers ten RPC tasks followed by {@link TaskMonitor#DEFAULT_MAX_TASKS} general tasks
   * and checks that all ten RPC handler tasks are still listed afterwards.
   * <p>
   * NOTE(review): unlike the other tests this one uses the shared instance returned by
   * {@link TaskMonitor#get()} and shuts it down at the end; confirm that is intended.
   */
  @Test
  public void testDoNotPurgeRPCTask() throws Exception {
    int rpcTaskCount = 10;
    TaskMonitor tm = TaskMonitor.get();
    try {
      for (int i = 0; i < rpcTaskCount; i++) {
        tm.createRPCStatus("PRCTask" + i);
      }
      for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
        tm.createStatus("otherTask" + i);
      }
      int remainRPCTask = 0;
      for (MonitoredTask task : tm.getTasks()) {
        if (task instanceof MonitoredRPCHandler) {
          remainRPCTask++;
        }
      }
      assertEquals("RPC Tasks have been purged!", rpcTaskCount, remainRPCTask);
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Configures a monitor with the warn time and monitor interval set below, starts an RPC
   * on a handler, and checks the handler's warn time before the RPC, shortly after it
   * starts, and again after further waiting.
   */
  @Test
  public void testWarnStuckTasks() throws Exception {
    final int RPC_WARN_TIME = 1500;
    final int MONITOR_INTERVAL = 500;
    Configuration conf = new Configuration();
    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME);
    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL);
    final TaskMonitor tm = new TaskMonitor(conf);
    try {
      MonitoredRPCHandler t = tm.createRPCStatus("test task");
      long beforeSetRPC = EnvironmentEdgeManager.currentTime();
      assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC);
      Thread.sleep(MONITOR_INTERVAL * 2);
      // The RPC is registered with the timestamp captured before the sleep above.
      t.setRPC("testMethod", new Object[0], beforeSetRPC);
      long afterSetRPC = EnvironmentEdgeManager.currentTime();
      Thread.sleep(MONITOR_INTERVAL * 2);
      assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC);
      Thread.sleep(MONITOR_INTERVAL * 2);
      assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC);
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Registers five general tasks and five RPC handler tasks, then checks the number of
   * tasks returned by {@link TaskMonitor#getTasks(String)} for the "general", "handler",
   * "rpc" and "operation" filters.
   */
  @Test
  public void testGetTasksWithFilter() throws Exception {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
      // Create 5 general tasks
      tm.createStatus("General task1");
      tm.createStatus("General task2");
      tm.createStatus("General task3");
      tm.createStatus("General task4");
      tm.createStatus("General task5");
      // Create 5 rpc tasks, and mark 1 completed
      int length = 5;
      List<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
      for (int i = 0; i < length; i++) {
        MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
        rpcHandlers.add(rpcHandler);
      }
      // Create rpc operations
      byte[] row = new byte[] { 0x01 };
      Mutation m = new Put(row);
      Query q = new Scan();
      String notOperation = "for test";
      rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
      rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
      rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
      rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
      rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
      MonitoredRPCHandler completed = rpcHandlers.get(4);
      completed.markComplete("Completed!");
      // Test get tasks with filter
      List<MonitoredTask> generalTasks = tm.getTasks("general");
      assertEquals(5, generalTasks.size());
      List<MonitoredTask> handlerTasks = tm.getTasks("handler");
      assertEquals(5, handlerTasks.size());
      List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
      // The last rpc handler is stopped
      assertEquals(4, rpcTasks.size());
      List<MonitoredTask> operationTasks = tm.getTasks("operation");
      // Handler 3 doesn't handle Operation.
      assertEquals(3, operationTasks.size());
    } finally {
      tm.shutdown();
    }
  }

  /**
   * Checks that status changes are recorded in a task's status journal only while the
   * journal is enabled, and that {@code enableStatusJournal(true)} also records the status
   * that was current at the time of the call while {@code enableStatusJournal(false)} does
   * not.
   */
  @Test
  public void testStatusJournal() {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    try {
      MonitoredTask task = tm.createStatus("Test task");
      assertTrue(task.getStatusJournal().isEmpty());
      task.disableStatusJournal();
      task.setStatus("status1");
      // journal should be empty since it is disabled
      assertTrue(task.getStatusJournal().isEmpty());
      task.enableStatusJournal(true);
      // check existing status entered in journal
      assertEquals("status1", task.getStatusJournal().get(0).getStatus());
      assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
      task.disableStatusJournal();
      task.setStatus("status2");
      // check status 2 not added since disabled
      assertEquals(1, task.getStatusJournal().size());
      task.enableStatusJournal(false);
      // size should still be 1 since we didn't include current status
      assertEquals(1, task.getStatusJournal().size());
      task.setStatus("status3");
      assertEquals("status3", task.getStatusJournal().get(1).getStatus());
    } finally {
      tm.shutdown();
    }
  }
}