001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.Exchanger; 026import org.apache.hadoop.fs.FSDataInputStream; 027import org.apache.hadoop.fs.FSDataOutputStream; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 032import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.junit.jupiter.api.BeforeAll; 036import org.junit.jupiter.api.BeforeEach; 037import org.junit.jupiter.api.Tag; 038import org.junit.jupiter.api.Test; 039import org.junit.jupiter.api.TestInfo; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; 044 045@Tag(MasterTests.TAG) 046@Tag(SmallTests.TAG) 047public class TestProcedureCleanup { 048 049 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class); 050 051 private static final int PROCEDURE_EXECUTOR_SLOTS = 2; 052 053 private static WALProcedureStore procStore; 054 055 private static ProcedureExecutor<Void> procExecutor; 056 057 private static HBaseCommonTestingUtil htu; 058 059 private static FileSystem fs; 060 private static Path testDir; 061 private static Path logDir; 062 063 private String methodName; 064 065 private void createProcExecutor() throws Exception { 066 logDir = new Path(testDir, methodName); 067 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 068 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); 069 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 070 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true); 071 } 072 073 @BeforeAll 074 public static void setUp() throws Exception { 075 htu = new HBaseCommonTestingUtil(); 076 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 077 // NOTE: The executor will be created by each test 078 testDir = htu.getDataTestDir(); 079 fs = testDir.getFileSystem(htu.getConfiguration()); 080 assertTrue(testDir.depth() > 1); 081 } 082 083 @BeforeEach 084 public void setUpEach(TestInfo testInfo) throws Exception { 085 methodName = testInfo.getTestMethod().get().getName(); 086 } 087 088 @Test 089 public void testProcedureShouldNotCleanOnLoad() throws Exception { 090 createProcExecutor(); 091 final RootProcedure proc = new RootProcedure(); 092 long rootProc = procExecutor.submitProcedure(proc); 093 LOG.info("Begin to execute " + rootProc); 094 // wait until the child procedure arrival 095 htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2); 096 SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().get(1); 097 // wait until the suspendProcedure executed 098 suspendProcedure.latch.countDown(); 099 Thread.sleep(100); 100 // roll the procedure log 101 LOG.info("Begin to roll log "); 102 procStore.rollWriterForTesting(); 103 LOG.info("finish to roll log "); 104 Thread.sleep(500); 105 LOG.info("begin to restart1 "); 106 ProcedureTestingUtility.restart(procExecutor, true); 107 LOG.info("finish to restart1 "); 108 assertTrue(procExecutor.getProcedure(rootProc) != null); 109 Thread.sleep(500); 110 LOG.info("begin to restart2 "); 111 ProcedureTestingUtility.restart(procExecutor, true); 112 LOG.info("finish to restart2 "); 113 assertTrue(procExecutor.getProcedure(rootProc) != null); 114 } 115 116 @Test 117 public void testProcedureUpdatedShouldClean() throws Exception { 118 createProcExecutor(); 119 SuspendProcedure suspendProcedure = new SuspendProcedure(); 120 long suspendProc = procExecutor.submitProcedure(suspendProcedure); 121 LOG.info("Begin to execute " + suspendProc); 122 suspendProcedure.latch.countDown(); 123 Thread.sleep(500); 124 LOG.info("begin to restart1 "); 125 ProcedureTestingUtility.restart(procExecutor, true); 126 LOG.info("finish to restart1 "); 127 htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null); 128 // Wait until the suspendProc executed after restart 129 suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc); 130 suspendProcedure.latch.countDown(); 131 Thread.sleep(500); 132 // Should be 1 log since the suspendProcedure is updated in the new log 133 assertTrue(procStore.getActiveLogs().size() == 1); 134 // restart procExecutor 135 LOG.info("begin to restart2"); 136 // Restart the executor but do not start the workers. 137 // Otherwise, the suspendProcedure will soon be executed and the oldest log 138 // will be cleaned, leaving only the newest log. 139 ProcedureTestingUtility.restart(procExecutor, true, false); 140 LOG.info("finish to restart2"); 141 // There should be two active logs 142 assertTrue(procStore.getActiveLogs().size() == 2); 143 procExecutor.startWorkers(); 144 145 } 146 147 @Test 148 public void testProcedureDeletedShouldClean() throws Exception { 149 createProcExecutor(); 150 WaitProcedure waitProcedure = new WaitProcedure(); 151 long waitProce = procExecutor.submitProcedure(waitProcedure); 152 LOG.info("Begin to execute " + waitProce); 153 Thread.sleep(500); 154 LOG.info("begin to restart1 "); 155 ProcedureTestingUtility.restart(procExecutor, true); 156 LOG.info("finish to restart1 "); 157 htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null); 158 // Wait until the suspendProc executed after restart 159 waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce); 160 waitProcedure.latch.countDown(); 161 Thread.sleep(500); 162 // Should be 1 log since the suspendProcedure is updated in the new log 163 assertTrue(procStore.getActiveLogs().size() == 1); 164 // restart procExecutor 165 LOG.info("begin to restart2"); 166 // Restart the executor but do not start the workers. 167 // Otherwise, the suspendProcedure will soon be executed and the oldest log 168 // will be cleaned, leaving only the newest log. 169 ProcedureTestingUtility.restart(procExecutor, true, false); 170 LOG.info("finish to restart2"); 171 // There should be two active logs 172 assertTrue(procStore.getActiveLogs().size() == 2); 173 procExecutor.startWorkers(); 174 } 175 176 private void corrupt(FileStatus file) throws IOException { 177 LOG.info("Corrupt " + file); 178 Path tmpFile = file.getPath().suffix(".tmp"); 179 // remove the last byte to make the trailer corrupted 180 try (FSDataInputStream in = fs.open(file.getPath()); 181 FSDataOutputStream out = fs.create(tmpFile)) { 182 ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out); 183 } 184 fs.delete(file.getPath(), false); 185 fs.rename(tmpFile, file.getPath()); 186 } 187 188 public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 189 190 private final Exchanger<Boolean> exchanger = new Exchanger<>(); 191 192 @SuppressWarnings("unchecked") 193 @Override 194 protected Procedure<Void>[] execute(Void env) 195 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 196 if (exchanger.exchange(Boolean.TRUE)) { 197 return new Procedure[] { this }; 198 } else { 199 return null; 200 } 201 } 202 } 203 204 @Test 205 public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception { 206 createProcExecutor(); 207 ExchangeProcedure proc1 = new ExchangeProcedure(); 208 ExchangeProcedure proc2 = new ExchangeProcedure(); 209 procExecutor.submitProcedure(proc1); 210 long procId2 = procExecutor.submitProcedure(proc2); 211 Thread.sleep(500); 212 procStore.rollWriterForTesting(); 213 proc1.exchanger.exchange(Boolean.TRUE); 214 Thread.sleep(500); 215 216 FileStatus[] walFiles = fs.listStatus(logDir); 217 Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName())); 218 // corrupt the first proc wal file, so we will have a partial tracker for it after restarting 219 corrupt(walFiles[0]); 220 ProcedureTestingUtility.restart(procExecutor, false, true); 221 // also update proc2, which means that all the procedures in the first proc wal have been 222 // updated and it should be deleted. 223 proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2); 224 proc2.exchanger.exchange(Boolean.TRUE); 225 htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath())); 226 } 227 228 public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 229 public WaitProcedure() { 230 super(); 231 } 232 233 private CountDownLatch latch = new CountDownLatch(1); 234 235 @Override 236 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 237 // Always wait here 238 LOG.info("wait here"); 239 try { 240 latch.await(); 241 } catch (Throwable t) { 242 243 } 244 LOG.info("finished"); 245 return null; 246 } 247 } 248 249 public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 250 public SuspendProcedure() { 251 super(); 252 } 253 254 private CountDownLatch latch = new CountDownLatch(1); 255 256 @Override 257 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 258 // Always suspend the procedure 259 LOG.info("suspend here"); 260 latch.countDown(); 261 throw new ProcedureSuspendedException(); 262 } 263 } 264 265 public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 266 private boolean childSpwaned = false; 267 268 public RootProcedure() { 269 super(); 270 } 271 272 @Override 273 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 274 if (!childSpwaned) { 275 childSpwaned = true; 276 return new Procedure[] { new SuspendProcedure() }; 277 } else { 278 return null; 279 } 280 } 281 } 282}