001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Future;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031public abstract class MultithreadedTestUtil {
032
033  private static final Logger LOG = LoggerFactory.getLogger(MultithreadedTestUtil.class);
034
035  public static class TestContext {
036    private final Configuration conf;
037    private Throwable err = null;
038    private boolean stopped = false;
039    private int threadDoneCount = 0;
040    private Set<TestThread> testThreads = new HashSet<>();
041
042    public TestContext(Configuration configuration) {
043      this.conf = configuration;
044    }
045
046    protected Configuration getConf() {
047      return conf;
048    }
049
050    public synchronized boolean shouldRun() {
051      return !stopped && err == null;
052    }
053
054    public void addThread(TestThread t) {
055      testThreads.add(t);
056    }
057
058    public void startThreads() {
059      for (TestThread t : testThreads) {
060        t.start();
061      }
062    }
063
064    public void waitFor(long millis) throws Exception {
065      long endTime = EnvironmentEdgeManager.currentTime() + millis;
066      while (!stopped) {
067        long left = endTime - EnvironmentEdgeManager.currentTime();
068        if (left <= 0) break;
069        synchronized (this) {
070          checkException();
071          wait(left);
072        }
073      }
074    }
075
076    private synchronized void checkException() throws Exception {
077      if (err != null) {
078        throw new RuntimeException("Deferred", err);
079      }
080    }
081
082    public synchronized void threadFailed(Throwable t) {
083      if (err == null) err = t;
084      LOG.error("Failed!", err);
085      notify();
086    }
087
088    public synchronized void threadDone() {
089      threadDoneCount++;
090    }
091
092    public void setStopFlag(boolean s) throws Exception {
093      synchronized (this) {
094        stopped = s;
095      }
096    }
097
098    public void stop() throws Exception {
099      synchronized (this) {
100        stopped = true;
101      }
102      for (TestThread t : testThreads) {
103        t.join();
104      }
105      checkException();
106    }
107  }
108
109  /**
110   * A thread that can be added to a test context, and properly passes exceptions through.
111   */
112  public static abstract class TestThread extends Thread {
113    protected final TestContext ctx;
114    protected boolean stopped;
115
116    public TestThread(TestContext ctx) {
117      this.ctx = ctx;
118    }
119
120    @Override
121    public void run() {
122      try {
123        doWork();
124      } catch (Throwable t) {
125        ctx.threadFailed(t);
126      }
127      ctx.threadDone();
128    }
129
130    public abstract void doWork() throws Exception;
131
132    protected void stopTestThread() {
133      this.stopped = true;
134    }
135  }
136
137  /**
138   * A test thread that performs a repeating operation.
139   */
140  public static abstract class RepeatingTestThread extends TestThread {
141    public RepeatingTestThread(TestContext ctx) {
142      super(ctx);
143    }
144
145    @Override
146    public final void doWork() throws Exception {
147      try {
148        while (ctx.shouldRun() && !stopped) {
149          doAnAction();
150        }
151      } finally {
152        workDone();
153      }
154    }
155
156    public abstract void doAnAction() throws Exception;
157
158    public void workDone() throws IOException {
159    }
160  }
161
162  /**
163   * Verify that no assertions have failed inside a future. Used for unit tests that spawn threads.
164   * E.g.,
165   * <p>
166   *
167   * <pre>
168   *   List&lt;Future&lt;Void>> results = Lists.newArrayList();
169   *   Future&lt;Void> f = executor.submit(new Callable&lt;Void> {
170   *     public Void call() {
171   *       assertTrue(someMethod());
172   *     }
173   *   });
174   *   results.add(f);
175   *   assertOnFutures(results);
176   * </pre>
177   *
178   * @param threadResults A list of futures
179   * @throws InterruptedException If interrupted when waiting for a result from one of the futures
180   * @throws ExecutionException   If an exception other than AssertionError occurs inside any of the
181   *                              futures
182   */
183  public static void assertOnFutures(List<Future<?>> threadResults)
184    throws InterruptedException, ExecutionException {
185    for (Future<?> threadResult : threadResults) {
186      try {
187        threadResult.get();
188      } catch (ExecutionException e) {
189        if (e.getCause() instanceof AssertionError) {
190          throw (AssertionError) e.getCause();
191        }
192        throw e;
193      }
194    }
195  }
196}