001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.Exchanger; 026import org.apache.hadoop.fs.FSDataInputStream; 027import org.apache.hadoop.fs.FSDataOutputStream; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 033import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 034import org.apache.hadoop.hbase.testclassification.MasterTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.junit.BeforeClass; 037import org.junit.ClassRule; 038import org.junit.Rule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.junit.rules.TestName; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; 046 047@Category({ MasterTests.class, SmallTests.class }) 048public class TestProcedureCleanup { 049 050 @ClassRule 051 public static final HBaseClassTestRule CLASS_RULE = 052 HBaseClassTestRule.forClass(TestProcedureCleanup.class); 053 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class); 056 057 private static final int PROCEDURE_EXECUTOR_SLOTS = 2; 058 059 private static WALProcedureStore procStore; 060 061 private static ProcedureExecutor<Void> procExecutor; 062 063 private static HBaseCommonTestingUtility htu; 064 065 private static FileSystem fs; 066 private static Path testDir; 067 private static Path logDir; 068 069 @Rule 070 public final TestName name = new TestName(); 071 072 private void createProcExecutor() throws Exception { 073 logDir = new Path(testDir, name.getMethodName()); 074 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 075 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); 076 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 077 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true); 078 } 079 080 @BeforeClass 081 public static void setUp() throws Exception { 082 htu = new HBaseCommonTestingUtility(); 083 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 084 // NOTE: The executor will be created by each test 085 testDir = htu.getDataTestDir(); 086 fs = testDir.getFileSystem(htu.getConfiguration()); 087 assertTrue(testDir.depth() > 1); 088 } 089 090 @Test 091 public void testProcedureShouldNotCleanOnLoad() throws Exception { 092 createProcExecutor(); 093 final RootProcedure proc = new RootProcedure(); 094 long rootProc = procExecutor.submitProcedure(proc); 095 LOG.info("Begin to execute " + rootProc); 096 // wait until the child procedure arrival 097 htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2); 098 SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor 099 .getProcedures().get(1); 100 // wait until the suspendProcedure executed 101 suspendProcedure.latch.countDown(); 102 Thread.sleep(100); 103 // roll the procedure log 104 LOG.info("Begin to roll log "); 105 procStore.rollWriterForTesting(); 106 LOG.info("finish to roll log "); 107 Thread.sleep(500); 108 LOG.info("begin to restart1 "); 109 ProcedureTestingUtility.restart(procExecutor, true); 110 LOG.info("finish to restart1 "); 111 assertTrue(procExecutor.getProcedure(rootProc) != null); 112 Thread.sleep(500); 113 LOG.info("begin to restart2 "); 114 ProcedureTestingUtility.restart(procExecutor, true); 115 LOG.info("finish to restart2 "); 116 assertTrue(procExecutor.getProcedure(rootProc) != null); 117 } 118 119 @Test 120 public void testProcedureUpdatedShouldClean() throws Exception { 121 createProcExecutor(); 122 SuspendProcedure suspendProcedure = new SuspendProcedure(); 123 long suspendProc = procExecutor.submitProcedure(suspendProcedure); 124 LOG.info("Begin to execute " + suspendProc); 125 suspendProcedure.latch.countDown(); 126 Thread.sleep(500); 127 LOG.info("begin to restart1 "); 128 ProcedureTestingUtility.restart(procExecutor, true); 129 LOG.info("finish to restart1 "); 130 htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null); 131 // Wait until the suspendProc executed after restart 132 suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc); 133 suspendProcedure.latch.countDown(); 134 Thread.sleep(500); 135 // Should be 1 log since the suspendProcedure is updated in the new log 136 assertTrue(procStore.getActiveLogs().size() == 1); 137 // restart procExecutor 138 LOG.info("begin to restart2"); 139 // Restart the executor but do not start the workers. 140 // Otherwise, the suspendProcedure will soon be executed and the oldest log 141 // will be cleaned, leaving only the newest log. 142 ProcedureTestingUtility.restart(procExecutor, true, false); 143 LOG.info("finish to restart2"); 144 // There should be two active logs 145 assertTrue(procStore.getActiveLogs().size() == 2); 146 procExecutor.startWorkers(); 147 148 } 149 150 @Test 151 public void testProcedureDeletedShouldClean() throws Exception { 152 createProcExecutor(); 153 WaitProcedure waitProcedure = new WaitProcedure(); 154 long waitProce = procExecutor.submitProcedure(waitProcedure); 155 LOG.info("Begin to execute " + waitProce); 156 Thread.sleep(500); 157 LOG.info("begin to restart1 "); 158 ProcedureTestingUtility.restart(procExecutor, true); 159 LOG.info("finish to restart1 "); 160 htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null); 161 // Wait until the suspendProc executed after restart 162 waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce); 163 waitProcedure.latch.countDown(); 164 Thread.sleep(500); 165 // Should be 1 log since the suspendProcedure is updated in the new log 166 assertTrue(procStore.getActiveLogs().size() == 1); 167 // restart procExecutor 168 LOG.info("begin to restart2"); 169 // Restart the executor but do not start the workers. 170 // Otherwise, the suspendProcedure will soon be executed and the oldest log 171 // will be cleaned, leaving only the newest log. 172 ProcedureTestingUtility.restart(procExecutor, true, false); 173 LOG.info("finish to restart2"); 174 // There should be two active logs 175 assertTrue(procStore.getActiveLogs().size() == 2); 176 procExecutor.startWorkers(); 177 } 178 179 private void corrupt(FileStatus file) throws IOException { 180 LOG.info("Corrupt " + file); 181 Path tmpFile = file.getPath().suffix(".tmp"); 182 // remove the last byte to make the trailer corrupted 183 try (FSDataInputStream in = fs.open(file.getPath()); 184 FSDataOutputStream out = fs.create(tmpFile)) { 185 ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out); 186 } 187 fs.delete(file.getPath(), false); 188 fs.rename(tmpFile, file.getPath()); 189 } 190 191 192 public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 193 194 private final Exchanger<Boolean> exchanger = new Exchanger<>(); 195 196 @SuppressWarnings("unchecked") 197 @Override 198 protected Procedure<Void>[] execute(Void env) 199 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 200 if (exchanger.exchange(Boolean.TRUE)) { 201 return new Procedure[] { this }; 202 } else { 203 return null; 204 } 205 } 206 } 207 208 @Test 209 public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception { 210 createProcExecutor(); 211 ExchangeProcedure proc1 = new ExchangeProcedure(); 212 ExchangeProcedure proc2 = new ExchangeProcedure(); 213 procExecutor.submitProcedure(proc1); 214 long procId2 = procExecutor.submitProcedure(proc2); 215 Thread.sleep(500); 216 procStore.rollWriterForTesting(); 217 proc1.exchanger.exchange(Boolean.TRUE); 218 Thread.sleep(500); 219 220 FileStatus[] walFiles = fs.listStatus(logDir); 221 Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName())); 222 // corrupt the first proc wal file, so we will have a partial tracker for it after restarting 223 corrupt(walFiles[0]); 224 ProcedureTestingUtility.restart(procExecutor, false, true); 225 // also update proc2, which means that all the procedures in the first proc wal have been 226 // updated and it should be deleted. 227 proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2); 228 proc2.exchanger.exchange(Boolean.TRUE); 229 htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath())); 230 } 231 232 public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 233 public WaitProcedure() { 234 super(); 235 } 236 237 private CountDownLatch latch = new CountDownLatch(1); 238 239 @Override 240 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 241 // Always wait here 242 LOG.info("wait here"); 243 try { 244 latch.await(); 245 } catch (Throwable t) { 246 247 } 248 LOG.info("finished"); 249 return null; 250 } 251 } 252 253 public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 254 public SuspendProcedure() { 255 super(); 256 } 257 258 private CountDownLatch latch = new CountDownLatch(1); 259 260 @Override 261 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 262 // Always suspend the procedure 263 LOG.info("suspend here"); 264 latch.countDown(); 265 throw new ProcedureSuspendedException(); 266 } 267 } 268 269 public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 270 private boolean childSpwaned = false; 271 272 public RootProcedure() { 273 super(); 274 } 275 276 @Override 277 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 278 if (!childSpwaned) { 279 childSpwaned = true; 280 return new Procedure[] { new SuspendProcedure() }; 281 } else { 282 return null; 283 } 284 } 285 } 286}