001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
029import org.apache.hadoop.hbase.testclassification.MasterTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.junit.After;
032import org.junit.Before;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
038
039@Category({ MasterTests.class, SmallTests.class })
040public class TestProcedureSkipPersistence {
041
042  @ClassRule
043  public static final HBaseClassTestRule CLASS_RULE =
044    HBaseClassTestRule.forClass(TestProcedureSkipPersistence.class);
045  private ProcedureExecutor<ProcEnv> procExecutor;
046  private ProcedureStore procStore;
047
048  private HBaseCommonTestingUtil htu;
049  private FileSystem fs;
050  private Path testDir;
051  private Path logDir;
052
053  private static volatile int STEP = 0;
054
055  public class ProcEnv {
056
057    public ProcedureExecutor<ProcEnv> getProcedureExecutor() {
058      return procExecutor;
059    }
060  }
061
062  public static class TestProcedure extends Procedure<ProcEnv> {
063
064    // need to override this method, otherwise we will persist the release lock operation and the
065    // test will fail.
066    @Override
067    protected boolean holdLock(ProcEnv env) {
068      return true;
069    }
070
071    @Override
072    protected Procedure<ProcEnv>[] execute(ProcEnv env)
073      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
074      if (STEP == 0) {
075        STEP = 1;
076        setTimeout(60 * 60 * 1000);
077        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
078        skipPersistence();
079        throw new ProcedureSuspendedException();
080      } else if (STEP == 1) {
081        STEP = 2;
082        if (hasTimeout()) {
083          setFailure("Should not persist the timeout value",
084            new IOException("Should not persist the timeout value"));
085          return null;
086        }
087        setTimeout(2 * 1000);
088        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
089        // used to confirm that we reset the persist flag before execution
090        throw new ProcedureSuspendedException();
091      } else {
092        if (!hasTimeout()) {
093          setFailure("Should have persisted the timeout value",
094            new IOException("Should have persisted the timeout value"));
095        }
096        return null;
097      }
098    }
099
100    @Override
101    protected synchronized boolean setTimeoutFailure(ProcEnv env) {
102      setState(ProcedureProtos.ProcedureState.RUNNABLE);
103      env.getProcedureExecutor().getProcedureScheduler().addFront(this);
104      return false;
105    }
106
107    @Override
108    protected void rollback(ProcEnv env) throws IOException, InterruptedException {
109      throw new UnsupportedOperationException();
110    }
111
112    @Override
113    protected boolean abort(ProcEnv env) {
114      return false;
115    }
116
117    @Override
118    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
119    }
120
121    @Override
122    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
123    }
124  }
125
126  @Before
127  public void setUp() throws IOException {
128    htu = new HBaseCommonTestingUtil();
129    testDir = htu.getDataTestDir();
130    fs = testDir.getFileSystem(htu.getConfiguration());
131    assertTrue(testDir.depth() > 1);
132
133    logDir = new Path(testDir, "proc-logs");
134    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
135    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new ProcEnv(), procStore);
136    procStore.start(1);
137    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
138  }
139
140  @After
141  public void tearDown() throws IOException {
142    procExecutor.stop();
143    procStore.stop(false);
144    fs.delete(logDir, true);
145  }
146
147  @Test
148  public void test() throws Exception {
149    TestProcedure proc = new TestProcedure();
150    long procId = procExecutor.submitProcedure(proc);
151    htu.waitFor(30000, () -> proc.isWaiting() && procExecutor.getActiveExecutorCount() == 0);
152    ProcedureTestingUtility.restart(procExecutor);
153    htu.waitFor(30000, () -> {
154      Procedure<?> p = procExecutor.getProcedure(procId);
155      return (p.isWaiting() || p.isFinished()) && procExecutor.getActiveExecutorCount() == 0;
156    });
157    assertFalse(procExecutor.isFinished(procId));
158    ProcedureTestingUtility.restart(procExecutor);
159    htu.waitFor(30000, () -> procExecutor.isFinished(procId));
160    Procedure<ProcEnv> p = procExecutor.getResult(procId);
161    assertTrue(p.isSuccess());
162  }
163}