001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.executor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.ArgumentMatchers.anyString;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.io.StringWriter;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.Waiter.Predicate;
042import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
043import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
044import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
045import org.apache.hadoop.hbase.testclassification.MiscTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.junit.Assert;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054@Category({ MiscTests.class, SmallTests.class })
055public class TestExecutorService {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestExecutorService.class);
060
061  private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class);
062
063  @Test
064  public void testExecutorService() throws Exception {
065    int maxThreads = 5;
066    int maxTries = 10;
067    int sleepInterval = 10;
068
069    Server mockedServer = mock(Server.class);
070    when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
071
072    // Start an executor service pool with max 5 threads
073    ExecutorService executorService = new ExecutorService("unit_test");
074    executorService.startExecutorService(executorService.new ExecutorConfig()
075      .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads));
076
077    Executor executor = executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
078    ThreadPoolExecutor pool = executor.threadPoolExecutor;
079
080    // Assert no threads yet
081    assertEquals(0, pool.getPoolSize());
082
083    AtomicBoolean lock = new AtomicBoolean(true);
084    AtomicInteger counter = new AtomicInteger(0);
085
086    // Submit maxThreads executors.
087    for (int i = 0; i < maxThreads; i++) {
088      executorService
089        .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
090    }
091
092    // The TestEventHandler will increment counter when it starts.
093    int tries = 0;
094    while (counter.get() < maxThreads && tries < maxTries) {
095      LOG.info("Waiting for all event handlers to start...");
096      Thread.sleep(sleepInterval);
097      tries++;
098    }
099
100    // Assert that pool is at max threads.
101    assertEquals(maxThreads, counter.get());
102    assertEquals(maxThreads, pool.getPoolSize());
103
104    ExecutorStatus status = executor.getStatus();
105    assertTrue(status.queuedEvents.isEmpty());
106    assertEquals(5, status.running.size());
107    checkStatusDump(status);
108
109    // Now interrupt the running Executor
110    synchronized (lock) {
111      lock.set(false);
112      lock.notifyAll();
113    }
114
115    // Executor increments counter again on way out so.... test that happened.
116    while (counter.get() < (maxThreads * 2) && tries < maxTries) {
117      System.out.println("Waiting for all event handlers to finish...");
118      Thread.sleep(sleepInterval);
119      tries++;
120    }
121
122    assertEquals(maxThreads * 2, counter.get());
123    assertEquals(maxThreads, pool.getPoolSize());
124
125    // Add more than the number of threads items.
126    // Make sure we don't get RejectedExecutionException.
127    for (int i = 0; i < (2 * maxThreads); i++) {
128      executorService
129        .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
130    }
131    // Now interrupt the running Executor
132    synchronized (lock) {
133      lock.set(false);
134      lock.notifyAll();
135    }
136
137    // Make sure threads are still around even after their timetolive expires.
138    Thread.sleep(ExecutorConfig.KEEP_ALIVE_TIME_MILLIS_DEFAULT * 2);
139    assertEquals(maxThreads, pool.getPoolSize());
140
141    executorService.shutdown();
142
143    assertEquals(0, executorService.getAllExecutorStatuses().size());
144
145    // Test that submit doesn't throw NPEs
146    executorService
147      .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
148  }
149
150  private void checkStatusDump(ExecutorStatus status) throws IOException {
151    StringWriter sw = new StringWriter();
152    status.dumpTo(sw, "");
153    String dump = sw.toString();
154    LOG.info("Got status dump:\n" + dump);
155
156    assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
157  }
158
159  public static class TestEventHandler extends EventHandler {
160    private final AtomicBoolean lock;
161    private AtomicInteger counter;
162
163    public TestEventHandler(Server server, EventType eventType, AtomicBoolean lock,
164      AtomicInteger counter) {
165      super(server, eventType);
166      this.lock = lock;
167      this.counter = counter;
168    }
169
170    @Override
171    public void process() throws IOException {
172      int num = counter.incrementAndGet();
173      LOG.info("Running process #" + num + ", threadName=" + Thread.currentThread().getName());
174      synchronized (lock) {
175        while (lock.get()) {
176          try {
177            lock.wait();
178          } catch (InterruptedException e) {
179            // do nothing
180          }
181        }
182      }
183      counter.incrementAndGet();
184    }
185  }
186
187  @Test
188  public void testAborting() throws Exception {
189    final Configuration conf = HBaseConfiguration.create();
190    final Server server = mock(Server.class);
191    when(server.getConfiguration()).thenReturn(conf);
192
193    ExecutorService executorService = new ExecutorService("unit_test");
194    executorService.startExecutorService(executorService.new ExecutorConfig()
195      .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1));
196
197    executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
198      @Override
199      public void process() throws IOException {
200        throw new RuntimeException("Should cause abort");
201      }
202    });
203
204    Waiter.waitFor(conf, 30000, new Predicate<Exception>() {
205      @Override
206      public boolean evaluate() throws Exception {
207        try {
208          verify(server, times(1)).abort(anyString(), any());
209          return true;
210        } catch (Throwable t) {
211          return false;
212        }
213      }
214    });
215
216    executorService.shutdown();
217  }
218
219  @Test
220  public void testSnapshotHandlers() throws Exception {
221    final Configuration conf = HBaseConfiguration.create();
222    final Server server = mock(Server.class);
223    when(server.getConfiguration()).thenReturn(conf);
224
225    ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
226    executorService.startExecutorService(executorService.new ExecutorConfig()
227      .setExecutorType(ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1));
228
229    CountDownLatch latch = new CountDownLatch(1);
230    CountDownLatch waitForEventToStart = new CountDownLatch(1);
231    executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
232      @Override
233      public void process() throws IOException {
234        waitForEventToStart.countDown();
235        try {
236          latch.await();
237        } catch (InterruptedException e) {
238          Thread.currentThread().interrupt();
239        }
240      }
241    });
242
243    // Wait EventHandler to start
244    waitForEventToStart.await(10, TimeUnit.SECONDS);
245    int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
246      .getThreadPoolExecutor().getActiveCount();
247    Assert.assertEquals(1, activeCount);
248    latch.countDown();
249    Waiter.waitFor(conf, 3000, () -> {
250      int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
251        .getThreadPoolExecutor().getActiveCount();
252      return count == 0;
253    });
254  }
255}