001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.Future;
027
028import org.apache.hadoop.conf.Configuration;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032public abstract class MultithreadedTestUtil {
033
034  private static final Logger LOG =
035    LoggerFactory.getLogger(MultithreadedTestUtil.class);
036
037  public static class TestContext {
038    private final Configuration conf;
039    private Throwable err = null;
040    private boolean stopped = false;
041    private int threadDoneCount = 0;
042    private Set<TestThread> testThreads = new HashSet<>();
043
044    public TestContext(Configuration configuration) {
045      this.conf = configuration;
046    }
047
048    protected Configuration getConf() {
049      return conf;
050    }
051
052    public synchronized boolean shouldRun()  {
053      return !stopped && err == null;
054    }
055
056    public void addThread(TestThread t) {
057      testThreads.add(t);
058    }
059
060    public void startThreads() {
061      for (TestThread t : testThreads) {
062        t.start();
063      }
064    }
065
066    public void waitFor(long millis) throws Exception {
067      long endTime = System.currentTimeMillis() + millis;
068      while (!stopped) {
069        long left = endTime - System.currentTimeMillis();
070        if (left <= 0) break;
071        synchronized (this) {
072          checkException();
073          wait(left);
074        }
075      }
076    }
077    private synchronized void checkException() throws Exception {
078      if (err != null) {
079        throw new RuntimeException("Deferred", err);
080      }
081    }
082
083    public synchronized void threadFailed(Throwable t) {
084      if (err == null) err = t;
085      LOG.error("Failed!", err);
086      notify();
087    }
088
089    public synchronized void threadDone() {
090      threadDoneCount++;
091    }
092
093    public void setStopFlag(boolean s) throws Exception {
094      synchronized (this) {
095        stopped = s;
096      }
097    }
098
099    public void stop() throws Exception {
100      synchronized (this) {
101        stopped = true;
102      }
103      for (TestThread t : testThreads) {
104        t.join();
105      }
106      checkException();
107    }
108  }
109
110  /**
111   * A thread that can be added to a test context, and properly
112   * passes exceptions through.
113   */
114  public static abstract class TestThread extends Thread {
115    protected final TestContext ctx;
116    protected boolean stopped;
117
118    public TestThread(TestContext ctx) {
119      this.ctx = ctx;
120    }
121
122    @Override
123    public void run() {
124      try {
125        doWork();
126      } catch (Throwable t) {
127        ctx.threadFailed(t);
128      }
129      ctx.threadDone();
130    }
131
132    public abstract void doWork() throws Exception;
133
134    protected void stopTestThread() {
135      this.stopped = true;
136    }
137  }
138
139  /**
140   * A test thread that performs a repeating operation.
141   */
142  public static abstract class RepeatingTestThread extends TestThread {
143    public RepeatingTestThread(TestContext ctx) {
144      super(ctx);
145    }
146
147    @Override
148    public final void doWork() throws Exception {
149      try {
150        while (ctx.shouldRun() && !stopped) {
151          doAnAction();
152        }
153      } finally {
154        workDone();
155      }
156    }
157
158    public abstract void doAnAction() throws Exception;
159    public void workDone() throws IOException {}
160  }
161
162  /**
163   * Verify that no assertions have failed inside a future.
164   * Used for unit tests that spawn threads. E.g.,
165   * <p>
166   * <code>
167   *   List<Future<Void>> results = Lists.newArrayList();
168   *   Future<Void> f = executor.submit(new Callable<Void> {
169   *     public Void call() {
170   *       assertTrue(someMethod());
171   *     }
172   *   });
173   *   results.add(f);
174   *   assertOnFutures(results);
175   * </code>
176   * @param threadResults A list of futures
177   * @param <T>
178   * @throws InterruptedException If interrupted when waiting for a result
179   *                              from one of the futures
180   * @throws ExecutionException If an exception other than AssertionError
181   *                            occurs inside any of the futures
182   */
183  public static <T> void assertOnFutures(List<Future<T>> threadResults)
184  throws InterruptedException, ExecutionException {
185    for (Future<T> threadResult : threadResults) {
186      try {
187        threadResult.get();
188      } catch (ExecutionException e) {
189        if (e.getCause() instanceof AssertionError) {
190          throw (AssertionError) e.getCause();
191        }
192        throw e;
193      }
194    }
195  }
196}