001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.concurrent.atomic.AtomicLong; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 031import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.junit.After; 035import org.junit.Before; 036import org.junit.ClassRule; 037import org.junit.Ignore; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 044 045/** 046 * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so 047 * we should use lock to obtain the correct order. Ignored. 048 */ 049@Ignore 050@Category({ MasterTests.class, SmallTests.class }) 051public class TestProcedureReplayOrder { 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestProcedureReplayOrder.class); 055 056 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class); 057 058 private static final int NUM_THREADS = 16; 059 060 private ProcedureExecutor<TestProcedureEnv> procExecutor; 061 private TestProcedureEnv procEnv; 062 private ProcedureStore procStore; 063 064 private HBaseCommonTestingUtility htu; 065 private FileSystem fs; 066 private Path testDir; 067 private Path logDir; 068 069 @Before 070 public void setUp() throws IOException { 071 htu = new HBaseCommonTestingUtility(); 072 htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25); 073 074 testDir = htu.getDataTestDir(); 075 fs = testDir.getFileSystem(htu.getConfiguration()); 076 assertTrue(testDir.depth() > 1); 077 078 logDir = new Path(testDir, "proc-logs"); 079 procEnv = new TestProcedureEnv(); 080 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 081 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 082 procStore.start(NUM_THREADS); 083 ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); 084 } 085 086 @After 087 public void tearDown() throws IOException { 088 procExecutor.stop(); 089 procStore.stop(false); 090 fs.delete(logDir, true); 091 } 092 093 @Test 094 public void testSingleStepReplayOrder() throws Exception { 095 final int NUM_PROC_XTHREAD = 32; 096 final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD; 097 098 // submit the procedures 099 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class); 100 101 while (procEnv.getExecId() < NUM_PROCS) { 102 Thread.sleep(100); 103 } 104 105 // restart the executor and allow the procedures to run 106 ProcedureTestingUtility.restart(procExecutor); 107 108 // wait the execution of all the procedures and 109 // assert that the execution order was sorted by procId 110 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 111 procEnv.assertSortedExecList(NUM_PROCS); 112 } 113 114 @Test 115 public void testMultiStepReplayOrder() throws Exception { 116 final int NUM_PROC_XTHREAD = 24; 117 final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2); 118 119 // submit the procedures 120 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class); 121 122 while (procEnv.getExecId() < NUM_PROCS) { 123 Thread.sleep(100); 124 } 125 126 // restart the executor and allow the procedures to run 127 ProcedureTestingUtility.restart(procExecutor); 128 129 // wait the execution of all the procedures and 130 // assert that the execution order was sorted by procId 131 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 132 procEnv.assertSortedExecList(NUM_PROCS); 133 } 134 135 private void submitProcedures(final int nthreads, final int nprocPerThread, 136 final Class<?> procClazz) throws Exception { 137 Thread[] submitThreads = new Thread[nthreads]; 138 for (int i = 0; i < submitThreads.length; ++i) { 139 submitThreads[i] = new Thread() { 140 @Override 141 public void run() { 142 for (int i = 0; i < nprocPerThread; ++i) { 143 try { 144 procExecutor.submitProcedure((Procedure) 145 procClazz.getDeclaredConstructor().newInstance()); 146 } catch (Exception e) { 147 LOG.error("unable to instantiate the procedure", e); 148 fail("failure during the proc.newInstance(): " + e.getMessage()); 149 } 150 } 151 } 152 }; 153 } 154 155 for (int i = 0; i < submitThreads.length; ++i) { 156 submitThreads[i].start(); 157 } 158 159 for (int i = 0; i < submitThreads.length; ++i) { 160 submitThreads[i].join(); 161 } 162 } 163 164 private static class TestProcedureEnv { 165 private ArrayList<TestProcedure> execList = new ArrayList<>(); 166 private AtomicLong execTimestamp = new AtomicLong(0); 167 168 public long getExecId() { 169 return execTimestamp.get(); 170 } 171 172 public long nextExecId() { 173 return execTimestamp.incrementAndGet(); 174 } 175 176 public void addToExecList(final TestProcedure proc) { 177 execList.add(proc); 178 } 179 180 public void assertSortedExecList(int numProcs) { 181 assertEquals(numProcs, execList.size()); 182 LOG.debug("EXEC LIST: " + execList); 183 for (int i = 0; i < execList.size() - 1; ++i) { 184 TestProcedure a = execList.get(i); 185 TestProcedure b = execList.get(i + 1); 186 assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId()); 187 } 188 } 189 } 190 191 public static abstract class TestProcedure extends Procedure<TestProcedureEnv> { 192 protected long execId = 0; 193 protected int step = 0; 194 195 public long getExecId() { 196 return execId; 197 } 198 199 @Override 200 protected void rollback(TestProcedureEnv env) { } 201 202 @Override 203 protected boolean abort(TestProcedureEnv env) { 204 return true; 205 } 206 207 @Override 208 protected void serializeStateData(ProcedureStateSerializer serializer) 209 throws IOException { 210 Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId); 211 serializer.serialize(builder.build()); 212 } 213 214 @Override 215 protected void deserializeStateData(ProcedureStateSerializer serializer) 216 throws IOException { 217 Int64Value value = serializer.deserialize(Int64Value.class); 218 execId = value.getValue(); 219 step = 2; 220 } 221 } 222 223 public static class TestSingleStepProcedure extends TestProcedure { 224 public TestSingleStepProcedure() { } 225 226 @Override 227 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { 228 LOG.trace("execute procedure step=" + step + ": " + this); 229 if (step == 0) { 230 step = 1; 231 execId = env.nextExecId(); 232 return new Procedure[] { this }; 233 } else if (step == 2) { 234 env.addToExecList(this); 235 return null; 236 } 237 throw new ProcedureYieldException(); 238 } 239 240 @Override 241 public String toString() { 242 return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")"; 243 } 244 } 245 246 public static class TestTwoStepProcedure extends TestProcedure { 247 public TestTwoStepProcedure() { } 248 249 @Override 250 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { 251 LOG.trace("execute procedure step=" + step + ": " + this); 252 if (step == 0) { 253 step = 1; 254 execId = env.nextExecId(); 255 return new Procedure[] { new TestSingleStepProcedure() }; 256 } else if (step == 2) { 257 env.addToExecList(this); 258 return null; 259 } 260 throw new ProcedureYieldException(); 261 } 262 263 @Override 264 public String toString() { 265 return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")"; 266 } 267 } 268}