001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.Exchanger;
026import org.apache.hadoop.fs.FSDataInputStream;
027import org.apache.hadoop.fs.FSDataOutputStream;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
032import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.jupiter.api.BeforeAll;
036import org.junit.jupiter.api.BeforeEach;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039import org.junit.jupiter.api.TestInfo;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
044
045@Tag(MasterTests.TAG)
046@Tag(SmallTests.TAG)
047public class TestProcedureCleanup {
048
  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);

  // Passed both to procStore.start(...) and to initAndStartWorkers(...) in createProcExecutor().
  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;

  // WAL-backed procedure store; (re)created by createProcExecutor() for every test.
  private static WALProcedureStore procStore;

  // Executor under test; (re)created by createProcExecutor() for every test.
  private static ProcedureExecutor<Void> procExecutor;

  // Shared testing utility, created once in setUp().
  private static HBaseCommonTestingUtil htu;

  // File system that owns testDir, resolved in setUp().
  private static FileSystem fs;
  // Data test directory obtained from htu in setUp().
  private static Path testDir;
  // Per-test WAL directory: testDir/<methodName>, assigned in createProcExecutor().
  private static Path logDir;

  // Name of the currently running test method, captured in setUpEach().
  private String methodName;
064
065  private void createProcExecutor() throws Exception {
066    logDir = new Path(testDir, methodName);
067    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
068    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
069    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
070    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
071  }
072
073  @BeforeAll
074  public static void setUp() throws Exception {
075    htu = new HBaseCommonTestingUtil();
076    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
077    // NOTE: The executor will be created by each test
078    testDir = htu.getDataTestDir();
079    fs = testDir.getFileSystem(htu.getConfiguration());
080    assertTrue(testDir.depth() > 1);
081  }
082
083  @BeforeEach
084  public void setUpEach(TestInfo testInfo) throws Exception {
085    methodName = testInfo.getTestMethod().get().getName();
086  }
087
088  @Test
089  public void testProcedureShouldNotCleanOnLoad() throws Exception {
090    createProcExecutor();
091    final RootProcedure proc = new RootProcedure();
092    long rootProc = procExecutor.submitProcedure(proc);
093    LOG.info("Begin to execute " + rootProc);
094    // wait until the child procedure arrival
095    htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2);
096    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().get(1);
097    // wait until the suspendProcedure executed
098    suspendProcedure.latch.countDown();
099    Thread.sleep(100);
100    // roll the procedure log
101    LOG.info("Begin to roll log ");
102    procStore.rollWriterForTesting();
103    LOG.info("finish to roll log ");
104    Thread.sleep(500);
105    LOG.info("begin to restart1 ");
106    ProcedureTestingUtility.restart(procExecutor, true);
107    LOG.info("finish to restart1 ");
108    assertTrue(procExecutor.getProcedure(rootProc) != null);
109    Thread.sleep(500);
110    LOG.info("begin to restart2 ");
111    ProcedureTestingUtility.restart(procExecutor, true);
112    LOG.info("finish to restart2 ");
113    assertTrue(procExecutor.getProcedure(rootProc) != null);
114  }
115
116  @Test
117  public void testProcedureUpdatedShouldClean() throws Exception {
118    createProcExecutor();
119    SuspendProcedure suspendProcedure = new SuspendProcedure();
120    long suspendProc = procExecutor.submitProcedure(suspendProcedure);
121    LOG.info("Begin to execute " + suspendProc);
122    suspendProcedure.latch.countDown();
123    Thread.sleep(500);
124    LOG.info("begin to restart1 ");
125    ProcedureTestingUtility.restart(procExecutor, true);
126    LOG.info("finish to restart1 ");
127    htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null);
128    // Wait until the suspendProc executed after restart
129    suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc);
130    suspendProcedure.latch.countDown();
131    Thread.sleep(500);
132    // Should be 1 log since the suspendProcedure is updated in the new log
133    assertTrue(procStore.getActiveLogs().size() == 1);
134    // restart procExecutor
135    LOG.info("begin to restart2");
136    // Restart the executor but do not start the workers.
137    // Otherwise, the suspendProcedure will soon be executed and the oldest log
138    // will be cleaned, leaving only the newest log.
139    ProcedureTestingUtility.restart(procExecutor, true, false);
140    LOG.info("finish to restart2");
141    // There should be two active logs
142    assertTrue(procStore.getActiveLogs().size() == 2);
143    procExecutor.startWorkers();
144
145  }
146
147  @Test
148  public void testProcedureDeletedShouldClean() throws Exception {
149    createProcExecutor();
150    WaitProcedure waitProcedure = new WaitProcedure();
151    long waitProce = procExecutor.submitProcedure(waitProcedure);
152    LOG.info("Begin to execute " + waitProce);
153    Thread.sleep(500);
154    LOG.info("begin to restart1 ");
155    ProcedureTestingUtility.restart(procExecutor, true);
156    LOG.info("finish to restart1 ");
157    htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null);
158    // Wait until the suspendProc executed after restart
159    waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
160    waitProcedure.latch.countDown();
161    Thread.sleep(500);
162    // Should be 1 log since the suspendProcedure is updated in the new log
163    assertTrue(procStore.getActiveLogs().size() == 1);
164    // restart procExecutor
165    LOG.info("begin to restart2");
166    // Restart the executor but do not start the workers.
167    // Otherwise, the suspendProcedure will soon be executed and the oldest log
168    // will be cleaned, leaving only the newest log.
169    ProcedureTestingUtility.restart(procExecutor, true, false);
170    LOG.info("finish to restart2");
171    // There should be two active logs
172    assertTrue(procStore.getActiveLogs().size() == 2);
173    procExecutor.startWorkers();
174  }
175
176  private void corrupt(FileStatus file) throws IOException {
177    LOG.info("Corrupt " + file);
178    Path tmpFile = file.getPath().suffix(".tmp");
179    // remove the last byte to make the trailer corrupted
180    try (FSDataInputStream in = fs.open(file.getPath());
181      FSDataOutputStream out = fs.create(tmpFile)) {
182      ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out);
183    }
184    fs.delete(file.getPath(), false);
185    fs.rename(tmpFile, file.getPath());
186  }
187
188  public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
189
190    private final Exchanger<Boolean> exchanger = new Exchanger<>();
191
192    @SuppressWarnings("unchecked")
193    @Override
194    protected Procedure<Void>[] execute(Void env)
195      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
196      if (exchanger.exchange(Boolean.TRUE)) {
197        return new Procedure[] { this };
198      } else {
199        return null;
200      }
201    }
202  }
203
204  @Test
205  public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception {
206    createProcExecutor();
207    ExchangeProcedure proc1 = new ExchangeProcedure();
208    ExchangeProcedure proc2 = new ExchangeProcedure();
209    procExecutor.submitProcedure(proc1);
210    long procId2 = procExecutor.submitProcedure(proc2);
211    Thread.sleep(500);
212    procStore.rollWriterForTesting();
213    proc1.exchanger.exchange(Boolean.TRUE);
214    Thread.sleep(500);
215
216    FileStatus[] walFiles = fs.listStatus(logDir);
217    Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName()));
218    // corrupt the first proc wal file, so we will have a partial tracker for it after restarting
219    corrupt(walFiles[0]);
220    ProcedureTestingUtility.restart(procExecutor, false, true);
221    // also update proc2, which means that all the procedures in the first proc wal have been
222    // updated and it should be deleted.
223    proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2);
224    proc2.exchanger.exchange(Boolean.TRUE);
225    htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath()));
226  }
227
228  public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
229    public WaitProcedure() {
230      super();
231    }
232
233    private CountDownLatch latch = new CountDownLatch(1);
234
235    @Override
236    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
237      // Always wait here
238      LOG.info("wait here");
239      try {
240        latch.await();
241      } catch (Throwable t) {
242
243      }
244      LOG.info("finished");
245      return null;
246    }
247  }
248
249  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
250    public SuspendProcedure() {
251      super();
252    }
253
254    private CountDownLatch latch = new CountDownLatch(1);
255
256    @Override
257    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
258      // Always suspend the procedure
259      LOG.info("suspend here");
260      latch.countDown();
261      throw new ProcedureSuspendedException();
262    }
263  }
264
265  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
266    private boolean childSpwaned = false;
267
268    public RootProcedure() {
269      super();
270    }
271
272    @Override
273    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
274      if (!childSpwaned) {
275        childSpwaned = true;
276        return new Procedure[] { new SuspendProcedure() };
277      } else {
278        return null;
279      }
280    }
281  }
282}