001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.executor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.mockito.ArgumentMatchers.anyObject; 023import static org.mockito.ArgumentMatchers.anyString; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.times; 026import static org.mockito.Mockito.verify; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.io.StringWriter; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.Server; 038import org.apache.hadoop.hbase.Waiter; 039import org.apache.hadoop.hbase.Waiter.Predicate; 040import org.apache.hadoop.hbase.executor.ExecutorService.Executor; 041import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; 042import org.apache.hadoop.hbase.testclassification.MiscTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050@Category({MiscTests.class, SmallTests.class}) 051public class TestExecutorService { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestExecutorService.class); 056 057 private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class); 058 059 @Test 060 public void testExecutorService() throws Exception { 061 int maxThreads = 5; 062 int maxTries = 10; 063 int sleepInterval = 10; 064 065 Server mockedServer = mock(Server.class); 066 when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create()); 067 068 // Start an executor service pool with max 5 threads 069 ExecutorService executorService = new ExecutorService("unit_test"); 070 executorService.startExecutorService( 071 ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads); 072 073 Executor executor = 074 executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS); 075 ThreadPoolExecutor pool = executor.threadPoolExecutor; 076 077 // Assert no threads yet 078 assertEquals(0, pool.getPoolSize()); 079 080 AtomicBoolean lock = new AtomicBoolean(true); 081 AtomicInteger counter = new AtomicInteger(0); 082 083 // Submit maxThreads executors. 084 for (int i = 0; i < maxThreads; i++) { 085 executorService.submit( 086 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, 087 lock, counter)); 088 } 089 090 // The TestEventHandler will increment counter when it starts. 091 int tries = 0; 092 while (counter.get() < maxThreads && tries < maxTries) { 093 LOG.info("Waiting for all event handlers to start..."); 094 Thread.sleep(sleepInterval); 095 tries++; 096 } 097 098 // Assert that pool is at max threads. 099 assertEquals(maxThreads, counter.get()); 100 assertEquals(maxThreads, pool.getPoolSize()); 101 102 ExecutorStatus status = executor.getStatus(); 103 assertTrue(status.queuedEvents.isEmpty()); 104 assertEquals(5, status.running.size()); 105 checkStatusDump(status); 106 107 108 // Now interrupt the running Executor 109 synchronized (lock) { 110 lock.set(false); 111 lock.notifyAll(); 112 } 113 114 // Executor increments counter again on way out so.... test that happened. 115 while (counter.get() < (maxThreads * 2) && tries < maxTries) { 116 System.out.println("Waiting for all event handlers to finish..."); 117 Thread.sleep(sleepInterval); 118 tries++; 119 } 120 121 assertEquals(maxThreads * 2, counter.get()); 122 assertEquals(maxThreads, pool.getPoolSize()); 123 124 // Add more than the number of threads items. 125 // Make sure we don't get RejectedExecutionException. 126 for (int i = 0; i < (2 * maxThreads); i++) { 127 executorService.submit( 128 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, 129 lock, counter)); 130 } 131 // Now interrupt the running Executor 132 synchronized (lock) { 133 lock.set(false); 134 lock.notifyAll(); 135 } 136 137 // Make sure threads are still around even after their timetolive expires. 138 Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2); 139 assertEquals(maxThreads, pool.getPoolSize()); 140 141 executorService.shutdown(); 142 143 assertEquals(0, executorService.getAllExecutorStatuses().size()); 144 145 // Test that submit doesn't throw NPEs 146 executorService.submit( 147 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, 148 lock, counter)); 149 } 150 151 private void checkStatusDump(ExecutorStatus status) throws IOException { 152 StringWriter sw = new StringWriter(); 153 status.dumpTo(sw, ""); 154 String dump = sw.toString(); 155 LOG.info("Got status dump:\n" + dump); 156 157 assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean")); 158 } 159 160 public static class TestEventHandler extends EventHandler { 161 private final AtomicBoolean lock; 162 private AtomicInteger counter; 163 164 public TestEventHandler(Server server, EventType eventType, 165 AtomicBoolean lock, AtomicInteger counter) { 166 super(server, eventType); 167 this.lock = lock; 168 this.counter = counter; 169 } 170 171 @Override 172 public void process() throws IOException { 173 int num = counter.incrementAndGet(); 174 LOG.info("Running process #" + num + ", threadName=" + 175 Thread.currentThread().getName()); 176 synchronized (lock) { 177 while (lock.get()) { 178 try { 179 lock.wait(); 180 } catch (InterruptedException e) { 181 // do nothing 182 } 183 } 184 } 185 counter.incrementAndGet(); 186 } 187 } 188 189 @Test 190 public void testAborting() throws Exception { 191 final Configuration conf = HBaseConfiguration.create(); 192 final Server server = mock(Server.class); 193 when(server.getConfiguration()).thenReturn(conf); 194 195 ExecutorService executorService = new ExecutorService("unit_test"); 196 executorService.startExecutorService( 197 ExecutorType.MASTER_SERVER_OPERATIONS, 1); 198 199 200 executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) { 201 @Override 202 public void process() throws IOException { 203 throw new RuntimeException("Should cause abort"); 204 } 205 }); 206 207 Waiter.waitFor(conf, 30000, new Predicate<Exception>() { 208 @Override 209 public boolean evaluate() throws Exception { 210 try { 211 verify(server, times(1)).abort(anyString(), (Throwable) anyObject()); 212 return true; 213 } catch (Throwable t) { 214 return false; 215 } 216 } 217 }); 218 219 executorService.shutdown(); 220 } 221 222} 223