001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.concurrent.atomic.AtomicLong; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 031import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.junit.jupiter.api.AfterEach; 035import org.junit.jupiter.api.BeforeEach; 036import org.junit.jupiter.api.Disabled; 037import org.junit.jupiter.api.Tag; 038import org.junit.jupiter.api.Test; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 043 044/** 045 * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so 046 * we should use lock to obtain the correct order. Ignored. 047 */ 048@Disabled 049@Tag(MasterTests.TAG) 050@Tag(SmallTests.TAG) 051public class TestProcedureReplayOrder { 052 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class); 053 054 private static final int NUM_THREADS = 16; 055 056 private ProcedureExecutor<TestProcedureEnv> procExecutor; 057 private TestProcedureEnv procEnv; 058 private ProcedureStore procStore; 059 060 private HBaseCommonTestingUtil htu; 061 private FileSystem fs; 062 private Path testDir; 063 private Path logDir; 064 065 @BeforeEach 066 public void setUp() throws IOException { 067 htu = new HBaseCommonTestingUtil(); 068 htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25); 069 070 testDir = htu.getDataTestDir(); 071 fs = testDir.getFileSystem(htu.getConfiguration()); 072 assertTrue(testDir.depth() > 1); 073 074 logDir = new Path(testDir, "proc-logs"); 075 procEnv = new TestProcedureEnv(); 076 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 077 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 078 procStore.start(NUM_THREADS); 079 ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); 080 } 081 082 @AfterEach 083 public void tearDown() throws IOException { 084 procExecutor.stop(); 085 procStore.stop(false); 086 fs.delete(logDir, true); 087 } 088 089 @Test 090 public void testSingleStepReplayOrder() throws Exception { 091 final int NUM_PROC_XTHREAD = 32; 092 final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD; 093 094 // submit the procedures 095 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class); 096 097 while (procEnv.getExecId() < NUM_PROCS) { 098 Thread.sleep(100); 099 } 100 101 // restart the executor and allow the procedures to run 102 ProcedureTestingUtility.restart(procExecutor); 103 104 // wait the execution of all the procedures and 105 // assert that the execution order was sorted by procId 106 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 107 procEnv.assertSortedExecList(NUM_PROCS); 108 } 109 110 @Test 111 public void testMultiStepReplayOrder() throws Exception { 112 final int NUM_PROC_XTHREAD = 24; 113 final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2); 114 115 // submit the procedures 116 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class); 117 118 while (procEnv.getExecId() < NUM_PROCS) { 119 Thread.sleep(100); 120 } 121 122 // restart the executor and allow the procedures to run 123 ProcedureTestingUtility.restart(procExecutor); 124 125 // wait the execution of all the procedures and 126 // assert that the execution order was sorted by procId 127 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 128 procEnv.assertSortedExecList(NUM_PROCS); 129 } 130 131 private void submitProcedures(final int nthreads, final int nprocPerThread, 132 final Class<?> procClazz) throws Exception { 133 Thread[] submitThreads = new Thread[nthreads]; 134 for (int i = 0; i < submitThreads.length; ++i) { 135 submitThreads[i] = new Thread() { 136 @Override 137 public void run() { 138 for (int i = 0; i < nprocPerThread; ++i) { 139 try { 140 procExecutor 141 .submitProcedure((Procedure) procClazz.getDeclaredConstructor().newInstance()); 142 } catch (Exception e) { 143 LOG.error("unable to instantiate the procedure", e); 144 fail("failure during the proc.newInstance(): " + e.getMessage()); 145 } 146 } 147 } 148 }; 149 } 150 151 for (int i = 0; i < submitThreads.length; ++i) { 152 submitThreads[i].start(); 153 } 154 155 for (int i = 0; i < submitThreads.length; ++i) { 156 submitThreads[i].join(); 157 } 158 } 159 160 private static class TestProcedureEnv { 161 private ArrayList<TestProcedure> execList = new ArrayList<>(); 162 private AtomicLong execTimestamp = new AtomicLong(0); 163 164 public long getExecId() { 165 return execTimestamp.get(); 166 } 167 168 public long nextExecId() { 169 return execTimestamp.incrementAndGet(); 170 } 171 172 public void addToExecList(final TestProcedure proc) { 173 execList.add(proc); 174 } 175 176 public void assertSortedExecList(int numProcs) { 177 assertEquals(numProcs, execList.size()); 178 LOG.debug("EXEC LIST: " + execList); 179 for (int i = 0; i < execList.size() - 1; ++i) { 180 TestProcedure a = execList.get(i); 181 TestProcedure b = execList.get(i + 1); 182 assertTrue(a.getExecId() > b.getExecId(), "exec list not sorted: " + a + " < " + b); 183 } 184 } 185 } 186 187 public static abstract class TestProcedure extends Procedure<TestProcedureEnv> { 188 protected long execId = 0; 189 protected int step = 0; 190 191 public long getExecId() { 192 return execId; 193 } 194 195 @Override 196 protected void rollback(TestProcedureEnv env) { 197 } 198 199 @Override 200 protected boolean abort(TestProcedureEnv env) { 201 return true; 202 } 203 204 @Override 205 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 206 Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId); 207 serializer.serialize(builder.build()); 208 } 209 210 @Override 211 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 212 Int64Value value = serializer.deserialize(Int64Value.class); 213 execId = value.getValue(); 214 step = 2; 215 } 216 } 217 218 public static class TestSingleStepProcedure extends TestProcedure { 219 public TestSingleStepProcedure() { 220 } 221 222 @Override 223 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { 224 LOG.trace("execute procedure step=" + step + ": " + this); 225 if (step == 0) { 226 step = 1; 227 execId = env.nextExecId(); 228 return new Procedure[] { this }; 229 } else if (step == 2) { 230 env.addToExecList(this); 231 return null; 232 } 233 throw new ProcedureYieldException(); 234 } 235 236 @Override 237 public String toString() { 238 return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")"; 239 } 240 } 241 242 public static class TestTwoStepProcedure extends TestProcedure { 243 public TestTwoStepProcedure() { 244 } 245 246 @Override 247 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { 248 LOG.trace("execute procedure step=" + step + ": " + this); 249 if (step == 0) { 250 step = 1; 251 execId = env.nextExecId(); 252 return new Procedure[] { new TestSingleStepProcedure() }; 253 } else if (step == 2) { 254 env.addToExecList(this); 255 return null; 256 } 257 throw new ProcedureYieldException(); 258 } 259 260 @Override 261 public String toString() { 262 return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")"; 263 } 264 } 265}