001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.executor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.ArgumentMatchers.anyObject;
023import static org.mockito.ArgumentMatchers.anyString;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.io.StringWriter;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.Waiter.Predicate;
042import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
043import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
044import org.apache.hadoop.hbase.testclassification.MiscTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.junit.Assert;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053@Category({MiscTests.class, SmallTests.class})
054public class TestExecutorService {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058      HBaseClassTestRule.forClass(TestExecutorService.class);
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class);
061
062  @Test
063  public void testExecutorService() throws Exception {
064    int maxThreads = 5;
065    int maxTries = 10;
066    int sleepInterval = 10;
067
068    Server mockedServer = mock(Server.class);
069    when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
070
071    // Start an executor service pool with max 5 threads
072    ExecutorService executorService = new ExecutorService("unit_test");
073    executorService.startExecutorService(
074      ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
075
076    Executor executor =
077      executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
078    ThreadPoolExecutor pool = executor.threadPoolExecutor;
079
080    // Assert no threads yet
081    assertEquals(0, pool.getPoolSize());
082
083    AtomicBoolean lock = new AtomicBoolean(true);
084    AtomicInteger counter = new AtomicInteger(0);
085
086    // Submit maxThreads executors.
087    for (int i = 0; i < maxThreads; i++) {
088      executorService.submit(
089        new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
090            lock, counter));
091    }
092
093    // The TestEventHandler will increment counter when it starts.
094    int tries = 0;
095    while (counter.get() < maxThreads && tries < maxTries) {
096      LOG.info("Waiting for all event handlers to start...");
097      Thread.sleep(sleepInterval);
098      tries++;
099    }
100
101    // Assert that pool is at max threads.
102    assertEquals(maxThreads, counter.get());
103    assertEquals(maxThreads, pool.getPoolSize());
104
105    ExecutorStatus status = executor.getStatus();
106    assertTrue(status.queuedEvents.isEmpty());
107    assertEquals(5, status.running.size());
108    checkStatusDump(status);
109
110
111    // Now interrupt the running Executor
112    synchronized (lock) {
113      lock.set(false);
114      lock.notifyAll();
115    }
116
117    // Executor increments counter again on way out so.... test that happened.
118    while (counter.get() < (maxThreads * 2) && tries < maxTries) {
119      System.out.println("Waiting for all event handlers to finish...");
120      Thread.sleep(sleepInterval);
121      tries++;
122    }
123
124    assertEquals(maxThreads * 2, counter.get());
125    assertEquals(maxThreads, pool.getPoolSize());
126
127    // Add more than the number of threads items.
128    // Make sure we don't get RejectedExecutionException.
129    for (int i = 0; i < (2 * maxThreads); i++) {
130      executorService.submit(
131        new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
132            lock, counter));
133    }
134    // Now interrupt the running Executor
135    synchronized (lock) {
136      lock.set(false);
137      lock.notifyAll();
138    }
139
140    // Make sure threads are still around even after their timetolive expires.
141    Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2);
142    assertEquals(maxThreads, pool.getPoolSize());
143
144    executorService.shutdown();
145
146    assertEquals(0, executorService.getAllExecutorStatuses().size());
147
148    // Test that submit doesn't throw NPEs
149    executorService.submit(
150      new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
151            lock, counter));
152  }
153
154  private void checkStatusDump(ExecutorStatus status) throws IOException {
155    StringWriter sw = new StringWriter();
156    status.dumpTo(sw, "");
157    String dump = sw.toString();
158    LOG.info("Got status dump:\n" + dump);
159
160    assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
161  }
162
163  public static class TestEventHandler extends EventHandler {
164    private final AtomicBoolean lock;
165    private AtomicInteger counter;
166
167    public TestEventHandler(Server server, EventType eventType,
168                            AtomicBoolean lock, AtomicInteger counter) {
169      super(server, eventType);
170      this.lock = lock;
171      this.counter = counter;
172    }
173
174    @Override
175    public void process() throws IOException {
176      int num = counter.incrementAndGet();
177      LOG.info("Running process #" + num + ", threadName=" +
178        Thread.currentThread().getName());
179      synchronized (lock) {
180        while (lock.get()) {
181          try {
182            lock.wait();
183          } catch (InterruptedException e) {
184            // do nothing
185          }
186        }
187      }
188      counter.incrementAndGet();
189    }
190  }
191
192  @Test
193  public void testAborting() throws Exception {
194    final Configuration conf = HBaseConfiguration.create();
195    final Server server = mock(Server.class);
196    when(server.getConfiguration()).thenReturn(conf);
197
198    ExecutorService executorService = new ExecutorService("unit_test");
199    executorService.startExecutorService(
200      ExecutorType.MASTER_SERVER_OPERATIONS, 1);
201
202
203    executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
204      @Override
205      public void process() throws IOException {
206        throw new RuntimeException("Should cause abort");
207      }
208    });
209
210    Waiter.waitFor(conf, 30000, new Predicate<Exception>() {
211      @Override
212      public boolean evaluate() throws Exception {
213        try {
214          verify(server, times(1)).abort(anyString(), (Throwable) anyObject());
215          return true;
216        } catch (Throwable t) {
217          return false;
218        }
219      }
220    });
221
222    executorService.shutdown();
223  }
224
225  @Test
226  public void testSnapshotHandlers() throws Exception {
227    final Configuration conf = HBaseConfiguration.create();
228    final Server server = mock(Server.class);
229    when(server.getConfiguration()).thenReturn(conf);
230
231    ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
232    executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1);
233
234    CountDownLatch latch = new CountDownLatch(1);
235    CountDownLatch waitForEventToStart = new CountDownLatch(1);
236    executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
237      @Override
238      public void process() throws IOException {
239        waitForEventToStart.countDown();
240        try {
241          latch.await();
242        } catch (InterruptedException e) {
243          Thread.currentThread().interrupt();
244        }
245      }
246    });
247
248    //Wait EventHandler to start
249    waitForEventToStart.await(10, TimeUnit.SECONDS);
250    int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
251        .getThreadPoolExecutor().getActiveCount();
252    Assert.assertEquals(1, activeCount);
253    latch.countDown();
254    Waiter.waitFor(conf, 3000, () -> {
255      int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
256          .getThreadPoolExecutor().getActiveCount();
257      return count == 0;
258    });
259  }
260}
261