001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
031import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.jupiter.api.AfterEach;
035import org.junit.jupiter.api.BeforeEach;
036import org.junit.jupiter.api.Disabled;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
043
044/**
045 * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so
046 * we should use lock to obtain the correct order. Ignored.
047 */
048@Disabled
049@Tag(MasterTests.TAG)
050@Tag(SmallTests.TAG)
051public class TestProcedureReplayOrder {
052  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class);
053
054  private static final int NUM_THREADS = 16;
055
056  private ProcedureExecutor<TestProcedureEnv> procExecutor;
057  private TestProcedureEnv procEnv;
058  private ProcedureStore procStore;
059
060  private HBaseCommonTestingUtil htu;
061  private FileSystem fs;
062  private Path testDir;
063  private Path logDir;
064
065  @BeforeEach
066  public void setUp() throws IOException {
067    htu = new HBaseCommonTestingUtil();
068    htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25);
069
070    testDir = htu.getDataTestDir();
071    fs = testDir.getFileSystem(htu.getConfiguration());
072    assertTrue(testDir.depth() > 1);
073
074    logDir = new Path(testDir, "proc-logs");
075    procEnv = new TestProcedureEnv();
076    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
077    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
078    procStore.start(NUM_THREADS);
079    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
080  }
081
082  @AfterEach
083  public void tearDown() throws IOException {
084    procExecutor.stop();
085    procStore.stop(false);
086    fs.delete(logDir, true);
087  }
088
089  @Test
090  public void testSingleStepReplayOrder() throws Exception {
091    final int NUM_PROC_XTHREAD = 32;
092    final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
093
094    // submit the procedures
095    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
096
097    while (procEnv.getExecId() < NUM_PROCS) {
098      Thread.sleep(100);
099    }
100
101    // restart the executor and allow the procedures to run
102    ProcedureTestingUtility.restart(procExecutor);
103
104    // wait the execution of all the procedures and
105    // assert that the execution order was sorted by procId
106    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
107    procEnv.assertSortedExecList(NUM_PROCS);
108  }
109
110  @Test
111  public void testMultiStepReplayOrder() throws Exception {
112    final int NUM_PROC_XTHREAD = 24;
113    final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
114
115    // submit the procedures
116    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
117
118    while (procEnv.getExecId() < NUM_PROCS) {
119      Thread.sleep(100);
120    }
121
122    // restart the executor and allow the procedures to run
123    ProcedureTestingUtility.restart(procExecutor);
124
125    // wait the execution of all the procedures and
126    // assert that the execution order was sorted by procId
127    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
128    procEnv.assertSortedExecList(NUM_PROCS);
129  }
130
131  private void submitProcedures(final int nthreads, final int nprocPerThread,
132    final Class<?> procClazz) throws Exception {
133    Thread[] submitThreads = new Thread[nthreads];
134    for (int i = 0; i < submitThreads.length; ++i) {
135      submitThreads[i] = new Thread() {
136        @Override
137        public void run() {
138          for (int i = 0; i < nprocPerThread; ++i) {
139            try {
140              procExecutor
141                .submitProcedure((Procedure) procClazz.getDeclaredConstructor().newInstance());
142            } catch (Exception e) {
143              LOG.error("unable to instantiate the procedure", e);
144              fail("failure during the proc.newInstance(): " + e.getMessage());
145            }
146          }
147        }
148      };
149    }
150
151    for (int i = 0; i < submitThreads.length; ++i) {
152      submitThreads[i].start();
153    }
154
155    for (int i = 0; i < submitThreads.length; ++i) {
156      submitThreads[i].join();
157    }
158  }
159
160  private static class TestProcedureEnv {
161    private ArrayList<TestProcedure> execList = new ArrayList<>();
162    private AtomicLong execTimestamp = new AtomicLong(0);
163
164    public long getExecId() {
165      return execTimestamp.get();
166    }
167
168    public long nextExecId() {
169      return execTimestamp.incrementAndGet();
170    }
171
172    public void addToExecList(final TestProcedure proc) {
173      execList.add(proc);
174    }
175
176    public void assertSortedExecList(int numProcs) {
177      assertEquals(numProcs, execList.size());
178      LOG.debug("EXEC LIST: " + execList);
179      for (int i = 0; i < execList.size() - 1; ++i) {
180        TestProcedure a = execList.get(i);
181        TestProcedure b = execList.get(i + 1);
182        assertTrue(a.getExecId() > b.getExecId(), "exec list not sorted: " + a + " < " + b);
183      }
184    }
185  }
186
187  public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
188    protected long execId = 0;
189    protected int step = 0;
190
191    public long getExecId() {
192      return execId;
193    }
194
195    @Override
196    protected void rollback(TestProcedureEnv env) {
197    }
198
199    @Override
200    protected boolean abort(TestProcedureEnv env) {
201      return true;
202    }
203
204    @Override
205    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
206      Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId);
207      serializer.serialize(builder.build());
208    }
209
210    @Override
211    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
212      Int64Value value = serializer.deserialize(Int64Value.class);
213      execId = value.getValue();
214      step = 2;
215    }
216  }
217
218  public static class TestSingleStepProcedure extends TestProcedure {
219    public TestSingleStepProcedure() {
220    }
221
222    @Override
223    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
224      LOG.trace("execute procedure step=" + step + ": " + this);
225      if (step == 0) {
226        step = 1;
227        execId = env.nextExecId();
228        return new Procedure[] { this };
229      } else if (step == 2) {
230        env.addToExecList(this);
231        return null;
232      }
233      throw new ProcedureYieldException();
234    }
235
236    @Override
237    public String toString() {
238      return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
239    }
240  }
241
242  public static class TestTwoStepProcedure extends TestProcedure {
243    public TestTwoStepProcedure() {
244    }
245
246    @Override
247    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
248      LOG.trace("execute procedure step=" + step + ": " + this);
249      if (step == 0) {
250        step = 1;
251        execId = env.nextExecId();
252        return new Procedure[] { new TestSingleStepProcedure() };
253      } else if (step == 2) {
254        env.addToExecList(this);
255        return null;
256      }
257      throw new ProcedureYieldException();
258    }
259
260    @Override
261    public String toString() {
262      return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
263    }
264  }
265}