001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
031import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.After;
035import org.junit.Before;
036import org.junit.ClassRule;
037import org.junit.Ignore;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
044
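/**
 * Tests the order in which procedures are replayed after the ProcedureExecutor is restarted.
 * <p>
 * For now this replay order is not guaranteed: the locks are restored when the
 * ProcedureExecutor restarts, so locks should be used to obtain the correct order. The test is
 * therefore ignored.
 */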
049@Ignore
050@Category({ MasterTests.class, SmallTests.class })
051public class TestProcedureReplayOrder {
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054      HBaseClassTestRule.forClass(TestProcedureReplayOrder.class);
055
056  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class);
057
058  private static final int NUM_THREADS = 16;
059
060  private ProcedureExecutor<TestProcedureEnv> procExecutor;
061  private TestProcedureEnv procEnv;
062  private ProcedureStore procStore;
063
064  private HBaseCommonTestingUtility htu;
065  private FileSystem fs;
066  private Path testDir;
067  private Path logDir;
068
069  @Before
070  public void setUp() throws IOException {
071    htu = new HBaseCommonTestingUtility();
072    htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25);
073
074    testDir = htu.getDataTestDir();
075    fs = testDir.getFileSystem(htu.getConfiguration());
076    assertTrue(testDir.depth() > 1);
077
078    logDir = new Path(testDir, "proc-logs");
079    procEnv = new TestProcedureEnv();
080    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
081    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
082    procStore.start(NUM_THREADS);
083    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
084  }
085
086  @After
087  public void tearDown() throws IOException {
088    procExecutor.stop();
089    procStore.stop(false);
090    fs.delete(logDir, true);
091  }
092
093  @Test
094  public void testSingleStepReplayOrder() throws Exception {
095    final int NUM_PROC_XTHREAD = 32;
096    final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
097
098    // submit the procedures
099    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
100
101    while (procEnv.getExecId() < NUM_PROCS) {
102      Thread.sleep(100);
103    }
104
105    // restart the executor and allow the procedures to run
106    ProcedureTestingUtility.restart(procExecutor);
107
108    // wait the execution of all the procedures and
109    // assert that the execution order was sorted by procId
110    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
111    procEnv.assertSortedExecList(NUM_PROCS);
112  }
113
114  @Test
115  public void testMultiStepReplayOrder() throws Exception {
116    final int NUM_PROC_XTHREAD = 24;
117    final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
118
119    // submit the procedures
120    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
121
122    while (procEnv.getExecId() < NUM_PROCS) {
123      Thread.sleep(100);
124    }
125
126    // restart the executor and allow the procedures to run
127    ProcedureTestingUtility.restart(procExecutor);
128
129    // wait the execution of all the procedures and
130    // assert that the execution order was sorted by procId
131    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
132    procEnv.assertSortedExecList(NUM_PROCS);
133  }
134
135  private void submitProcedures(final int nthreads, final int nprocPerThread,
136      final Class<?> procClazz) throws Exception {
137    Thread[] submitThreads = new Thread[nthreads];
138    for (int i = 0; i < submitThreads.length; ++i) {
139      submitThreads[i] = new Thread() {
140        @Override
141        public void run() {
142          for (int i = 0; i < nprocPerThread; ++i) {
143            try {
144              procExecutor.submitProcedure((Procedure)
145                procClazz.getDeclaredConstructor().newInstance());
146            } catch (Exception e) {
147              LOG.error("unable to instantiate the procedure", e);
148              fail("failure during the proc.newInstance(): " + e.getMessage());
149            }
150          }
151        }
152      };
153    }
154
155    for (int i = 0; i < submitThreads.length; ++i) {
156      submitThreads[i].start();
157    }
158
159    for (int i = 0; i < submitThreads.length; ++i) {
160      submitThreads[i].join();
161    }
162  }
163
164  private static class TestProcedureEnv {
165    private ArrayList<TestProcedure> execList = new ArrayList<>();
166    private AtomicLong execTimestamp = new AtomicLong(0);
167
168    public long getExecId() {
169      return execTimestamp.get();
170    }
171
172    public long nextExecId() {
173      return execTimestamp.incrementAndGet();
174    }
175
176    public void addToExecList(final TestProcedure proc) {
177      execList.add(proc);
178    }
179
180    public void assertSortedExecList(int numProcs) {
181      assertEquals(numProcs, execList.size());
182      LOG.debug("EXEC LIST: " + execList);
183      for (int i = 0; i < execList.size() - 1; ++i) {
184        TestProcedure a = execList.get(i);
185        TestProcedure b = execList.get(i + 1);
186        assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
187      }
188    }
189  }
190
191  public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
192    protected long execId = 0;
193    protected int step = 0;
194
195    public long getExecId() {
196      return execId;
197    }
198
199    @Override
200    protected void rollback(TestProcedureEnv env) { }
201
202    @Override
203    protected boolean abort(TestProcedureEnv env) {
204      return true;
205    }
206
207    @Override
208    protected void serializeStateData(ProcedureStateSerializer serializer)
209        throws IOException {
210      Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId);
211      serializer.serialize(builder.build());
212    }
213
214    @Override
215    protected void deserializeStateData(ProcedureStateSerializer serializer)
216        throws IOException {
217      Int64Value value = serializer.deserialize(Int64Value.class);
218      execId = value.getValue();
219      step = 2;
220    }
221  }
222
223  public static class TestSingleStepProcedure extends TestProcedure {
224    public TestSingleStepProcedure() { }
225
226    @Override
227    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
228      LOG.trace("execute procedure step=" + step + ": " + this);
229      if (step == 0) {
230        step = 1;
231        execId = env.nextExecId();
232        return new Procedure[] { this };
233      } else if (step == 2) {
234        env.addToExecList(this);
235        return null;
236      }
237      throw new ProcedureYieldException();
238    }
239
240    @Override
241    public String toString() {
242      return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
243    }
244  }
245
246  public static class TestTwoStepProcedure extends TestProcedure {
247    public TestTwoStepProcedure() { }
248
249    @Override
250    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
251      LOG.trace("execute procedure step=" + step + ": " + this);
252      if (step == 0) {
253        step = 1;
254        execId = env.nextExecId();
255        return new Procedure[] { new TestSingleStepProcedure() };
256      } else if (step == 2) {
257        env.addToExecList(this);
258        return null;
259      }
260      throw new ProcedureYieldException();
261    }
262
263    @Override
264    public String toString() {
265      return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
266    }
267  }
268}