001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertFalse; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 028import org.apache.hadoop.hbase.testclassification.MasterTests; 029import org.apache.hadoop.hbase.testclassification.SmallTests; 030import org.junit.jupiter.api.AfterEach; 031import org.junit.jupiter.api.BeforeEach; 032import org.junit.jupiter.api.Tag; 033import org.junit.jupiter.api.Test; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 036 037@Tag(MasterTests.TAG) 038@Tag(SmallTests.TAG) 039public class TestProcedureSkipPersistence { 040 041 private ProcedureExecutor<ProcEnv> procExecutor; 042 private ProcedureStore procStore; 043 044 private HBaseCommonTestingUtil htu; 045 private FileSystem fs; 046 private Path testDir; 047 private Path logDir; 048 049 private static volatile int STEP = 0; 050 051 public class ProcEnv { 052 053 public ProcedureExecutor<ProcEnv> getProcedureExecutor() { 054 return procExecutor; 055 } 056 } 057 058 public static class TestProcedure extends Procedure<ProcEnv> { 059 060 // need to override this method, otherwise we will persist the release lock operation and the 061 // test will fail. 062 @Override 063 protected boolean holdLock(ProcEnv env) { 064 return true; 065 } 066 067 @Override 068 protected Procedure<ProcEnv>[] execute(ProcEnv env) 069 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 070 if (STEP == 0) { 071 STEP = 1; 072 setTimeout(60 * 60 * 1000); 073 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 074 skipPersistence(); 075 throw new ProcedureSuspendedException(); 076 } else if (STEP == 1) { 077 STEP = 2; 078 if (hasTimeout()) { 079 setFailure("Should not persist the timeout value", 080 new IOException("Should not persist the timeout value")); 081 return null; 082 } 083 setTimeout(2 * 1000); 084 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 085 // used to confirm that we reset the persist flag before execution 086 throw new ProcedureSuspendedException(); 087 } else { 088 if (!hasTimeout()) { 089 setFailure("Should have persisted the timeout value", 090 new IOException("Should have persisted the timeout value")); 091 } 092 return null; 093 } 094 } 095 096 @Override 097 protected synchronized boolean setTimeoutFailure(ProcEnv env) { 098 setState(ProcedureProtos.ProcedureState.RUNNABLE); 099 env.getProcedureExecutor().getProcedureScheduler().addFront(this); 100 return false; 101 } 102 103 @Override 104 protected void rollback(ProcEnv env) throws IOException, InterruptedException { 105 throw new UnsupportedOperationException(); 106 } 107 108 @Override 109 protected boolean abort(ProcEnv env) { 110 return false; 111 } 112 113 @Override 114 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 115 } 116 117 @Override 118 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 119 } 120 } 121 122 @BeforeEach 123 public void setUp() throws IOException { 124 htu = new HBaseCommonTestingUtil(); 125 testDir = htu.getDataTestDir(); 126 fs = testDir.getFileSystem(htu.getConfiguration()); 127 assertTrue(testDir.depth() > 1); 128 129 logDir = new Path(testDir, "proc-logs"); 130 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 131 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new ProcEnv(), procStore); 132 procStore.start(1); 133 ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); 134 } 135 136 @AfterEach 137 public void tearDown() throws IOException { 138 procExecutor.stop(); 139 procStore.stop(false); 140 fs.delete(logDir, true); 141 } 142 143 @Test 144 public void test() throws Exception { 145 TestProcedure proc = new TestProcedure(); 146 long procId = procExecutor.submitProcedure(proc); 147 htu.waitFor(30000, () -> proc.isWaiting() && procExecutor.getActiveExecutorCount() == 0); 148 ProcedureTestingUtility.restart(procExecutor); 149 htu.waitFor(30000, () -> { 150 Procedure<?> p = procExecutor.getProcedure(procId); 151 return (p.isWaiting() || p.isFinished()) && procExecutor.getActiveExecutorCount() == 0; 152 }); 153 assertFalse(procExecutor.isFinished(procId)); 154 ProcedureTestingUtility.restart(procExecutor); 155 htu.waitFor(30000, () -> procExecutor.isFinished(procId)); 156 Procedure<ProcEnv> p = procExecutor.getResult(procId); 157 assertTrue(p.isSuccess()); 158 } 159}