/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link TaskMonitor}, {@link TaskGroup} and the clone behavior of
 * {@link MonitoredRPCHandlerImpl}.
 */
@Tag(MiscTests.TAG)
@Tag(SmallTests.TAG)
public class TestTaskMonitor {
  private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class);

  /**
   * Creates a single task, checks its initial state as seen through the monitor, then completes
   * and expires it and checks that it disappears from the monitor's task list.
   */
  @Test
  public void testTaskMonitorBasics() {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    assertTrue(tm.getTasks().isEmpty(), "Task monitor should start empty");

    // Make a task and fetch it back out
    MonitoredTask task = tm.createStatus("Test task");
    MonitoredTask taskFromTm = tm.getTasks().get(0);

    // Make sure the state is reasonable.
    assertEquals(task.getDescription(), taskFromTm.getDescription());
    assertEquals(-1, taskFromTm.getCompletionTimestamp());
    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
    assertEquals(task.getStatus(), taskFromTm.getStatus());
    assertEquals("status unset", taskFromTm.getStatus());

    // Mark it as finished
    task.markComplete("Finished!");
    assertEquals(MonitoredTask.State.COMPLETE, task.getState());

    // It should still show up in the TaskMonitor list
    assertEquals(1, tm.getTasks().size());

    // If we mark its completion time back a few minutes, it should get gced
    task.expireNow();
    assertEquals(0, tm.getTasks().size());

    tm.shutdown();
  }

  /**
   * Creates a task on another thread, drops the only reference to it, and checks that the monitor
   * subsequently reports the task as aborted.
   */
  @Test
  public void testTasksGetAbortedOnLeak() throws InterruptedException {
    final TaskMonitor tm = new TaskMonitor(new Configuration());
    assertTrue(tm.getTasks().isEmpty(), "Task monitor should start empty");

    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
    // Make a task in some other thread and leak it
    Thread t = new Thread(() -> {
      MonitoredTask task = tm.createStatus("Test task");
      assertEquals(MonitoredTask.State.RUNNING, task.getState());
      threadSuccess.set(true);
    });
    t.start();
    t.join();
    // Make sure the thread saw the correct state
    assertTrue(threadSuccess.get());

    // Make sure the leaked reference gets cleared
    System.gc();
    System.gc();
    System.gc();

    // Now it should be aborted
    MonitoredTask taskFromTm = tm.getTasks().get(0);
    assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());

    tm.shutdown();
  }

  /**
   * Creates more tasks than {@link TaskMonitor#DEFAULT_MAX_TASKS} and checks that the oldest ones
   * are the ones dropped.
   */
  @Test
  public void testTaskLimit() throws Exception {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
      tm.createStatus("task " + i);
    }
    // Make sure it was limited correctly
    assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
    // Make sure we culled the earlier tasks, not later
    // (i.e. tasks 0 through 9 should have been deleted)
    assertEquals("task 10", tm.getTasks().get(0).getDescription());
    tm.shutdown();
  }

  /**
   * Creates RPC tasks followed by {@link TaskMonitor#DEFAULT_MAX_TASKS} ordinary tasks and checks
   * that all of the RPC tasks are still listed afterwards.
   */
  @Test
  public void testDoNotPurgeRPCTask() throws Exception {
    int rpcTaskCount = 10;
    TaskMonitor tm = TaskMonitor.get();
    for (int i = 0; i < rpcTaskCount; i++) {
      tm.createRPCStatus("PRCTask" + i);
    }
    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
      tm.createStatus("otherTask" + i);
    }
    int remainingRpcTasks = 0;
    for (MonitoredTask task : tm.getTasks()) {
      if (task instanceof MonitoredRPCHandler) {
        remainingRpcTasks++;
      }
    }
    assertEquals(rpcTaskCount, remainingRpcTasks, "RPC Tasks have been purged!");
    tm.shutdown();
  }

  /**
   * Configures a short RPC warn time and monitor interval, starts an RPC on a handler, and checks
   * that the handler's warn time only advances past the RPC start once enough time has elapsed.
   */
  @Test
  public void testWarnStuckTasks() throws Exception {
    final int rpcWarnTime = 1500;
    final int monitorInterval = 500;
    Configuration conf = new Configuration();
    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, rpcWarnTime);
    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, monitorInterval);
    final TaskMonitor tm = new TaskMonitor(conf);
    MonitoredRPCHandler t = tm.createRPCStatus("test task");
    long beforeSetRPC = EnvironmentEdgeManager.currentTime();
    assertTrue(t.getWarnTime() <= beforeSetRPC, "Validating initialization assumption");
    Thread.sleep(monitorInterval * 2);
    t.setRPC("testMethod", new Object[0], beforeSetRPC);
    long afterSetRPC = EnvironmentEdgeManager.currentTime();
    Thread.sleep(monitorInterval * 2);
    assertTrue(t.getWarnTime() <= afterSetRPC, "Validating no warn after starting RPC");
    Thread.sleep(monitorInterval * 2);
    assertTrue(t.getWarnTime() > afterSetRPC, "Validating warn after RPC_WARN_TIME");
    tm.shutdown();
  }

  /**
   * Creates a mix of general and RPC tasks and checks the number of tasks returned by
   * {@link TaskMonitor#getTasks(String)} for each of the filters used below.
   */
  @Test
  public void testGetTasksWithFilter() throws Exception {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    assertTrue(tm.getTasks().isEmpty(), "Task monitor should start empty");
    // Create 5 general tasks
    tm.createStatus("General task1");
    tm.createStatus("General task2");
    tm.createStatus("General task3");
    tm.createStatus("General task4");
    tm.createStatus("General task5");
    // Create 5 rpc tasks, and mark 1 completed
    int length = 5;
    List<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
    for (int i = 0; i < length; i++) {
      MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
      rpcHandlers.add(rpcHandler);
    }
    // Create rpc operations
    byte[] row = new byte[] { 0x01 };
    Mutation m = new Put(row);
    Query q = new Scan();
    String notOperation = "for test";
    rpcHandlers.get(0).setRPC("operations", new Object[] { m, q }, 3000);
    rpcHandlers.get(1).setRPC("operations", new Object[] { m, q }, 3000);
    rpcHandlers.get(2).setRPC("operations", new Object[] { m, q }, 3000);
    rpcHandlers.get(3).setRPC("operations", new Object[] { notOperation }, 3000);
    rpcHandlers.get(4).setRPC("operations", new Object[] { m, q }, 3000);
    MonitoredRPCHandler completed = rpcHandlers.get(4);
    completed.markComplete("Completed!");
    // Test get tasks with filter
    List<MonitoredTask> generalTasks = tm.getTasks("general");
    assertEquals(5, generalTasks.size());
    List<MonitoredTask> handlerTasks = tm.getTasks("handler");
    assertEquals(5, handlerTasks.size());
    List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
    // The last rpc handler is stopped
    assertEquals(4, rpcTasks.size());
    List<MonitoredTask> operationTasks = tm.getTasks("operation");
    // Handler 3 doesn't handle Operation.
    assertEquals(3, operationTasks.size());
    tm.shutdown();
  }

  /**
   * Checks that the status journal stays empty for a task created with the default
   * {@code createStatus(String)} overload, and records each status change for a task created with
   * the journal flag set.
   */
  @Test
  public void testStatusJournal() {
    TaskMonitor tm = new TaskMonitor(new Configuration());
    MonitoredTask task = tm.createStatus("Test task");
    assertTrue(task.getStatusJournal().isEmpty());
    task.setStatus("status1");
    // journal should be empty since it is disabled
    assertTrue(task.getStatusJournal().isEmpty());
    task = tm.createStatus("Test task with journal", false, true);
    task.setStatus("status2");
    assertEquals(1, task.getStatusJournal().size());
    assertEquals("status2", task.getStatusJournal().get(0).getStatus());
    task.setStatus("status3");
    assertEquals(2, task.getStatusJournal().size());
    assertEquals("status3", task.getStatusJournal().get(1).getStatus());
    task.prettyPrintJournal();
    tm.shutdown();
  }

  /**
   * Adds three tasks to a {@link TaskGroup}, completes the group, and checks the number of tasks
   * and the journal size of the task whose status was updated.
   */
  @Test
  public void testTaskGroup() {
    TaskGroup group = TaskMonitor.createTaskGroup(true, "test task group");
    group.addTask("task1");
    MonitoredTask task2 = group.addTask("task2");
    task2.setStatus("task2 status2");
    task2.setStatus("task2 status3");
    group.addTask("task3");
    group.markComplete("group complete");
    Collection<MonitoredTask> tasks = group.getTasks();
    assertNotNull(tasks);
    // JUnit's contract is assertEquals(expected, actual)
    assertEquals(3, tasks.size());
    for (MonitoredTask task : tasks) {
      if (task.getDescription().equals("task2")) {
        assertEquals(3, task.getStatusJournal().size());
        task.prettyPrintJournal();
      }
    }
  }

  /**
   * Checks that a clone of a {@link MonitoredRPCHandlerImpl} initially matches the original and
   * is not affected by later changes to the original or to the RPC parameter object.
   */
  @Test
  public void testClone() throws Exception {
    MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl("test");
    monitor.abort("abort RPC");
    TestParam testParam = new TestParam("param1");
    monitor.setRPC("method1", new Object[] { testParam }, 0);
    MonitoredRPCHandlerImpl clone = monitor.clone();
    assertEquals(clone.getDescription(), monitor.getDescription());
    assertEquals(clone.getState(), monitor.getState());
    assertEquals(clone.getStatus(), monitor.getStatus());
    assertEquals(clone.toString(), monitor.toString());
    assertEquals(clone.toMap(), monitor.toMap());
    JSONAssert.assertEquals(clone.toJSON(), monitor.toJSON(), true);

    // mark complete and make param dirty
    monitor.markComplete("complete RPC");
    testParam.setParam("dirtyParam");
    assertEquals(clone.getDescription(), monitor.getDescription());
    assertNotEquals(clone.getState(), monitor.getState());
    assertNotEquals(clone.getStatus(), monitor.getStatus());
    monitor.setState(MonitoredTask.State.RUNNING);
    try {
      // when markComplete, the param in monitor is set null, so toMap should fail here
      monitor.toMap();
      fail("Should not call toMap successfully, because param=null");
    } catch (Exception expected) {
      // Intentionally ignored: the failure of toMap() is the behavior under test.
    }
    // the param of clone monitor should not be dirty; compare against the value the parameter
    // was actually mutated to, otherwise this check can never fail
    assertNotEquals("[dirtyParam]", String.valueOf(rpcParams(clone)));

    monitor.resume("resume");
    monitor.setRPC("method2", new Object[] { new TestParam("param2") }, 1);
    assertNotEquals(rpcParams(clone), rpcParams(monitor));
    LOG.info(String.valueOf(clone.toMap()));
    LOG.info(String.valueOf(monitor.toMap()));
    assertNotEquals(clone.toString(), monitor.toString());
    assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime());
    assertNotEquals(clone.toMap(), monitor.toMap());
    assertNotEquals(clone.toJSON(), monitor.toJSON());
  }

  /**
   * Returns the {@code "params"} entry nested under the {@code "rpcCall"} entry of the handler's
   * {@code toMap()} output.
   */
  @SuppressWarnings("unchecked") // testClone only reads "rpcCall" where it is stored as a map
  private static Object rpcParams(MonitoredRPCHandlerImpl handler) throws Exception {
    Map<String, Object> rpcCall = (Map<String, Object>) handler.toMap().get("rpcCall");
    return rpcCall.get("params");
  }

  /** Mutable RPC parameter stand-in whose string form is its current value. */
  private static final class TestParam {
    private String param;

    TestParam(String param) {
      this.param = param;
    }

    void setParam(String param) {
      this.param = param;
    }

    @Override
    public String toString() {
      return param;
    }
  }
}