001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Ignore;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
045
046/**
047 * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so
048 * we should use lock to obtain the correct order. Ignored.
049 */
050@Ignore
051@Category({ MasterTests.class, LargeTests.class })
052public class TestProcedureReplayOrder {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestProcedureReplayOrder.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class);
059
060  private static final int NUM_THREADS = 16;
061
062  private ProcedureExecutor<TestProcedureEnv> procExecutor;
063  private TestProcedureEnv procEnv;
064  private ProcedureStore procStore;
065
066  private HBaseCommonTestingUtility htu;
067  private FileSystem fs;
068  private Path testDir;
069  private Path logDir;
070
071  @Before
072  public void setUp() throws IOException {
073    htu = new HBaseCommonTestingUtility();
074    htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25);
075
076    testDir = htu.getDataTestDir();
077    fs = testDir.getFileSystem(htu.getConfiguration());
078    assertTrue(testDir.depth() > 1);
079
080    logDir = new Path(testDir, "proc-logs");
081    procEnv = new TestProcedureEnv();
082    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
083    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
084    procStore.start(NUM_THREADS);
085    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
086  }
087
088  @After
089  public void tearDown() throws IOException {
090    procExecutor.stop();
091    procStore.stop(false);
092    fs.delete(logDir, true);
093  }
094
095  @Test
096  public void testSingleStepReplayOrder() throws Exception {
097    final int NUM_PROC_XTHREAD = 32;
098    final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
099
100    // submit the procedures
101    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
102
103    while (procEnv.getExecId() < NUM_PROCS) {
104      Thread.sleep(100);
105    }
106
107    // restart the executor and allow the procedures to run
108    ProcedureTestingUtility.restart(procExecutor);
109
110    // wait the execution of all the procedures and
111    // assert that the execution order was sorted by procId
112    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
113    procEnv.assertSortedExecList(NUM_PROCS);
114  }
115
116  @Test
117  public void testMultiStepReplayOrder() throws Exception {
118    final int NUM_PROC_XTHREAD = 24;
119    final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
120
121    // submit the procedures
122    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
123
124    while (procEnv.getExecId() < NUM_PROCS) {
125      Thread.sleep(100);
126    }
127
128    // restart the executor and allow the procedures to run
129    ProcedureTestingUtility.restart(procExecutor);
130
131    // wait the execution of all the procedures and
132    // assert that the execution order was sorted by procId
133    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
134    procEnv.assertSortedExecList(NUM_PROCS);
135  }
136
137  private void submitProcedures(final int nthreads, final int nprocPerThread,
138      final Class<?> procClazz) throws Exception {
139    Thread[] submitThreads = new Thread[nthreads];
140    for (int i = 0; i < submitThreads.length; ++i) {
141      submitThreads[i] = new Thread() {
142        @Override
143        public void run() {
144          for (int i = 0; i < nprocPerThread; ++i) {
145            try {
146              procExecutor.submitProcedure((Procedure)
147                procClazz.getDeclaredConstructor().newInstance());
148            } catch (Exception e) {
149              LOG.error("unable to instantiate the procedure", e);
150              fail("failure during the proc.newInstance(): " + e.getMessage());
151            }
152          }
153        }
154      };
155    }
156
157    for (int i = 0; i < submitThreads.length; ++i) {
158      submitThreads[i].start();
159    }
160
161    for (int i = 0; i < submitThreads.length; ++i) {
162      submitThreads[i].join();
163    }
164  }
165
166  private static class TestProcedureEnv {
167    private ArrayList<TestProcedure> execList = new ArrayList<>();
168    private AtomicLong execTimestamp = new AtomicLong(0);
169
170    public long getExecId() {
171      return execTimestamp.get();
172    }
173
174    public long nextExecId() {
175      return execTimestamp.incrementAndGet();
176    }
177
178    public void addToExecList(final TestProcedure proc) {
179      execList.add(proc);
180    }
181
182    public void assertSortedExecList(int numProcs) {
183      assertEquals(numProcs, execList.size());
184      LOG.debug("EXEC LIST: " + execList);
185      for (int i = 0; i < execList.size() - 1; ++i) {
186        TestProcedure a = execList.get(i);
187        TestProcedure b = execList.get(i + 1);
188        assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
189      }
190    }
191  }
192
193  public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
194    protected long execId = 0;
195    protected int step = 0;
196
197    public long getExecId() {
198      return execId;
199    }
200
201    @Override
202    protected void rollback(TestProcedureEnv env) { }
203
204    @Override
205    protected boolean abort(TestProcedureEnv env) { return true; }
206
207    @Override
208    protected void serializeStateData(ProcedureStateSerializer serializer)
209        throws IOException {
210      Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId);
211      serializer.serialize(builder.build());
212    }
213
214    @Override
215    protected void deserializeStateData(ProcedureStateSerializer serializer)
216        throws IOException {
217      Int64Value value = serializer.deserialize(Int64Value.class);
218      execId = value.getValue();
219      step = 2;
220    }
221  }
222
223  public static class TestSingleStepProcedure extends TestProcedure {
224    public TestSingleStepProcedure() { }
225
226    @Override
227    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
228      LOG.trace("execute procedure step=" + step + ": " + this);
229      if (step == 0) {
230        step = 1;
231        execId = env.nextExecId();
232        return new Procedure[] { this };
233      } else if (step == 2) {
234        env.addToExecList(this);
235        return null;
236      }
237      throw new ProcedureYieldException();
238    }
239
240    @Override
241    public String toString() {
242      return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
243    }
244  }
245
246  public static class TestTwoStepProcedure extends TestProcedure {
247    public TestTwoStepProcedure() { }
248
249    @Override
250    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
251      LOG.trace("execute procedure step=" + step + ": " + this);
252      if (step == 0) {
253        step = 1;
254        execId = env.nextExecId();
255        return new Procedure[] { new TestSingleStepProcedure() };
256      } else if (step == 2) {
257        env.addToExecList(this);
258        return null;
259      }
260      throw new ProcedureYieldException();
261    }
262
263    @Override
264    public String toString() {
265      return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
266    }
267  }
268}