001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.HashSet; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.Future; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031public abstract class MultithreadedTestUtil { 032 033 private static final Logger LOG = LoggerFactory.getLogger(MultithreadedTestUtil.class); 034 035 public static class TestContext { 036 private final Configuration conf; 037 private Throwable err = null; 038 private boolean stopped = false; 039 private int threadDoneCount = 0; 040 private Set<TestThread> testThreads = new HashSet<>(); 041 042 public TestContext(Configuration configuration) { 043 this.conf = configuration; 044 } 045 046 protected Configuration getConf() { 047 return conf; 048 } 049 050 public synchronized boolean shouldRun() { 051 return !stopped && err == null; 052 } 053 054 public void addThread(TestThread t) { 055 testThreads.add(t); 056 } 057 058 public void startThreads() { 059 for (TestThread t : testThreads) { 060 t.start(); 061 } 062 } 063 064 public void waitFor(long millis) throws Exception { 065 long endTime = EnvironmentEdgeManager.currentTime() + millis; 066 while (!stopped) { 067 long left = endTime - EnvironmentEdgeManager.currentTime(); 068 if (left <= 0) break; 069 synchronized (this) { 070 checkException(); 071 wait(left); 072 } 073 } 074 } 075 076 private synchronized void checkException() throws Exception { 077 if (err != null) { 078 throw new RuntimeException("Deferred", err); 079 } 080 } 081 082 public synchronized void threadFailed(Throwable t) { 083 if (err == null) err = t; 084 LOG.error("Failed!", err); 085 notify(); 086 } 087 088 public synchronized void threadDone() { 089 threadDoneCount++; 090 } 091 092 public void setStopFlag(boolean s) throws Exception { 093 synchronized (this) { 094 stopped = s; 095 } 096 } 097 098 public void stop() throws Exception { 099 synchronized (this) { 100 stopped = true; 101 } 102 for (TestThread t : testThreads) { 103 t.join(); 104 } 105 checkException(); 106 } 107 } 108 109 /** 110 * A thread that can be added to a test context, and properly passes exceptions through. 111 */ 112 public static abstract class TestThread extends Thread { 113 protected final TestContext ctx; 114 protected boolean stopped; 115 116 public TestThread(TestContext ctx) { 117 this.ctx = ctx; 118 } 119 120 @Override 121 public void run() { 122 try { 123 doWork(); 124 } catch (Throwable t) { 125 ctx.threadFailed(t); 126 } 127 ctx.threadDone(); 128 } 129 130 public abstract void doWork() throws Exception; 131 132 protected void stopTestThread() { 133 this.stopped = true; 134 } 135 } 136 137 /** 138 * A test thread that performs a repeating operation. 139 */ 140 public static abstract class RepeatingTestThread extends TestThread { 141 public RepeatingTestThread(TestContext ctx) { 142 super(ctx); 143 } 144 145 @Override 146 public final void doWork() throws Exception { 147 try { 148 while (ctx.shouldRun() && !stopped) { 149 doAnAction(); 150 } 151 } finally { 152 workDone(); 153 } 154 } 155 156 public abstract void doAnAction() throws Exception; 157 158 public void workDone() throws IOException { 159 } 160 } 161 162 /** 163 * Verify that no assertions have failed inside a future. Used for unit tests that spawn threads. 164 * E.g., 165 * <p> 166 * 167 * <pre> 168 * List<Future<Void>> results = Lists.newArrayList(); 169 * Future<Void> f = executor.submit(new Callable<Void> { 170 * public Void call() { 171 * assertTrue(someMethod()); 172 * } 173 * }); 174 * results.add(f); 175 * assertOnFutures(results); 176 * </pre> 177 * 178 * @param threadResults A list of futures 179 * @throws InterruptedException If interrupted when waiting for a result from one of the futures 180 * @throws ExecutionException If an exception other than AssertionError occurs inside any of the 181 * futures 182 */ 183 public static void assertOnFutures(List<Future<?>> threadResults) 184 throws InterruptedException, ExecutionException { 185 for (Future<?> threadResult : threadResults) { 186 try { 187 threadResult.get(); 188 } catch (ExecutionException e) { 189 if (e.getCause() instanceof AssertionError) { 190 throw (AssertionError) e.getCause(); 191 } 192 throw e; 193 } 194 } 195 } 196}