001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.executor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.ArgumentMatchers.anyObject;
023import static org.mockito.ArgumentMatchers.anyString;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.io.StringWriter;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.Server;
038import org.apache.hadoop.hbase.Waiter;
039import org.apache.hadoop.hbase.Waiter.Predicate;
040import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
041import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
042import org.apache.hadoop.hbase.testclassification.MiscTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050@Category({MiscTests.class, SmallTests.class})
051public class TestExecutorService {
052
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055      HBaseClassTestRule.forClass(TestExecutorService.class);
056
057  private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class);
058
059  @Test
060  public void testExecutorService() throws Exception {
061    int maxThreads = 5;
062    int maxTries = 10;
063    int sleepInterval = 10;
064
065    Server mockedServer = mock(Server.class);
066    when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
067
068    // Start an executor service pool with max 5 threads
069    ExecutorService executorService = new ExecutorService("unit_test");
070    executorService.startExecutorService(
071      ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
072
073    Executor executor =
074      executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
075    ThreadPoolExecutor pool = executor.threadPoolExecutor;
076
077    // Assert no threads yet
078    assertEquals(0, pool.getPoolSize());
079
080    AtomicBoolean lock = new AtomicBoolean(true);
081    AtomicInteger counter = new AtomicInteger(0);
082
083    // Submit maxThreads executors.
084    for (int i = 0; i < maxThreads; i++) {
085      executorService.submit(
086        new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
087            lock, counter));
088    }
089
090    // The TestEventHandler will increment counter when it starts.
091    int tries = 0;
092    while (counter.get() < maxThreads && tries < maxTries) {
093      LOG.info("Waiting for all event handlers to start...");
094      Thread.sleep(sleepInterval);
095      tries++;
096    }
097
098    // Assert that pool is at max threads.
099    assertEquals(maxThreads, counter.get());
100    assertEquals(maxThreads, pool.getPoolSize());
101
102    ExecutorStatus status = executor.getStatus();
103    assertTrue(status.queuedEvents.isEmpty());
104    assertEquals(5, status.running.size());
105    checkStatusDump(status);
106
107
108    // Now interrupt the running Executor
109    synchronized (lock) {
110      lock.set(false);
111      lock.notifyAll();
112    }
113
114    // Executor increments counter again on way out so.... test that happened.
115    while (counter.get() < (maxThreads * 2) && tries < maxTries) {
116      System.out.println("Waiting for all event handlers to finish...");
117      Thread.sleep(sleepInterval);
118      tries++;
119    }
120
121    assertEquals(maxThreads * 2, counter.get());
122    assertEquals(maxThreads, pool.getPoolSize());
123
124    // Add more than the number of threads items.
125    // Make sure we don't get RejectedExecutionException.
126    for (int i = 0; i < (2 * maxThreads); i++) {
127      executorService.submit(
128        new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
129            lock, counter));
130    }
131    // Now interrupt the running Executor
132    synchronized (lock) {
133      lock.set(false);
134      lock.notifyAll();
135    }
136
137    // Make sure threads are still around even after their timetolive expires.
138    Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2);
139    assertEquals(maxThreads, pool.getPoolSize());
140
141    executorService.shutdown();
142
143    assertEquals(0, executorService.getAllExecutorStatuses().size());
144
145    // Test that submit doesn't throw NPEs
146    executorService.submit(
147      new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
148            lock, counter));
149  }
150
151  private void checkStatusDump(ExecutorStatus status) throws IOException {
152    StringWriter sw = new StringWriter();
153    status.dumpTo(sw, "");
154    String dump = sw.toString();
155    LOG.info("Got status dump:\n" + dump);
156
157    assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
158  }
159
160  public static class TestEventHandler extends EventHandler {
161    private final AtomicBoolean lock;
162    private AtomicInteger counter;
163
164    public TestEventHandler(Server server, EventType eventType,
165                            AtomicBoolean lock, AtomicInteger counter) {
166      super(server, eventType);
167      this.lock = lock;
168      this.counter = counter;
169    }
170
171    @Override
172    public void process() throws IOException {
173      int num = counter.incrementAndGet();
174      LOG.info("Running process #" + num + ", threadName=" +
175        Thread.currentThread().getName());
176      synchronized (lock) {
177        while (lock.get()) {
178          try {
179            lock.wait();
180          } catch (InterruptedException e) {
181            // do nothing
182          }
183        }
184      }
185      counter.incrementAndGet();
186    }
187  }
188
189  @Test
190  public void testAborting() throws Exception {
191    final Configuration conf = HBaseConfiguration.create();
192    final Server server = mock(Server.class);
193    when(server.getConfiguration()).thenReturn(conf);
194
195    ExecutorService executorService = new ExecutorService("unit_test");
196    executorService.startExecutorService(
197      ExecutorType.MASTER_SERVER_OPERATIONS, 1);
198
199
200    executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
201      @Override
202      public void process() throws IOException {
203        throw new RuntimeException("Should cause abort");
204      }
205    });
206
207    Waiter.waitFor(conf, 30000, new Predicate<Exception>() {
208      @Override
209      public boolean evaluate() throws Exception {
210        try {
211          verify(server, times(1)).abort(anyString(), (Throwable) anyObject());
212          return true;
213        } catch (Throwable t) {
214          return false;
215        }
216      }
217    });
218
219    executorService.shutdown();
220  }
221
222}
223