001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.hamcrest.Matchers.everyItem;
023import static org.hamcrest.Matchers.not;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025
026import java.io.IOException;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.atomic.AtomicBoolean;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.StartTestingClusterOption;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.BalanceRequest;
035import org.apache.hadoop.hbase.master.HMaster;
036import org.apache.hadoop.hbase.master.MasterServices;
037import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
040import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
041import org.apache.hadoop.hbase.master.region.MasterRegion;
042import org.apache.hadoop.hbase.procedure2.Procedure;
043import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
045import org.apache.hadoop.hbase.regionserver.HRegionServer;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.FutureUtils;
050import org.hamcrest.BaseMatcher;
051import org.hamcrest.Description;
052import org.hamcrest.Matcher;
053import org.junit.jupiter.api.AfterAll;
054import org.junit.jupiter.api.BeforeAll;
055import org.junit.jupiter.api.BeforeEach;
056import org.junit.jupiter.api.Tag;
057import org.junit.jupiter.api.Test;
058
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
060
061/**
062 * SCP does not support rollback actually, here we just want to simulate that when there is a code
063 * bug, SCP and its sub procedures will not hang there forever, and it will not mess up the
064 * procedure store.
065 */
066@Tag(MasterTests.TAG)
067@Tag(LargeTests.TAG)
068public class TestRollbackSCP {
069
070  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
071
072  private static final TableName TABLE_NAME = TableName.valueOf("test");
073
074  private static final byte[] FAMILY = Bytes.toBytes("family");
075
076  private static final AtomicBoolean INJECTED = new AtomicBoolean(false);
077
078  private static final class AssignmentManagerForTest extends AssignmentManager {
079
080    public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) {
081      super(master, masterRegion);
082    }
083
084    @Override
085    CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
086      TransitRegionStateProcedure proc = regionNode.getProcedure();
087      if (!regionNode.getRegionInfo().isMetaRegion() && proc.hasParent()) {
088        Procedure<?> p =
089          getMaster().getMasterProcedureExecutor().getProcedure(proc.getRootProcId());
090        // fail the procedure if it is a sub procedure for SCP
091        if (p instanceof ServerCrashProcedure) {
092          if (INJECTED.compareAndSet(false, true)) {
093            ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(
094              getMaster().getMasterProcedureExecutor(), true);
095          }
096          return FutureUtils.failedFuture(new RuntimeException("inject code bug"));
097        }
098      }
099      return super.persistToMeta(regionNode);
100    }
101  }
102
103  public static final class HMasterForTest extends HMaster {
104
105    public HMasterForTest(Configuration conf) throws IOException {
106      super(conf);
107    }
108
109    @Override
110    protected AssignmentManager createAssignmentManager(MasterServices master,
111      MasterRegion masterRegion) {
112      return new AssignmentManagerForTest(master, masterRegion);
113    }
114  }
115
116  @BeforeAll
117  public static void setUpBeforeClass() throws Exception {
118    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
119    UTIL.startMiniCluster(StartTestingClusterOption.builder().numDataNodes(3).numRegionServers(3)
120      .masterClass(HMasterForTest.class).build());
121    UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
122    UTIL.waitTableAvailable(TABLE_NAME);
123    UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build());
124    UTIL.waitUntilNoRegionsInTransition();
125    UTIL.getAdmin().balancerSwitch(false, true);
126  }
127
128  @AfterAll
129  public static void tearDownAfterClass() throws IOException {
130    UTIL.shutdownMiniCluster();
131  }
132
133  @BeforeEach
134  public void setUp() throws IOException {
135    UTIL.ensureSomeNonStoppedRegionServersAvailable(2);
136  }
137
138  private ServerCrashProcedure getSCPForServer(ServerName serverName) throws IOException {
139    return UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
140      .filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p)
141      .filter(p -> p.getServerName().equals(serverName)).findFirst().orElse(null);
142  }
143
144  private Matcher<Procedure<MasterProcedureEnv>> subProcOf(Procedure<MasterProcedureEnv> proc) {
145    return new BaseMatcher<Procedure<MasterProcedureEnv>>() {
146
147      @Override
148      public boolean matches(Object item) {
149        if (!(item instanceof Procedure)) {
150          return false;
151        }
152        Procedure<?> p = (Procedure<?>) item;
153        return p.hasParent() && p.getRootProcId() == proc.getProcId();
154      }
155
156      @Override
157      public void describeTo(Description description) {
158        description.appendText("sub procedure of(").appendValue(proc).appendText(")");
159      }
160    };
161  }
162
163  @Test
164  public void testFailAndRollback() throws Exception {
165    HRegionServer rsWithMeta = UTIL.getRSForFirstRegionInTable(TableName.META_TABLE_NAME);
166    UTIL.getMiniHBaseCluster().killRegionServer(rsWithMeta.getServerName());
167    UTIL.waitFor(15000, () -> getSCPForServer(rsWithMeta.getServerName()) != null);
168    ServerCrashProcedure scp = getSCPForServer(rsWithMeta.getServerName());
169    ProcedureExecutor<MasterProcedureEnv> procExec =
170      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
171    // wait for the procedure to stop, as we inject a code bug and also set kill before store update
172    UTIL.waitFor(30000, () -> !procExec.isRunning());
173    // make sure that finally we could successfully rollback the procedure
174    while (scp.getState() != ProcedureState.FAILED || !procExec.isRunning()) {
175      MasterProcedureTestingUtility.restartMasterProcedureExecutor(procExec);
176      ProcedureTestingUtility.waitProcedure(procExec, scp);
177    }
178    assertEquals(scp.getState(), ProcedureState.FAILED);
179    assertThat(scp.getException().getMessage(), containsString("inject code bug"));
180    // make sure all sub procedures are cleaned up
181    assertThat(procExec.getProcedures(), everyItem(not(subProcOf(scp))));
182  }
183}