001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.atomic.AtomicBoolean; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.client.Mutation; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Query; 034import org.apache.hadoop.hbase.client.Scan; 035import org.apache.hadoop.hbase.testclassification.MiscTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.junit.ClassRule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@Category({ MiscTests.class, SmallTests.class }) 045public class TestTaskMonitor { 046 private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class); 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestTaskMonitor.class); 051 052 @Test 053 public void testTaskMonitorBasics() { 054 TaskMonitor tm = new TaskMonitor(new Configuration()); 055 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 056 057 // Make a task and fetch it back out 058 MonitoredTask task = tm.createStatus("Test task"); 059 MonitoredTask taskFromTm = tm.getTasks().get(0); 060 061 // Make sure the state is reasonable. 062 assertEquals(task.getDescription(), taskFromTm.getDescription()); 063 assertEquals(-1, taskFromTm.getCompletionTimestamp()); 064 assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState()); 065 066 // Mark it as finished 067 task.markComplete("Finished!"); 068 assertEquals(MonitoredTask.State.COMPLETE, task.getState()); 069 070 // It should still show up in the TaskMonitor list 071 assertEquals(1, tm.getTasks().size()); 072 073 // If we mark its completion time back a few minutes, it should get gced 074 task.expireNow(); 075 assertEquals(0, tm.getTasks().size()); 076 077 tm.shutdown(); 078 } 079 080 @Test 081 public void testTasksGetAbortedOnLeak() throws InterruptedException { 082 final TaskMonitor tm = new TaskMonitor(new Configuration()); 083 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 084 085 final AtomicBoolean threadSuccess = new AtomicBoolean(false); 086 // Make a task in some other thread and leak it 087 Thread t = new Thread() { 088 @Override 089 public void run() { 090 MonitoredTask task = tm.createStatus("Test task"); 091 assertEquals(MonitoredTask.State.RUNNING, task.getState()); 092 threadSuccess.set(true); 093 } 094 }; 095 t.start(); 096 t.join(); 097 // Make sure the thread saw the correct state 098 assertTrue(threadSuccess.get()); 099 100 // Make sure the leaked reference gets cleared 101 System.gc(); 102 System.gc(); 103 System.gc(); 104 105 // Now it should be aborted 106 MonitoredTask taskFromTm = tm.getTasks().get(0); 107 assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState()); 108 109 tm.shutdown(); 110 } 111 112 @Test 113 public void testTaskLimit() throws Exception { 114 TaskMonitor tm = new TaskMonitor(new Configuration()); 115 for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) { 116 tm.createStatus("task " + i); 117 } 118 // Make sure it was limited correctly 119 assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size()); 120 // Make sure we culled the earlier tasks, not later 121 // (i.e. tasks 0 through 9 should have been deleted) 122 assertEquals("task 10", tm.getTasks().get(0).getDescription()); 123 tm.shutdown(); 124 } 125 126 @Test 127 public void testDoNotPurgeRPCTask() throws Exception { 128 int RPCTaskNums = 10; 129 TaskMonitor tm = TaskMonitor.get(); 130 for (int i = 0; i < RPCTaskNums; i++) { 131 tm.createRPCStatus("PRCTask" + i); 132 } 133 for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) { 134 tm.createStatus("otherTask" + i); 135 } 136 int remainRPCTask = 0; 137 for (MonitoredTask task : tm.getTasks()) { 138 if (task instanceof MonitoredRPCHandler) { 139 remainRPCTask++; 140 } 141 } 142 assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask); 143 tm.shutdown(); 144 } 145 146 @Test 147 public void testWarnStuckTasks() throws Exception { 148 final int RPC_WARN_TIME = 1500; 149 final int MONITOR_INTERVAL = 500; 150 Configuration conf = new Configuration(); 151 conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME); 152 conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL); 153 final TaskMonitor tm = new TaskMonitor(conf); 154 MonitoredRPCHandler t = tm.createRPCStatus("test task"); 155 long beforeSetRPC = EnvironmentEdgeManager.currentTime(); 156 assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC); 157 Thread.sleep(MONITOR_INTERVAL * 2); 158 t.setRPC("testMethod", new Object[0], beforeSetRPC); 159 long afterSetRPC = EnvironmentEdgeManager.currentTime(); 160 Thread.sleep(MONITOR_INTERVAL * 2); 161 assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC); 162 Thread.sleep(MONITOR_INTERVAL * 2); 163 assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC); 164 tm.shutdown(); 165 } 166 167 @Test 168 public void testGetTasksWithFilter() throws Exception { 169 TaskMonitor tm = new TaskMonitor(new Configuration()); 170 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 171 // Create 5 general tasks 172 tm.createStatus("General task1"); 173 tm.createStatus("General task2"); 174 tm.createStatus("General task3"); 175 tm.createStatus("General task4"); 176 tm.createStatus("General task5"); 177 // Create 5 rpc tasks, and mark 1 completed 178 int length = 5; 179 ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length); 180 for (int i = 0; i < length; i++) { 181 MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i); 182 rpcHandlers.add(rpcHandler); 183 } 184 // Create rpc opertions 185 byte[] row = new byte[] { 0x01 }; 186 Mutation m = new Put(row); 187 Query q = new Scan(); 188 String notOperation = "for test"; 189 rpcHandlers.get(0).setRPC("operations", new Object[] { m, q }, 3000); 190 rpcHandlers.get(1).setRPC("operations", new Object[] { m, q }, 3000); 191 rpcHandlers.get(2).setRPC("operations", new Object[] { m, q }, 3000); 192 rpcHandlers.get(3).setRPC("operations", new Object[] { notOperation }, 3000); 193 rpcHandlers.get(4).setRPC("operations", new Object[] { m, q }, 3000); 194 MonitoredRPCHandler completed = rpcHandlers.get(4); 195 completed.markComplete("Completed!"); 196 // Test get tasks with filter 197 List<MonitoredTask> generalTasks = tm.getTasks("general"); 198 assertEquals(5, generalTasks.size()); 199 List<MonitoredTask> handlerTasks = tm.getTasks("handler"); 200 assertEquals(5, handlerTasks.size()); 201 List<MonitoredTask> rpcTasks = tm.getTasks("rpc"); 202 // The last rpc handler is stopped 203 assertEquals(4, rpcTasks.size()); 204 List<MonitoredTask> operationTasks = tm.getTasks("operation"); 205 // Handler 3 doesn't handle Operation. 206 assertEquals(3, operationTasks.size()); 207 tm.shutdown(); 208 } 209 210 @Test 211 public void testStatusJournal() { 212 TaskMonitor tm = new TaskMonitor(new Configuration()); 213 MonitoredTask task = tm.createStatus("Test task"); 214 assertTrue(task.getStatusJournal().isEmpty()); 215 task.setStatus("status1"); 216 // journal should be empty since it is disabled 217 assertTrue(task.getStatusJournal().isEmpty()); 218 task = tm.createStatus("Test task with journal", true); 219 task.setStatus("status2"); 220 assertEquals(1, task.getStatusJournal().size()); 221 assertEquals("status2", task.getStatusJournal().get(0).getStatus()); 222 task.setStatus("status3"); 223 assertEquals(2, task.getStatusJournal().size()); 224 assertEquals("status3", task.getStatusJournal().get(1).getStatus()); 225 task.prettyPrintJournal(); 226 tm.shutdown(); 227 } 228 229 @Test 230 public void testClone() throws Exception { 231 MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl(); 232 monitor.abort("abort RPC"); 233 TestParam testParam = new TestParam("param1"); 234 monitor.setRPC("method1", new Object[] { testParam }, 0); 235 MonitoredRPCHandlerImpl clone = monitor.clone(); 236 assertEquals(clone.getDescription(), monitor.getDescription()); 237 assertEquals(clone.getState(), monitor.getState()); 238 assertEquals(clone.getStatus(), monitor.getStatus()); 239 assertEquals(clone.toString(), monitor.toString()); 240 assertEquals(clone.toMap(), monitor.toMap()); 241 assertEquals(clone.toJSON(), monitor.toJSON()); 242 243 // mark complete and make param dirty 244 monitor.markComplete("complete RPC"); 245 testParam.setParam("dirtyParam"); 246 assertEquals(clone.getDescription(), monitor.getDescription()); 247 assertNotEquals(clone.getState(), monitor.getState()); 248 assertNotEquals(clone.getStatus(), monitor.getStatus()); 249 monitor.setState(MonitoredTask.State.RUNNING); 250 try { 251 // when markComplete, the param in monitor is set null, so toMap should fail here 252 monitor.toMap(); 253 fail("Should not call toMap successfully, because param=null"); 254 } catch (Exception e) { 255 } 256 // the param of clone monitor should not be dirty 257 assertNotEquals("[dirtyString]", 258 String.valueOf(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"))); 259 260 monitor.resume("resume"); 261 monitor.setRPC("method2", new Object[] { new TestParam("param2") }, 1); 262 assertNotEquals(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"), 263 ((Map<String, Object>) monitor.toMap().get("rpcCall")).get("params")); 264 LOG.info(String.valueOf(clone.toMap())); 265 LOG.info(String.valueOf(monitor.toMap())); 266 assertNotEquals(clone.toString(), monitor.toString()); 267 assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime()); 268 assertNotEquals(clone.toMap(), monitor.toMap()); 269 assertNotEquals(clone.toJSON(), monitor.toJSON()); 270 } 271 272 private class TestParam { 273 public String param = null; 274 275 public TestParam(String param) { 276 this.param = param; 277 } 278 279 public void setParam(String param) { 280 this.param = param; 281 } 282 283 @Override 284 public String toString() { 285 return param; 286 } 287 } 288}