/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;

/**
 * Checks the order in which procedures are executed after the {@link ProcedureExecutor} has been
 * restarted.
 * <p>
 * For now we do not guarantee this order: the locks are restored when the ProcedureExecutor is
 * restarted, so locks should be used to obtain the correct order. The test is therefore ignored.
 */
@Ignore
@Category({ MasterTests.class, LargeTests.class })
public class TestProcedureReplayOrder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedureReplayOrder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class);

  /** Number of store threads and of concurrent submitter threads used by the tests. */
  private static final int NUM_THREADS = 16;

  private ProcedureExecutor<TestProcedureEnv> procExecutor;
  private TestProcedureEnv procEnv;
  private ProcedureStore procStore;

  private HBaseCommonTestingUtility htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /**
   * Creates a WAL-backed procedure store under the test data directory and starts an executor
   * with a single worker on top of it.
   */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtility();
    htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25);

    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procEnv = new TestProcedureEnv();
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
    procStore.start(NUM_THREADS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
  }

  /** Stops the executor and the store, then removes the procedure log directory. */
  @After
  public void tearDown() throws IOException {
    try {
      procExecutor.stop();
      procStore.stop(false);
    } finally {
      // Always clean up the log directory, even if stopping the executor or store failed.
      fs.delete(logDir, true);
    }
  }

  @Test
  public void testSingleStepReplayOrder() throws Exception {
    final int NUM_PROC_XTHREAD = 32;
    final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;

    // submit the procedures
    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);

    restartAndAssertExecOrder(NUM_PROCS);
  }

  @Test
  public void testMultiStepReplayOrder() throws Exception {
    final int NUM_PROC_XTHREAD = 24;
    // every two-step procedure spawns one single-step child, hence the factor 2
    final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);

    // submit the procedures
    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);

    restartAndAssertExecOrder(NUM_PROCS);
  }

  /**
   * Waits until {@code numProcs} procedures have been assigned an exec id, restarts the executor
   * so that the procedures are reloaded from the store, and then verifies the recorded execution
   * order.
   * @param numProcs total number of procedures expected to run, children included
   */
  private void restartAndAssertExecOrder(final int numProcs) throws Exception {
    while (procEnv.getExecId() < numProcs) {
      Thread.sleep(100);
    }

    // restart the executor and allow the procedures to run
    ProcedureTestingUtility.restart(procExecutor);

    // wait the execution of all the procedures and
    // assert that the execution order was sorted by procId
    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
    procEnv.assertSortedExecList(numProcs);
  }

  /**
   * Submits {@code nthreads * nprocPerThread} procedures concurrently and waits for all the
   * submitter threads to finish.
   * <p>
   * A failure inside a submitter thread is recorded and reported on the calling thread once all
   * submitters have been joined; an assertion error thrown inside a submitter thread would only
   * terminate that thread and never reach the test runner.
   * @param nthreads       number of submitter threads
   * @param nprocPerThread number of procedures submitted by each thread
   * @param procClazz      procedure class to instantiate through its no-arg constructor
   */
  private void submitProcedures(final int nthreads, final int nprocPerThread,
    final Class<? extends TestProcedure> procClazz) throws Exception {
    final AtomicReference<Exception> firstFailure = new AtomicReference<>();
    Thread[] submitThreads = new Thread[nthreads];
    for (int i = 0; i < submitThreads.length; ++i) {
      submitThreads[i] = new Thread() {
        @Override
        public void run() {
          for (int n = 0; n < nprocPerThread; ++n) {
            try {
              procExecutor.submitProcedure(procClazz.getDeclaredConstructor().newInstance());
            } catch (Exception e) {
              LOG.error("unable to instantiate the procedure", e);
              // keep only the first failure and stop submitting from this thread
              firstFailure.compareAndSet(null, e);
              return;
            }
          }
        }
      };
    }

    for (int i = 0; i < submitThreads.length; ++i) {
      submitThreads[i].start();
    }

    for (int i = 0; i < submitThreads.length; ++i) {
      submitThreads[i].join();
    }

    // Report on the calling (test) thread so that the failure is actually seen by JUnit.
    Exception failure = firstFailure.get();
    if (failure != null) {
      fail("failure during the proc.newInstance(): " + failure.getMessage());
    }
  }

  /**
   * Environment shared by the test procedures: hands out exec ids and records the procedures
   * that completed after being reloaded from the store.
   */
  private static class TestProcedureEnv {
    private final List<TestProcedure> execList = new ArrayList<>();
    private final AtomicLong execTimestamp = new AtomicLong(0);

    /** Returns the last exec id handed out, 0 if none was handed out yet. */
    public long getExecId() {
      return execTimestamp.get();
    }

    /** Returns a new exec id, one greater than the previous one. */
    public long nextExecId() {
      return execTimestamp.incrementAndGet();
    }

    /** Appends {@code proc} to the list of executed procedures. */
    public void addToExecList(final TestProcedure proc) {
      execList.add(proc);
    }

    /**
     * Asserts that exactly {@code numProcs} procedures were recorded and that, for every pair of
     * adjacent entries, the exec id of the earlier entry is strictly greater than the exec id of
     * the later one.
     * <p>
     * NOTE(review): the calling tests describe the expectation as "sorted by procId"; confirm
     * that a strictly descending exec id is the intended order.
     * @param numProcs expected number of recorded procedures
     */
    public void assertSortedExecList(int numProcs) {
      assertEquals(numProcs, execList.size());
      LOG.debug("EXEC LIST: {}", execList);
      for (int i = 0; i < execList.size() - 1; ++i) {
        TestProcedure a = execList.get(i);
        TestProcedure b = execList.get(i + 1);
        assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
      }
    }
  }

  /**
   * Base class of the test procedures. The exec id is the only persisted state; {@code step}
   * tracks the progress: 0 = never executed, 1 = executed once in this executor instance,
   * 2 = state reloaded from the store.
   */
  public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
    protected long execId = 0;
    protected int step = 0;

    public long getExecId() {
      return execId;
    }

    @Override
    protected void rollback(TestProcedureEnv env) {
    }

    @Override
    protected boolean abort(TestProcedureEnv env) {
      return true;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
      Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId);
      serializer.serialize(builder.build());
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
      Int64Value value = serializer.deserialize(Int64Value.class);
      execId = value.getValue();
      // mark the procedure as reloaded so that the next execute() call completes it
      step = 2;
    }
  }

  /**
   * Procedure that takes its exec id on the first execution, keeps yielding afterwards, and
   * completes (recording itself in the environment) only once its state has been reloaded.
   */
  public static class TestSingleStepProcedure extends TestProcedure {
    public TestSingleStepProcedure() {
    }

    @Override
    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
      LOG.trace("execute procedure step={}: {}", step, this);
      if (step == 0) {
        step = 1;
        execId = env.nextExecId();
        return new Procedure[] { this };
      } else if (step == 2) {
        env.addToExecList(this);
        return null;
      }
      // step == 1: not reloaded yet, keep yielding
      throw new ProcedureYieldException();
    }

    @Override
    public String toString() {
      return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
    }
  }

  /**
   * Procedure that takes its exec id and spawns a {@link TestSingleStepProcedure} child on the
   * first execution, keeps yielding afterwards, and completes (recording itself in the
   * environment) only once its state has been reloaded.
   */
  public static class TestTwoStepProcedure extends TestProcedure {
    public TestTwoStepProcedure() {
    }

    @Override
    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
      LOG.trace("execute procedure step={}: {}", step, this);
      if (step == 0) {
        step = 1;
        execId = env.nextExecId();
        return new Procedure[] { new TestSingleStepProcedure() };
      } else if (step == 2) {
        env.addToExecList(this);
        return null;
      }
      // step == 1: not reloaded yet, keep yielding
      throw new ProcedureYieldException();
    }

    @Override
    public String toString() {
      return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
    }
  }
}