001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.Future; 027 028import org.apache.hadoop.conf.Configuration; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032public abstract class MultithreadedTestUtil { 033 034 private static final Logger LOG = 035 LoggerFactory.getLogger(MultithreadedTestUtil.class); 036 037 public static class TestContext { 038 private final Configuration conf; 039 private Throwable err = null; 040 private boolean stopped = false; 041 private int threadDoneCount = 0; 042 private Set<TestThread> testThreads = new HashSet<>(); 043 044 public TestContext(Configuration configuration) { 045 this.conf = configuration; 046 } 047 048 protected Configuration getConf() { 049 return conf; 050 } 051 052 public synchronized boolean shouldRun() { 053 return !stopped && err == null; 054 } 055 056 public void addThread(TestThread t) { 057 testThreads.add(t); 058 } 059 060 public void startThreads() { 061 for (TestThread t : testThreads) { 062 t.start(); 063 } 064 } 065 066 public void waitFor(long millis) throws Exception { 067 long endTime = System.currentTimeMillis() + millis; 068 while (!stopped) { 069 long left = endTime - System.currentTimeMillis(); 070 if (left <= 0) break; 071 synchronized (this) { 072 checkException(); 073 wait(left); 074 } 075 } 076 } 077 private synchronized void checkException() throws Exception { 078 if (err != null) { 079 throw new RuntimeException("Deferred", err); 080 } 081 } 082 083 public synchronized void threadFailed(Throwable t) { 084 if (err == null) err = t; 085 LOG.error("Failed!", err); 086 notify(); 087 } 088 089 public synchronized void threadDone() { 090 threadDoneCount++; 091 } 092 093 public void setStopFlag(boolean s) throws Exception { 094 synchronized (this) { 095 stopped = s; 096 } 097 } 098 099 public void stop() throws Exception { 100 synchronized (this) { 101 stopped = true; 102 } 103 for (TestThread t : testThreads) { 104 t.join(); 105 } 106 checkException(); 107 } 108 } 109 110 /** 111 * A thread that can be added to a test context, and properly 112 * passes exceptions through. 113 */ 114 public static abstract class TestThread extends Thread { 115 protected final TestContext ctx; 116 protected boolean stopped; 117 118 public TestThread(TestContext ctx) { 119 this.ctx = ctx; 120 } 121 122 @Override 123 public void run() { 124 try { 125 doWork(); 126 } catch (Throwable t) { 127 ctx.threadFailed(t); 128 } 129 ctx.threadDone(); 130 } 131 132 public abstract void doWork() throws Exception; 133 134 protected void stopTestThread() { 135 this.stopped = true; 136 } 137 } 138 139 /** 140 * A test thread that performs a repeating operation. 141 */ 142 public static abstract class RepeatingTestThread extends TestThread { 143 public RepeatingTestThread(TestContext ctx) { 144 super(ctx); 145 } 146 147 @Override 148 public final void doWork() throws Exception { 149 try { 150 while (ctx.shouldRun() && !stopped) { 151 doAnAction(); 152 } 153 } finally { 154 workDone(); 155 } 156 } 157 158 public abstract void doAnAction() throws Exception; 159 public void workDone() throws IOException {} 160 } 161 162 /** 163 * Verify that no assertions have failed inside a future. 164 * Used for unit tests that spawn threads. E.g., 165 * <p> 166 * <pre> 167 * List<Future<Void>> results = Lists.newArrayList(); 168 * Future<Void> f = executor.submit(new Callable<Void> { 169 * public Void call() { 170 * assertTrue(someMethod()); 171 * } 172 * }); 173 * results.add(f); 174 * assertOnFutures(results); 175 * </pre> 176 * @param threadResults A list of futures 177 * @throws InterruptedException If interrupted when waiting for a result 178 * from one of the futures 179 * @throws ExecutionException If an exception other than AssertionError 180 * occurs inside any of the futures 181 */ 182 public static void assertOnFutures(List<Future<?>> threadResults) 183 throws InterruptedException, ExecutionException { 184 for (Future<?> threadResult : threadResults) { 185 try { 186 threadResult.get(); 187 } catch (ExecutionException e) { 188 if (e.getCause() instanceof AssertionError) { 189 throw (AssertionError) e.getCause(); 190 } 191 throw e; 192 } 193 } 194 } 195}