001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.executor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.mockito.ArgumentMatchers.anyObject; 023import static org.mockito.ArgumentMatchers.anyString; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.times; 026import static org.mockito.Mockito.verify; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.io.StringWriter; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.Server; 040import org.apache.hadoop.hbase.Waiter; 041import org.apache.hadoop.hbase.Waiter.Predicate; 042import org.apache.hadoop.hbase.executor.ExecutorService.Executor; 043import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; 044import org.apache.hadoop.hbase.testclassification.MiscTests; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.junit.Assert; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053@Category({MiscTests.class, SmallTests.class}) 054public class TestExecutorService { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestExecutorService.class); 059 060 private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class); 061 062 @Test 063 public void testExecutorService() throws Exception { 064 int maxThreads = 5; 065 int maxTries = 10; 066 int sleepInterval = 10; 067 068 Server mockedServer = mock(Server.class); 069 when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create()); 070 071 // Start an executor service pool with max 5 threads 072 ExecutorService executorService = new ExecutorService("unit_test"); 073 executorService.startExecutorService( 074 ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads); 075 076 Executor executor = 077 executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS); 078 ThreadPoolExecutor pool = executor.threadPoolExecutor; 079 080 // Assert no threads yet 081 assertEquals(0, pool.getPoolSize()); 082 083 AtomicBoolean lock = new AtomicBoolean(true); 084 AtomicInteger counter = new AtomicInteger(0); 085 086 // Submit maxThreads executors. 087 for (int i = 0; i < maxThreads; i++) { 088 executorService.submit( 089 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, 090 lock, counter)); 091 } 092 093 // The TestEventHandler will increment counter when it starts. 094 int tries = 0; 095 while (counter.get() < maxThreads && tries < maxTries) { 096 LOG.info("Waiting for all event handlers to start..."); 097 Thread.sleep(sleepInterval); 098 tries++; 099 } 100 101 // Assert that pool is at max threads. 102 assertEquals(maxThreads, counter.get()); 103 assertEquals(maxThreads, pool.getPoolSize()); 104 105 ExecutorStatus status = executor.getStatus(); 106 assertTrue(status.queuedEvents.isEmpty()); 107 assertEquals(5, status.running.size()); 108 checkStatusDump(status); 109 110 111 // Now interrupt the running Executor 112 synchronized (lock) { 113 lock.set(false); 114 lock.notifyAll(); 115 } 116 117 // Executor increments counter again on way out so.... test that happened. 118 while (counter.get() < (maxThreads * 2) && tries < maxTries) { 119 System.out.println("Waiting for all event handlers to finish..."); 120 Thread.sleep(sleepInterval); 121 tries++; 122 } 123 124 assertEquals(maxThreads * 2, counter.get()); 125 assertEquals(maxThreads, pool.getPoolSize()); 126 127 // Add more than the number of threads items. 128 // Make sure we don't get RejectedExecutionException. 129 for (int i = 0; i < (2 * maxThreads); i++) { 130 executorService.submit( 131 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, 132 lock, counter)); 133 } 134 // Now interrupt the running Executor 135 synchronized (lock) { 136 lock.set(false); 137 lock.notifyAll(); 138 } 139 140 // Make sure threads are still around even after their timetolive expires. 141 Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2); 142 assertEquals(maxThreads, pool.getPoolSize()); 143 144 executorService.shutdown(); 145 146 assertEquals(0, executorService.getAllExecutorStatuses().size()); 147 148 // Test that submit doesn't throw NPEs 149 executorService.submit( 150 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, 151 lock, counter)); 152 } 153 154 private void checkStatusDump(ExecutorStatus status) throws IOException { 155 StringWriter sw = new StringWriter(); 156 status.dumpTo(sw, ""); 157 String dump = sw.toString(); 158 LOG.info("Got status dump:\n" + dump); 159 160 assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean")); 161 } 162 163 public static class TestEventHandler extends EventHandler { 164 private final AtomicBoolean lock; 165 private AtomicInteger counter; 166 167 public TestEventHandler(Server server, EventType eventType, 168 AtomicBoolean lock, AtomicInteger counter) { 169 super(server, eventType); 170 this.lock = lock; 171 this.counter = counter; 172 } 173 174 @Override 175 public void process() throws IOException { 176 int num = counter.incrementAndGet(); 177 LOG.info("Running process #" + num + ", threadName=" + 178 Thread.currentThread().getName()); 179 synchronized (lock) { 180 while (lock.get()) { 181 try { 182 lock.wait(); 183 } catch (InterruptedException e) { 184 // do nothing 185 } 186 } 187 } 188 counter.incrementAndGet(); 189 } 190 } 191 192 @Test 193 public void testAborting() throws Exception { 194 final Configuration conf = HBaseConfiguration.create(); 195 final Server server = mock(Server.class); 196 when(server.getConfiguration()).thenReturn(conf); 197 198 ExecutorService executorService = new ExecutorService("unit_test"); 199 executorService.startExecutorService( 200 ExecutorType.MASTER_SERVER_OPERATIONS, 1); 201 202 203 executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) { 204 @Override 205 public void process() throws IOException { 206 throw new RuntimeException("Should cause abort"); 207 } 208 }); 209 210 Waiter.waitFor(conf, 30000, new Predicate<Exception>() { 211 @Override 212 public boolean evaluate() throws Exception { 213 try { 214 verify(server, times(1)).abort(anyString(), (Throwable) anyObject()); 215 return true; 216 } catch (Throwable t) { 217 return false; 218 } 219 } 220 }); 221 222 executorService.shutdown(); 223 } 224 225 @Test 226 public void testSnapshotHandlers() throws Exception { 227 final Configuration conf = HBaseConfiguration.create(); 228 final Server server = mock(Server.class); 229 when(server.getConfiguration()).thenReturn(conf); 230 231 ExecutorService executorService = new ExecutorService("testSnapshotHandlers"); 232 executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1); 233 234 CountDownLatch latch = new CountDownLatch(1); 235 CountDownLatch waitForEventToStart = new CountDownLatch(1); 236 executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) { 237 @Override 238 public void process() throws IOException { 239 waitForEventToStart.countDown(); 240 try { 241 latch.await(); 242 } catch (InterruptedException e) { 243 Thread.currentThread().interrupt(); 244 } 245 } 246 }); 247 248 //Wait EventHandler to start 249 waitForEventToStart.await(10, TimeUnit.SECONDS); 250 int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) 251 .getThreadPoolExecutor().getActiveCount(); 252 Assert.assertEquals(1, activeCount); 253 latch.countDown(); 254 Waiter.waitFor(conf, 3000, () -> { 255 int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) 256 .getThreadPoolExecutor().getActiveCount(); 257 return count == 0; 258 }); 259 } 260} 261