001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.executor; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.ArgumentMatchers.anyString; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.times; 026import static org.mockito.Mockito.verify; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.io.StringWriter; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.Server; 039import org.apache.hadoop.hbase.Waiter; 040import org.apache.hadoop.hbase.Waiter.Predicate; 041import org.apache.hadoop.hbase.executor.ExecutorService.Executor; 042import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig; 043import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; 044import org.apache.hadoop.hbase.testclassification.MiscTests; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051@Tag(MiscTests.TAG) 052@Tag(SmallTests.TAG) 053public class TestExecutorService { 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class); 056 057 @Test 058 public void testExecutorService() throws Exception { 059 int maxThreads = 5; 060 int maxTries = 10; 061 int sleepInterval = 10; 062 063 Server mockedServer = mock(Server.class); 064 when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create()); 065 066 // Start an executor service pool with max 5 threads 067 ExecutorService executorService = new ExecutorService("unit_test"); 068 executorService.startExecutorService(executorService.new ExecutorConfig() 069 .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads)); 070 071 Executor executor = executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS); 072 ThreadPoolExecutor pool = executor.threadPoolExecutor; 073 074 // Assert no threads yet 075 assertEquals(0, pool.getPoolSize()); 076 077 AtomicBoolean lock = new AtomicBoolean(true); 078 AtomicInteger counter = new AtomicInteger(0); 079 080 // Submit maxThreads executors. 081 for (int i = 0; i < maxThreads; i++) { 082 executorService 083 .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter)); 084 } 085 086 // The TestEventHandler will increment counter when it starts. 087 int tries = 0; 088 while (counter.get() < maxThreads && tries < maxTries) { 089 LOG.info("Waiting for all event handlers to start..."); 090 Thread.sleep(sleepInterval); 091 tries++; 092 } 093 094 // Assert that pool is at max threads. 095 assertEquals(maxThreads, counter.get()); 096 assertEquals(maxThreads, pool.getPoolSize()); 097 098 ExecutorStatus status = executor.getStatus(); 099 assertTrue(status.queuedEvents.isEmpty()); 100 assertEquals(5, status.running.size()); 101 checkStatusDump(status); 102 103 // Now interrupt the running Executor 104 synchronized (lock) { 105 lock.set(false); 106 lock.notifyAll(); 107 } 108 109 // Executor increments counter again on way out so.... test that happened. 110 while (counter.get() < (maxThreads * 2) && tries < maxTries) { 111 System.out.println("Waiting for all event handlers to finish..."); 112 Thread.sleep(sleepInterval); 113 tries++; 114 } 115 116 assertEquals(maxThreads * 2, counter.get()); 117 assertEquals(maxThreads, pool.getPoolSize()); 118 119 // Add more than the number of threads items. 120 // Make sure we don't get RejectedExecutionException. 121 for (int i = 0; i < (2 * maxThreads); i++) { 122 executorService 123 .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter)); 124 } 125 // Now interrupt the running Executor 126 synchronized (lock) { 127 lock.set(false); 128 lock.notifyAll(); 129 } 130 131 // Make sure threads are still around even after their timetolive expires. 132 Thread.sleep(ExecutorConfig.KEEP_ALIVE_TIME_MILLIS_DEFAULT * 2); 133 assertEquals(maxThreads, pool.getPoolSize()); 134 135 executorService.shutdown(); 136 137 assertEquals(0, executorService.getAllExecutorStatuses().size()); 138 139 // Test that submit doesn't throw NPEs 140 executorService 141 .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter)); 142 } 143 144 private void checkStatusDump(ExecutorStatus status) throws IOException { 145 StringWriter sw = new StringWriter(); 146 status.dumpTo(sw, ""); 147 String dump = sw.toString(); 148 LOG.info("Got status dump:\n" + dump); 149 150 assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean")); 151 } 152 153 public static class TestEventHandler extends EventHandler { 154 private final AtomicBoolean lock; 155 private AtomicInteger counter; 156 157 public TestEventHandler(Server server, EventType eventType, AtomicBoolean lock, 158 AtomicInteger counter) { 159 super(server, eventType); 160 this.lock = lock; 161 this.counter = counter; 162 } 163 164 @Override 165 public void process() throws IOException { 166 int num = counter.incrementAndGet(); 167 LOG.info("Running process #" + num + ", threadName=" + Thread.currentThread().getName()); 168 synchronized (lock) { 169 while (lock.get()) { 170 try { 171 lock.wait(); 172 } catch (InterruptedException e) { 173 // do nothing 174 } 175 } 176 } 177 counter.incrementAndGet(); 178 } 179 } 180 181 @Test 182 public void testAborting() throws Exception { 183 final Configuration conf = HBaseConfiguration.create(); 184 final Server server = mock(Server.class); 185 when(server.getConfiguration()).thenReturn(conf); 186 187 ExecutorService executorService = new ExecutorService("unit_test"); 188 executorService.startExecutorService(executorService.new ExecutorConfig() 189 .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1)); 190 191 executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) { 192 @Override 193 public void process() throws IOException { 194 throw new RuntimeException("Should cause abort"); 195 } 196 }); 197 198 Waiter.waitFor(conf, 30000, new Predicate<Exception>() { 199 @Override 200 public boolean evaluate() throws Exception { 201 try { 202 verify(server, times(1)).abort(anyString(), any()); 203 return true; 204 } catch (Throwable t) { 205 return false; 206 } 207 } 208 }); 209 210 executorService.shutdown(); 211 } 212 213 @Test 214 public void testSnapshotHandlers() throws Exception { 215 final Configuration conf = HBaseConfiguration.create(); 216 final Server server = mock(Server.class); 217 when(server.getConfiguration()).thenReturn(conf); 218 219 ExecutorService executorService = new ExecutorService("testSnapshotHandlers"); 220 executorService.startExecutorService(executorService.new ExecutorConfig() 221 .setExecutorType(ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1)); 222 223 CountDownLatch latch = new CountDownLatch(1); 224 CountDownLatch waitForEventToStart = new CountDownLatch(1); 225 executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) { 226 @Override 227 public void process() throws IOException { 228 waitForEventToStart.countDown(); 229 try { 230 latch.await(); 231 } catch (InterruptedException e) { 232 Thread.currentThread().interrupt(); 233 } 234 } 235 }); 236 237 // Wait EventHandler to start 238 waitForEventToStart.await(10, TimeUnit.SECONDS); 239 int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) 240 .getThreadPoolExecutor().getActiveCount(); 241 assertEquals(1, activeCount); 242 latch.countDown(); 243 Waiter.waitFor(conf, 3000, () -> { 244 int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) 245 .getThreadPoolExecutor().getActiveCount(); 246 return count == 0; 247 }); 248 } 249}