001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.concurrent.Exchanger;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
029import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
030import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
031import org.apache.hadoop.hbase.testclassification.MasterTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.junit.jupiter.api.AfterAll;
034import org.junit.jupiter.api.AfterEach;
035import org.junit.jupiter.api.BeforeAll;
036import org.junit.jupiter.api.BeforeEach;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039import org.junit.jupiter.api.TestInfo;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
042
043@Tag(MasterTests.TAG)
044@Tag(SmallTests.TAG)
045public class TestForceUpdateProcedure {
046
047  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
048
049  private static WALProcedureStore STORE;
050
051  private static ProcedureExecutor<Void> EXEC;
052
053  private static final Exchanger<Boolean> EXCHANGER = new Exchanger<>();
054
055  private static final int WAL_COUNT = 5;
056
057  private String methodName;
058
059  private void createStoreAndExecutor() throws IOException {
060    UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000);
061    Path logDir = UTIL.getDataTestDir(methodName);
062    STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
063    STORE.start(1);
064    EXEC = new ProcedureExecutor<>(UTIL.getConfiguration(), null, STORE);
065    ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true);
066  }
067
068  @BeforeAll
069  public static void setUpBeforeClass() throws IOException {
070    UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
071  }
072
073  private void stopStoreAndExecutor() {
074    EXEC.stop();
075    STORE.stop(false);
076    EXEC = null;
077    STORE = null;
078  }
079
080  @AfterAll
081  public static void tearDownAfterClass() throws IOException {
082    UTIL.cleanupTestDir();
083  }
084
085  @BeforeEach
086  public void setUp(TestInfo testInfo) throws IOException {
087    methodName = testInfo.getTestMethod().get().getName();
088    createStoreAndExecutor();
089  }
090
091  @AfterEach
092  public void tearDown() {
093    stopStoreAndExecutor();
094  }
095
096  public static final class WaitingProcedure extends NoopProcedure<Void> {
097
098    @Override
099    protected Procedure<Void>[] execute(Void env)
100      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
101      EXCHANGER.exchange(Boolean.TRUE);
102      setState(ProcedureState.WAITING_TIMEOUT);
103      setTimeout(Integer.MAX_VALUE);
104      throw new ProcedureSuspendedException();
105    }
106  }
107
108  public static final class ParentProcedure extends NoopProcedure<Void> {
109
110    @SuppressWarnings("unchecked")
111    @Override
112    protected Procedure<Void>[] execute(Void env)
113      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
114      return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() };
115    }
116  }
117
118  public static final class ExchangeProcedure extends NoopProcedure<Void> {
119
120    @SuppressWarnings("unchecked")
121    @Override
122    protected Procedure<Void>[] execute(Void env)
123      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
124      if (EXCHANGER.exchange(Boolean.TRUE)) {
125        return new Procedure[] { this };
126      } else {
127        return null;
128      }
129    }
130  }
131
132  public static final class NoopNoAckProcedure extends NoopProcedure<Void> {
133
134    @Override
135    protected boolean shouldWaitClientAck(Void env) {
136      return false;
137    }
138  }
139
140  @Test
141  public void testProcedureStuck() throws IOException, InterruptedException {
142    EXEC.submitProcedure(new ParentProcedure());
143    EXCHANGER.exchange(Boolean.TRUE);
144    UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
145    // The above operations are used to make sure that we have persist the states of the two
146    // procedures.
147    long procId = EXEC.submitProcedure(new ExchangeProcedure());
148    assertEquals(1, STORE.getActiveLogs().size());
149    for (int i = 0; i < WAL_COUNT - 1; i++) {
150      assertTrue(STORE.rollWriterForTesting());
151      // The WaitinProcedure never gets updated so we can not delete the oldest wal file, so the
152      // number of wal files will increase
153      assertEquals(2 + i, STORE.getActiveLogs().size());
154      EXCHANGER.exchange(Boolean.TRUE);
155      Thread.sleep(1000);
156    }
157    STORE.rollWriterForTesting();
158    // Finish the ExchangeProcedure
159    EXCHANGER.exchange(Boolean.FALSE);
160    // Make sure that we can delete several wal files because we force update the state of
161    // WaitingProcedure. Notice that the last closed wal files can not be deleted, as when rolling
162    // the newest wal file does not have anything in it, and in the closed file we still have the
163    // state for the ExchangeProcedure so it can not be deleted
164    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2);
165    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
166    // Make sure that after the force update we could still load the procedures
167    stopStoreAndExecutor();
168    createStoreAndExecutor();
169    Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
170    EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
171    assertEquals(3, procMap.size());
172    ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
173    assertEquals(ProcedureState.WAITING, parentProc.getState());
174    WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
175    assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
176    NoopProcedure<Void> noopProc = (NoopProcedure<Void>) procMap.get(NoopProcedure.class);
177    assertEquals(ProcedureState.SUCCESS, noopProc.getState());
178  }
179
180  @Test
181  public void testCompletedProcedure() throws InterruptedException, IOException {
182    long procId = EXEC.submitProcedure(new ExchangeProcedure());
183    EXCHANGER.exchange(Boolean.FALSE);
184    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
185    for (int i = 0; i < WAL_COUNT - 1; i++) {
186      assertTrue(STORE.rollWriterForTesting());
187      // The exchange procedure is completed but still not deleted yet so we can not delete the
188      // oldest wal file
189      long pid = EXEC.submitProcedure(new NoopNoAckProcedure());
190      assertEquals(2 + i, STORE.getActiveLogs().size());
191      UTIL.waitFor(10000, () -> EXEC.isFinished(pid));
192    }
193    // Only the exchange procedure can not be deleted
194    UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1);
195    STORE.rollWriterForTesting();
196    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1);
197  }
198}