001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Ignore;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
045
046/**
047 * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so
048 * we should use lock to obtain the correct order. Ignored.
049 */
050@Ignore
051@Category({ MasterTests.class, SmallTests.class })
052public class TestProcedureReplayOrder {
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055    HBaseClassTestRule.forClass(TestProcedureReplayOrder.class);
056
057  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class);
058
059  private static final int NUM_THREADS = 16;
060
061  private ProcedureExecutor<TestProcedureEnv> procExecutor;
062  private TestProcedureEnv procEnv;
063  private ProcedureStore procStore;
064
065  private HBaseCommonTestingUtility htu;
066  private FileSystem fs;
067  private Path testDir;
068  private Path logDir;
069
070  @Before
071  public void setUp() throws IOException {
072    htu = new HBaseCommonTestingUtility();
073    htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25);
074
075    testDir = htu.getDataTestDir();
076    fs = testDir.getFileSystem(htu.getConfiguration());
077    assertTrue(testDir.depth() > 1);
078
079    logDir = new Path(testDir, "proc-logs");
080    procEnv = new TestProcedureEnv();
081    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
082    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
083    procStore.start(NUM_THREADS);
084    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
085  }
086
087  @After
088  public void tearDown() throws IOException {
089    procExecutor.stop();
090    procStore.stop(false);
091    fs.delete(logDir, true);
092  }
093
094  @Test
095  public void testSingleStepReplayOrder() throws Exception {
096    final int NUM_PROC_XTHREAD = 32;
097    final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
098
099    // submit the procedures
100    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
101
102    while (procEnv.getExecId() < NUM_PROCS) {
103      Thread.sleep(100);
104    }
105
106    // restart the executor and allow the procedures to run
107    ProcedureTestingUtility.restart(procExecutor);
108
109    // wait the execution of all the procedures and
110    // assert that the execution order was sorted by procId
111    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
112    procEnv.assertSortedExecList(NUM_PROCS);
113  }
114
115  @Test
116  public void testMultiStepReplayOrder() throws Exception {
117    final int NUM_PROC_XTHREAD = 24;
118    final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
119
120    // submit the procedures
121    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
122
123    while (procEnv.getExecId() < NUM_PROCS) {
124      Thread.sleep(100);
125    }
126
127    // restart the executor and allow the procedures to run
128    ProcedureTestingUtility.restart(procExecutor);
129
130    // wait the execution of all the procedures and
131    // assert that the execution order was sorted by procId
132    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
133    procEnv.assertSortedExecList(NUM_PROCS);
134  }
135
136  private void submitProcedures(final int nthreads, final int nprocPerThread,
137    final Class<?> procClazz) throws Exception {
138    Thread[] submitThreads = new Thread[nthreads];
139    for (int i = 0; i < submitThreads.length; ++i) {
140      submitThreads[i] = new Thread() {
141        @Override
142        public void run() {
143          for (int i = 0; i < nprocPerThread; ++i) {
144            try {
145              procExecutor
146                .submitProcedure((Procedure) procClazz.getDeclaredConstructor().newInstance());
147            } catch (Exception e) {
148              LOG.error("unable to instantiate the procedure", e);
149              fail("failure during the proc.newInstance(): " + e.getMessage());
150            }
151          }
152        }
153      };
154    }
155
156    for (int i = 0; i < submitThreads.length; ++i) {
157      submitThreads[i].start();
158    }
159
160    for (int i = 0; i < submitThreads.length; ++i) {
161      submitThreads[i].join();
162    }
163  }
164
165  private static class TestProcedureEnv {
166    private ArrayList<TestProcedure> execList = new ArrayList<>();
167    private AtomicLong execTimestamp = new AtomicLong(0);
168
169    public long getExecId() {
170      return execTimestamp.get();
171    }
172
173    public long nextExecId() {
174      return execTimestamp.incrementAndGet();
175    }
176
177    public void addToExecList(final TestProcedure proc) {
178      execList.add(proc);
179    }
180
181    public void assertSortedExecList(int numProcs) {
182      assertEquals(numProcs, execList.size());
183      LOG.debug("EXEC LIST: " + execList);
184      for (int i = 0; i < execList.size() - 1; ++i) {
185        TestProcedure a = execList.get(i);
186        TestProcedure b = execList.get(i + 1);
187        assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
188      }
189    }
190  }
191
192  public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
193    protected long execId = 0;
194    protected int step = 0;
195
196    public long getExecId() {
197      return execId;
198    }
199
200    @Override
201    protected void rollback(TestProcedureEnv env) {
202    }
203
204    @Override
205    protected boolean abort(TestProcedureEnv env) {
206      return true;
207    }
208
209    @Override
210    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
211      Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId);
212      serializer.serialize(builder.build());
213    }
214
215    @Override
216    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
217      Int64Value value = serializer.deserialize(Int64Value.class);
218      execId = value.getValue();
219      step = 2;
220    }
221  }
222
223  public static class TestSingleStepProcedure extends TestProcedure {
224    public TestSingleStepProcedure() {
225    }
226
227    @Override
228    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
229      LOG.trace("execute procedure step=" + step + ": " + this);
230      if (step == 0) {
231        step = 1;
232        execId = env.nextExecId();
233        return new Procedure[] { this };
234      } else if (step == 2) {
235        env.addToExecList(this);
236        return null;
237      }
238      throw new ProcedureYieldException();
239    }
240
241    @Override
242    public String toString() {
243      return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
244    }
245  }
246
247  public static class TestTwoStepProcedure extends TestProcedure {
248    public TestTwoStepProcedure() {
249    }
250
251    @Override
252    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
253      LOG.trace("execute procedure step=" + step + ": " + this);
254      if (step == 0) {
255        step = 1;
256        execId = env.nextExecId();
257        return new Procedure[] { new TestSingleStepProcedure() };
258      } else if (step == 2) {
259        env.addToExecList(this);
260        return null;
261      }
262      throw new ProcedureYieldException();
263    }
264
265    @Override
266    public String toString() {
267      return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
268    }
269  }
270}