001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.executor;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.ArgumentMatchers.anyString;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.io.StringWriter;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.Server;
039import org.apache.hadoop.hbase.Waiter;
040import org.apache.hadoop.hbase.Waiter.Predicate;
041import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
042import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
043import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
044import org.apache.hadoop.hbase.testclassification.MiscTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Tag(MiscTests.TAG)
052@Tag(SmallTests.TAG)
053public class TestExecutorService {
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class);
056
057  @Test
058  public void testExecutorService() throws Exception {
059    int maxThreads = 5;
060    int maxTries = 10;
061    int sleepInterval = 10;
062
063    Server mockedServer = mock(Server.class);
064    when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
065
066    // Start an executor service pool with max 5 threads
067    ExecutorService executorService = new ExecutorService("unit_test");
068    executorService.startExecutorService(executorService.new ExecutorConfig()
069      .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads));
070
071    Executor executor = executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
072    ThreadPoolExecutor pool = executor.threadPoolExecutor;
073
074    // Assert no threads yet
075    assertEquals(0, pool.getPoolSize());
076
077    AtomicBoolean lock = new AtomicBoolean(true);
078    AtomicInteger counter = new AtomicInteger(0);
079
080    // Submit maxThreads executors.
081    for (int i = 0; i < maxThreads; i++) {
082      executorService
083        .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
084    }
085
086    // The TestEventHandler will increment counter when it starts.
087    int tries = 0;
088    while (counter.get() < maxThreads && tries < maxTries) {
089      LOG.info("Waiting for all event handlers to start...");
090      Thread.sleep(sleepInterval);
091      tries++;
092    }
093
094    // Assert that pool is at max threads.
095    assertEquals(maxThreads, counter.get());
096    assertEquals(maxThreads, pool.getPoolSize());
097
098    ExecutorStatus status = executor.getStatus();
099    assertTrue(status.queuedEvents.isEmpty());
100    assertEquals(5, status.running.size());
101    checkStatusDump(status);
102
103    // Now interrupt the running Executor
104    synchronized (lock) {
105      lock.set(false);
106      lock.notifyAll();
107    }
108
109    // Executor increments counter again on way out so.... test that happened.
110    while (counter.get() < (maxThreads * 2) && tries < maxTries) {
111      System.out.println("Waiting for all event handlers to finish...");
112      Thread.sleep(sleepInterval);
113      tries++;
114    }
115
116    assertEquals(maxThreads * 2, counter.get());
117    assertEquals(maxThreads, pool.getPoolSize());
118
119    // Add more than the number of threads items.
120    // Make sure we don't get RejectedExecutionException.
121    for (int i = 0; i < (2 * maxThreads); i++) {
122      executorService
123        .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
124    }
125    // Now interrupt the running Executor
126    synchronized (lock) {
127      lock.set(false);
128      lock.notifyAll();
129    }
130
131    // Make sure threads are still around even after their timetolive expires.
132    Thread.sleep(ExecutorConfig.KEEP_ALIVE_TIME_MILLIS_DEFAULT * 2);
133    assertEquals(maxThreads, pool.getPoolSize());
134
135    executorService.shutdown();
136
137    assertEquals(0, executorService.getAllExecutorStatuses().size());
138
139    // Test that submit doesn't throw NPEs
140    executorService
141      .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
142  }
143
144  private void checkStatusDump(ExecutorStatus status) throws IOException {
145    StringWriter sw = new StringWriter();
146    status.dumpTo(sw, "");
147    String dump = sw.toString();
148    LOG.info("Got status dump:\n" + dump);
149
150    assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
151  }
152
153  public static class TestEventHandler extends EventHandler {
154    private final AtomicBoolean lock;
155    private AtomicInteger counter;
156
157    public TestEventHandler(Server server, EventType eventType, AtomicBoolean lock,
158      AtomicInteger counter) {
159      super(server, eventType);
160      this.lock = lock;
161      this.counter = counter;
162    }
163
164    @Override
165    public void process() throws IOException {
166      int num = counter.incrementAndGet();
167      LOG.info("Running process #" + num + ", threadName=" + Thread.currentThread().getName());
168      synchronized (lock) {
169        while (lock.get()) {
170          try {
171            lock.wait();
172          } catch (InterruptedException e) {
173            // do nothing
174          }
175        }
176      }
177      counter.incrementAndGet();
178    }
179  }
180
181  @Test
182  public void testAborting() throws Exception {
183    final Configuration conf = HBaseConfiguration.create();
184    final Server server = mock(Server.class);
185    when(server.getConfiguration()).thenReturn(conf);
186
187    ExecutorService executorService = new ExecutorService("unit_test");
188    executorService.startExecutorService(executorService.new ExecutorConfig()
189      .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1));
190
191    executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
192      @Override
193      public void process() throws IOException {
194        throw new RuntimeException("Should cause abort");
195      }
196    });
197
198    Waiter.waitFor(conf, 30000, new Predicate<Exception>() {
199      @Override
200      public boolean evaluate() throws Exception {
201        try {
202          verify(server, times(1)).abort(anyString(), any());
203          return true;
204        } catch (Throwable t) {
205          return false;
206        }
207      }
208    });
209
210    executorService.shutdown();
211  }
212
213  @Test
214  public void testSnapshotHandlers() throws Exception {
215    final Configuration conf = HBaseConfiguration.create();
216    final Server server = mock(Server.class);
217    when(server.getConfiguration()).thenReturn(conf);
218
219    ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
220    executorService.startExecutorService(executorService.new ExecutorConfig()
221      .setExecutorType(ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1));
222
223    CountDownLatch latch = new CountDownLatch(1);
224    CountDownLatch waitForEventToStart = new CountDownLatch(1);
225    executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
226      @Override
227      public void process() throws IOException {
228        waitForEventToStart.countDown();
229        try {
230          latch.await();
231        } catch (InterruptedException e) {
232          Thread.currentThread().interrupt();
233        }
234      }
235    });
236
237    // Wait EventHandler to start
238    waitForEventToStart.await(10, TimeUnit.SECONDS);
239    int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
240      .getThreadPoolExecutor().getActiveCount();
241    assertEquals(1, activeCount);
242    latch.countDown();
243    Waiter.waitFor(conf, 3000, () -> {
244      int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
245        .getThreadPoolExecutor().getActiveCount();
246      return count == 0;
247    });
248  }
249}