001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.Exchanger;
026import org.apache.hadoop.fs.FSDataInputStream;
027import org.apache.hadoop.fs.FSDataOutputStream;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
033import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.junit.BeforeClass;
037import org.junit.ClassRule;
038import org.junit.Rule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.junit.rules.TestName;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
046
047@Category({ MasterTests.class, SmallTests.class })
048public class TestProcedureCleanup {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestProcedureCleanup.class);
053
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);
056
057  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
058
059  private static WALProcedureStore procStore;
060
061  private static ProcedureExecutor<Void> procExecutor;
062
063  private static HBaseCommonTestingUtility htu;
064
065  private static FileSystem fs;
066  private static Path testDir;
067  private static Path logDir;
068
069  @Rule
070  public final TestName name = new TestName();
071
072  private void createProcExecutor() throws Exception {
073    logDir = new Path(testDir, name.getMethodName());
074    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
075    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
076    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
077    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
078  }
079
080  @BeforeClass
081  public static void setUp() throws Exception {
082    htu = new HBaseCommonTestingUtility();
083    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
084    // NOTE: The executor will be created by each test
085    testDir = htu.getDataTestDir();
086    fs = testDir.getFileSystem(htu.getConfiguration());
087    assertTrue(testDir.depth() > 1);
088  }
089
090  @Test
091  public void testProcedureShouldNotCleanOnLoad() throws Exception {
092    createProcExecutor();
093    final RootProcedure proc = new RootProcedure();
094    long rootProc = procExecutor.submitProcedure(proc);
095    LOG.info("Begin to execute " + rootProc);
096    // wait until the child procedure arrival
097    htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2);
098    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor
099        .getProcedures().get(1);
100    // wait until the suspendProcedure executed
101    suspendProcedure.latch.countDown();
102    Thread.sleep(100);
103    // roll the procedure log
104    LOG.info("Begin to roll log ");
105    procStore.rollWriterForTesting();
106    LOG.info("finish to roll log ");
107    Thread.sleep(500);
108    LOG.info("begin to restart1 ");
109    ProcedureTestingUtility.restart(procExecutor, true);
110    LOG.info("finish to restart1 ");
111    assertTrue(procExecutor.getProcedure(rootProc) != null);
112    Thread.sleep(500);
113    LOG.info("begin to restart2 ");
114    ProcedureTestingUtility.restart(procExecutor, true);
115    LOG.info("finish to restart2 ");
116    assertTrue(procExecutor.getProcedure(rootProc) != null);
117  }
118
119  @Test
120  public void testProcedureUpdatedShouldClean() throws Exception {
121    createProcExecutor();
122    SuspendProcedure suspendProcedure = new SuspendProcedure();
123    long suspendProc = procExecutor.submitProcedure(suspendProcedure);
124    LOG.info("Begin to execute " + suspendProc);
125    suspendProcedure.latch.countDown();
126    Thread.sleep(500);
127    LOG.info("begin to restart1 ");
128    ProcedureTestingUtility.restart(procExecutor, true);
129    LOG.info("finish to restart1 ");
130    htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null);
131    // Wait until the suspendProc executed after restart
132    suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc);
133    suspendProcedure.latch.countDown();
134    Thread.sleep(500);
135    // Should be 1 log since the suspendProcedure is updated in the new log
136    assertTrue(procStore.getActiveLogs().size() == 1);
137    // restart procExecutor
138    LOG.info("begin to restart2");
139    // Restart the executor but do not start the workers.
140    // Otherwise, the suspendProcedure will soon be executed and the oldest log
141    // will be cleaned, leaving only the newest log.
142    ProcedureTestingUtility.restart(procExecutor, true, false);
143    LOG.info("finish to restart2");
144    // There should be two active logs
145    assertTrue(procStore.getActiveLogs().size() == 2);
146    procExecutor.startWorkers();
147
148  }
149
150  @Test
151  public void testProcedureDeletedShouldClean() throws Exception {
152    createProcExecutor();
153    WaitProcedure waitProcedure = new WaitProcedure();
154    long waitProce = procExecutor.submitProcedure(waitProcedure);
155    LOG.info("Begin to execute " + waitProce);
156    Thread.sleep(500);
157    LOG.info("begin to restart1 ");
158    ProcedureTestingUtility.restart(procExecutor, true);
159    LOG.info("finish to restart1 ");
160    htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null);
161    // Wait until the suspendProc executed after restart
162    waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
163    waitProcedure.latch.countDown();
164    Thread.sleep(500);
165    // Should be 1 log since the suspendProcedure is updated in the new log
166    assertTrue(procStore.getActiveLogs().size() == 1);
167    // restart procExecutor
168    LOG.info("begin to restart2");
169    // Restart the executor but do not start the workers.
170    // Otherwise, the suspendProcedure will soon be executed and the oldest log
171    // will be cleaned, leaving only the newest log.
172    ProcedureTestingUtility.restart(procExecutor, true, false);
173    LOG.info("finish to restart2");
174    // There should be two active logs
175    assertTrue(procStore.getActiveLogs().size() == 2);
176    procExecutor.startWorkers();
177  }
178
179  private void corrupt(FileStatus file) throws IOException {
180    LOG.info("Corrupt " + file);
181    Path tmpFile = file.getPath().suffix(".tmp");
182    // remove the last byte to make the trailer corrupted
183    try (FSDataInputStream in = fs.open(file.getPath());
184      FSDataOutputStream out = fs.create(tmpFile)) {
185      ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out);
186    }
187    fs.delete(file.getPath(), false);
188    fs.rename(tmpFile, file.getPath());
189  }
190
191
192  public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
193
194    private final Exchanger<Boolean> exchanger = new Exchanger<>();
195
196    @SuppressWarnings("unchecked")
197    @Override
198    protected Procedure<Void>[] execute(Void env)
199        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
200      if (exchanger.exchange(Boolean.TRUE)) {
201        return new Procedure[] { this };
202      } else {
203        return null;
204      }
205    }
206  }
207
208  @Test
209  public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception {
210    createProcExecutor();
211    ExchangeProcedure proc1 = new ExchangeProcedure();
212    ExchangeProcedure proc2 = new ExchangeProcedure();
213    procExecutor.submitProcedure(proc1);
214    long procId2 = procExecutor.submitProcedure(proc2);
215    Thread.sleep(500);
216    procStore.rollWriterForTesting();
217    proc1.exchanger.exchange(Boolean.TRUE);
218    Thread.sleep(500);
219
220    FileStatus[] walFiles = fs.listStatus(logDir);
221    Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName()));
222    // corrupt the first proc wal file, so we will have a partial tracker for it after restarting
223    corrupt(walFiles[0]);
224    ProcedureTestingUtility.restart(procExecutor, false, true);
225    // also update proc2, which means that all the procedures in the first proc wal have been
226    // updated and it should be deleted.
227    proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2);
228    proc2.exchanger.exchange(Boolean.TRUE);
229    htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath()));
230  }
231
232  public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
233    public WaitProcedure() {
234      super();
235    }
236
237    private CountDownLatch latch = new CountDownLatch(1);
238
239    @Override
240    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
241      // Always wait here
242      LOG.info("wait here");
243      try {
244        latch.await();
245      } catch (Throwable t) {
246
247      }
248      LOG.info("finished");
249      return null;
250    }
251  }
252
253  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
254    public SuspendProcedure() {
255      super();
256    }
257
258    private CountDownLatch latch = new CountDownLatch(1);
259
260    @Override
261    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
262      // Always suspend the procedure
263      LOG.info("suspend here");
264      latch.countDown();
265      throw new ProcedureSuspendedException();
266    }
267  }
268
269  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
270    private boolean childSpwaned = false;
271
272    public RootProcedure() {
273      super();
274    }
275
276    @Override
277    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
278      if (!childSpwaned) {
279        childSpwaned = true;
280        return new Procedure[] { new SuspendProcedure() };
281      } else {
282        return null;
283      }
284    }
285  }
286}