001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
028import org.apache.hadoop.hbase.testclassification.MasterTests;
029import org.apache.hadoop.hbase.testclassification.SmallTests;
030import org.junit.jupiter.api.AfterEach;
031import org.junit.jupiter.api.BeforeEach;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.api.Test;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
036
037@Tag(MasterTests.TAG)
038@Tag(SmallTests.TAG)
039public class TestProcedureSkipPersistence {
040
041  private ProcedureExecutor<ProcEnv> procExecutor;
042  private ProcedureStore procStore;
043
044  private HBaseCommonTestingUtil htu;
045  private FileSystem fs;
046  private Path testDir;
047  private Path logDir;
048
049  private static volatile int STEP = 0;
050
051  public class ProcEnv {
052
053    public ProcedureExecutor<ProcEnv> getProcedureExecutor() {
054      return procExecutor;
055    }
056  }
057
058  public static class TestProcedure extends Procedure<ProcEnv> {
059
060    // need to override this method, otherwise we will persist the release lock operation and the
061    // test will fail.
062    @Override
063    protected boolean holdLock(ProcEnv env) {
064      return true;
065    }
066
067    @Override
068    protected Procedure<ProcEnv>[] execute(ProcEnv env)
069      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
070      if (STEP == 0) {
071        STEP = 1;
072        setTimeout(60 * 60 * 1000);
073        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
074        skipPersistence();
075        throw new ProcedureSuspendedException();
076      } else if (STEP == 1) {
077        STEP = 2;
078        if (hasTimeout()) {
079          setFailure("Should not persist the timeout value",
080            new IOException("Should not persist the timeout value"));
081          return null;
082        }
083        setTimeout(2 * 1000);
084        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
085        // used to confirm that we reset the persist flag before execution
086        throw new ProcedureSuspendedException();
087      } else {
088        if (!hasTimeout()) {
089          setFailure("Should have persisted the timeout value",
090            new IOException("Should have persisted the timeout value"));
091        }
092        return null;
093      }
094    }
095
096    @Override
097    protected synchronized boolean setTimeoutFailure(ProcEnv env) {
098      setState(ProcedureProtos.ProcedureState.RUNNABLE);
099      env.getProcedureExecutor().getProcedureScheduler().addFront(this);
100      return false;
101    }
102
103    @Override
104    protected void rollback(ProcEnv env) throws IOException, InterruptedException {
105      throw new UnsupportedOperationException();
106    }
107
108    @Override
109    protected boolean abort(ProcEnv env) {
110      return false;
111    }
112
113    @Override
114    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
115    }
116
117    @Override
118    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
119    }
120  }
121
122  @BeforeEach
123  public void setUp() throws IOException {
124    htu = new HBaseCommonTestingUtil();
125    testDir = htu.getDataTestDir();
126    fs = testDir.getFileSystem(htu.getConfiguration());
127    assertTrue(testDir.depth() > 1);
128
129    logDir = new Path(testDir, "proc-logs");
130    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
131    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new ProcEnv(), procStore);
132    procStore.start(1);
133    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
134  }
135
136  @AfterEach
137  public void tearDown() throws IOException {
138    procExecutor.stop();
139    procStore.stop(false);
140    fs.delete(logDir, true);
141  }
142
143  @Test
144  public void test() throws Exception {
145    TestProcedure proc = new TestProcedure();
146    long procId = procExecutor.submitProcedure(proc);
147    htu.waitFor(30000, () -> proc.isWaiting() && procExecutor.getActiveExecutorCount() == 0);
148    ProcedureTestingUtility.restart(procExecutor);
149    htu.waitFor(30000, () -> {
150      Procedure<?> p = procExecutor.getProcedure(procId);
151      return (p.isWaiting() || p.isFinished()) && procExecutor.getActiveExecutorCount() == 0;
152    });
153    assertFalse(procExecutor.isFinished(procId));
154    ProcedureTestingUtility.restart(procExecutor);
155    htu.waitFor(30000, () -> procExecutor.isFinished(procId));
156    Procedure<ProcEnv> p = procExecutor.getResult(procId);
157    assertTrue(p.isSuccess());
158  }
159}