001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.atomic.AtomicBoolean; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.client.Mutation; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Query; 034import org.apache.hadoop.hbase.client.Scan; 035import org.apache.hadoop.hbase.testclassification.MiscTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.junit.ClassRule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@Category({ MiscTests.class, SmallTests.class }) 045public class TestTaskMonitor { 046 private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class); 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestTaskMonitor.class); 051 052 @Test 053 public void testTaskMonitorBasics() { 054 TaskMonitor tm = new TaskMonitor(new Configuration()); 055 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 056 057 // Make a task and fetch it back out 058 MonitoredTask task = tm.createStatus("Test task"); 059 MonitoredTask taskFromTm = tm.getTasks().get(0); 060 061 // Make sure the state is reasonable. 062 assertEquals(task.getDescription(), taskFromTm.getDescription()); 063 assertEquals(-1, taskFromTm.getCompletionTimestamp()); 064 assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState()); 065 066 // Mark it as finished 067 task.markComplete("Finished!"); 068 assertEquals(MonitoredTask.State.COMPLETE, task.getState()); 069 070 // It should still show up in the TaskMonitor list 071 assertEquals(1, tm.getTasks().size()); 072 073 // If we mark its completion time back a few minutes, it should get gced 074 task.expireNow(); 075 assertEquals(0, tm.getTasks().size()); 076 077 tm.shutdown(); 078 } 079 080 @Test 081 public void testTasksGetAbortedOnLeak() throws InterruptedException { 082 final TaskMonitor tm = new TaskMonitor(new Configuration()); 083 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 084 085 final AtomicBoolean threadSuccess = new AtomicBoolean(false); 086 // Make a task in some other thread and leak it 087 Thread t = new Thread() { 088 @Override 089 public void run() { 090 MonitoredTask task = tm.createStatus("Test task"); 091 assertEquals(MonitoredTask.State.RUNNING, task.getState()); 092 threadSuccess.set(true); 093 } 094 }; 095 t.start(); 096 t.join(); 097 // Make sure the thread saw the correct state 098 assertTrue(threadSuccess.get()); 099 100 // Make sure the leaked reference gets cleared 101 System.gc(); 102 System.gc(); 103 System.gc(); 104 105 // Now it should be aborted 106 MonitoredTask taskFromTm = tm.getTasks().get(0); 107 assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState()); 108 109 tm.shutdown(); 110 } 111 112 @Test 113 public void testTaskLimit() throws Exception { 114 TaskMonitor tm = new TaskMonitor(new Configuration()); 115 for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) { 116 tm.createStatus("task " + i); 117 } 118 // Make sure it was limited correctly 119 assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size()); 120 // Make sure we culled the earlier tasks, not later 121 // (i.e. tasks 0 through 9 should have been deleted) 122 assertEquals("task 10", tm.getTasks().get(0).getDescription()); 123 tm.shutdown(); 124 } 125 126 @Test 127 public void testDoNotPurgeRPCTask() throws Exception { 128 int RPCTaskNums = 10; 129 TaskMonitor tm = TaskMonitor.get(); 130 for (int i = 0; i < RPCTaskNums; i++) { 131 tm.createRPCStatus("PRCTask" + i); 132 } 133 for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) { 134 tm.createStatus("otherTask" + i); 135 } 136 int remainRPCTask = 0; 137 for (MonitoredTask task : tm.getTasks()) { 138 if (task instanceof MonitoredRPCHandler) { 139 remainRPCTask++; 140 } 141 } 142 assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask); 143 tm.shutdown(); 144 } 145 146 @Test 147 public void testWarnStuckTasks() throws Exception { 148 final int RPC_WARN_TIME = 1500; 149 final int MONITOR_INTERVAL = 500; 150 Configuration conf = new Configuration(); 151 conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME); 152 conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL); 153 final TaskMonitor tm = new TaskMonitor(conf); 154 MonitoredRPCHandler t = tm.createRPCStatus("test task"); 155 long beforeSetRPC = EnvironmentEdgeManager.currentTime(); 156 assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC); 157 Thread.sleep(MONITOR_INTERVAL * 2); 158 t.setRPC("testMethod", new Object[0], beforeSetRPC); 159 long afterSetRPC = EnvironmentEdgeManager.currentTime(); 160 Thread.sleep(MONITOR_INTERVAL * 2); 161 assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC); 162 Thread.sleep(MONITOR_INTERVAL * 2); 163 assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC); 164 tm.shutdown(); 165 } 166 167 @Test 168 public void testGetTasksWithFilter() throws Exception { 169 TaskMonitor tm = new TaskMonitor(new Configuration()); 170 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 171 // Create 5 general tasks 172 tm.createStatus("General task1"); 173 tm.createStatus("General task2"); 174 tm.createStatus("General task3"); 175 tm.createStatus("General task4"); 176 tm.createStatus("General task5"); 177 // Create 5 rpc tasks, and mark 1 completed 178 int length = 5; 179 ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length); 180 for (int i = 0; i < length; i++) { 181 MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i); 182 rpcHandlers.add(rpcHandler); 183 } 184 // Create rpc opertions 185 byte[] row = new byte[] { 0x01 }; 186 Mutation m = new Put(row); 187 Query q = new Scan(); 188 String notOperation = "for test"; 189 rpcHandlers.get(0).setRPC("operations", new Object[] { m, q }, 3000); 190 rpcHandlers.get(1).setRPC("operations", new Object[] { m, q }, 3000); 191 rpcHandlers.get(2).setRPC("operations", new Object[] { m, q }, 3000); 192 rpcHandlers.get(3).setRPC("operations", new Object[] { notOperation }, 3000); 193 rpcHandlers.get(4).setRPC("operations", new Object[] { m, q }, 3000); 194 MonitoredRPCHandler completed = rpcHandlers.get(4); 195 completed.markComplete("Completed!"); 196 // Test get tasks with filter 197 List<MonitoredTask> generalTasks = tm.getTasks("general"); 198 assertEquals(5, generalTasks.size()); 199 List<MonitoredTask> handlerTasks = tm.getTasks("handler"); 200 assertEquals(5, handlerTasks.size()); 201 List<MonitoredTask> rpcTasks = tm.getTasks("rpc"); 202 // The last rpc handler is stopped 203 assertEquals(4, rpcTasks.size()); 204 List<MonitoredTask> operationTasks = tm.getTasks("operation"); 205 // Handler 3 doesn't handle Operation. 206 assertEquals(3, operationTasks.size()); 207 tm.shutdown(); 208 } 209 210 @Test 211 public void testStatusJournal() { 212 TaskMonitor tm = new TaskMonitor(new Configuration()); 213 MonitoredTask task = tm.createStatus("Test task"); 214 assertTrue(task.getStatusJournal().isEmpty()); 215 task.disableStatusJournal(); 216 task.setStatus("status1"); 217 // journal should be empty since it is disabled 218 assertTrue(task.getStatusJournal().isEmpty()); 219 task.enableStatusJournal(true); 220 // check existing status entered in journal 221 assertEquals("status1", task.getStatusJournal().get(0).getStatus()); 222 assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0); 223 task.disableStatusJournal(); 224 task.setStatus("status2"); 225 // check status 2 not added since disabled 226 assertEquals(1, task.getStatusJournal().size()); 227 task.enableStatusJournal(false); 228 // size should still be 1 since we didn't include current status 229 assertEquals(1, task.getStatusJournal().size()); 230 task.setStatus("status3"); 231 assertEquals("status3", task.getStatusJournal().get(1).getStatus()); 232 tm.shutdown(); 233 } 234 235 @Test 236 public void testClone() throws Exception { 237 MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl(); 238 monitor.abort("abort RPC"); 239 TestParam testParam = new TestParam("param1"); 240 monitor.setRPC("method1", new Object[] { testParam }, 0); 241 MonitoredRPCHandlerImpl clone = monitor.clone(); 242 assertEquals(clone.getDescription(), monitor.getDescription()); 243 assertEquals(clone.getState(), monitor.getState()); 244 assertEquals(clone.getStatus(), monitor.getStatus()); 245 assertEquals(clone.toString(), monitor.toString()); 246 assertEquals(clone.toMap(), monitor.toMap()); 247 assertEquals(clone.toJSON(), monitor.toJSON()); 248 249 // mark complete and make param dirty 250 monitor.markComplete("complete RPC"); 251 testParam.setParam("dirtyParam"); 252 assertEquals(clone.getDescription(), monitor.getDescription()); 253 assertNotEquals(clone.getState(), monitor.getState()); 254 assertNotEquals(clone.getStatus(), monitor.getStatus()); 255 monitor.setState(MonitoredTask.State.RUNNING); 256 try { 257 // when markComplete, the param in monitor is set null, so toMap should fail here 258 monitor.toMap(); 259 fail("Should not call toMap successfully, because param=null"); 260 } catch (Exception e) { 261 } 262 // the param of clone monitor should not be dirty 263 assertNotEquals("[dirtyString]", 264 String.valueOf(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"))); 265 266 monitor.resume("resume"); 267 monitor.setRPC("method2", new Object[] { new TestParam("param2") }, 1); 268 assertNotEquals(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"), 269 ((Map<String, Object>) monitor.toMap().get("rpcCall")).get("params")); 270 LOG.info(String.valueOf(clone.toMap())); 271 LOG.info(String.valueOf(monitor.toMap())); 272 assertNotEquals(clone.toString(), monitor.toString()); 273 assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime()); 274 assertNotEquals(clone.toMap(), monitor.toMap()); 275 assertNotEquals(clone.toJSON(), monitor.toJSON()); 276 } 277 278 private class TestParam { 279 public String param = null; 280 281 public TestParam(String param) { 282 this.param = param; 283 } 284 285 public void setParam(String param) { 286 this.param = param; 287 } 288 289 @Override 290 public String toString() { 291 return param; 292 } 293 } 294}