001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.net.ConnectException;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.PleaseHoldException;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
032import org.apache.hadoop.hbase.StartTestingClusterOption;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.master.HMaster;
036import org.apache.hadoop.hbase.master.MasterServices;
037import org.apache.hadoop.hbase.master.RegionPlan;
038import org.apache.hadoop.hbase.master.RegionServerList;
039import org.apache.hadoop.hbase.master.RegionState;
040import org.apache.hadoop.hbase.master.ServerManager;
041import org.apache.hadoop.hbase.master.region.MasterRegion;
042import org.apache.hadoop.hbase.regionserver.HRegionServer;
043import org.apache.hadoop.hbase.regionserver.RSRpcServices;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
048import org.junit.jupiter.api.AfterAll;
049import org.junit.jupiter.api.BeforeAll;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
065
066/**
067 * Testcase for HBASE-21811.
068 */
069@Tag(MasterTests.TAG)
070@Tag(LargeTests.TAG)
071public class TestWakeUpUnexpectedProcedure {
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestWakeUpUnexpectedProcedure.class);
074
075  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
076
077  private static TableName NAME = TableName.valueOf("Assign");
078
079  private static final List<ServerName> EXCLUDE_SERVERS = new CopyOnWriteArrayList<>();
080
081  private static byte[] CF = Bytes.toBytes("cf");
082
083  private static volatile ServerName SERVER_TO_KILL;
084
085  private static volatile CountDownLatch ARRIVE_EXEC_PROC;
086
087  private static volatile CountDownLatch RESUME_EXEC_PROC;
088
089  private static volatile CountDownLatch RESUME_IS_SERVER_ONLINE;
090
091  private static volatile CountDownLatch ARRIVE_REPORT;
092
093  private static volatile CountDownLatch RESUME_REPORT;
094
095  private static final class RSRpcServicesForTest extends RSRpcServices {
096
097    public RSRpcServicesForTest(HRegionServer rs) throws IOException {
098      super(rs);
099    }
100
101    @Override
102    public ExecuteProceduresResponse executeProcedures(RpcController controller,
103      ExecuteProceduresRequest request) throws ServiceException {
104      if (request.getOpenRegionCount() > 0) {
105        if (ARRIVE_EXEC_PROC != null) {
106          SERVER_TO_KILL = getServer().getServerName();
107          ARRIVE_EXEC_PROC.countDown();
108          ARRIVE_EXEC_PROC = null;
109          try {
110            RESUME_EXEC_PROC.await();
111          } catch (InterruptedException e) {
112            throw new RuntimeException(e);
113          }
114          throw new ServiceException(new ConnectException("Inject error"));
115        }
116      }
117      return super.executeProcedures(controller, request);
118    }
119  }
120
121  public static final class RSForTest extends MiniHBaseClusterRegionServer {
122
123    public RSForTest(Configuration conf) throws IOException, InterruptedException {
124      super(conf);
125    }
126
127    @Override
128    protected RSRpcServices createRpcServices() throws IOException {
129      return new RSRpcServicesForTest(this);
130    }
131  }
132
133  private static final class AMForTest extends AssignmentManager {
134
135    public AMForTest(MasterServices master, MasterRegion masterRegion) {
136      super(master, masterRegion);
137    }
138
139    @Override
140    public ReportRegionStateTransitionResponse reportRegionStateTransition(
141      ReportRegionStateTransitionRequest req) throws PleaseHoldException {
142      RegionStateTransition rst = req.getTransition(0);
143      if (
144        rst.getTransitionCode() == TransitionCode.OPENED
145          && ProtobufUtil.toTableName(rst.getRegionInfo(0).getTableName()).equals(NAME)
146      ) {
147        CountDownLatch arrive = ARRIVE_REPORT;
148        if (ARRIVE_REPORT != null) {
149          ARRIVE_REPORT = null;
150          arrive.countDown();
151          // so we will choose another rs next time
152          EXCLUDE_SERVERS.add(ProtobufUtil.toServerName(req.getServer()));
153          try {
154            RESUME_REPORT.await();
155          } catch (InterruptedException e) {
156            throw new RuntimeException();
157          }
158        }
159      }
160      return super.reportRegionStateTransition(req);
161    }
162  }
163
164  private static final class SMForTest extends ServerManager {
165
166    public SMForTest(MasterServices master, RegionServerList storage) {
167      super(master, storage);
168    }
169
170    @Override
171    public boolean isServerOnline(ServerName serverName) {
172      ServerName toKill = SERVER_TO_KILL;
173      if (toKill != null && toKill.equals(serverName)) {
174        for (StackTraceElement ele : new Exception().getStackTrace()) {
175          // halt it is called from RSProcedureDispatcher, to delay the remoteCallFailed.
176          if ("scheduleForRetry".equals(ele.getMethodName())) {
177            if (RESUME_IS_SERVER_ONLINE != null) {
178              try {
179                RESUME_IS_SERVER_ONLINE.await();
180              } catch (InterruptedException e) {
181                throw new RuntimeException(e);
182              }
183            }
184            break;
185          }
186        }
187      }
188      return super.isServerOnline(serverName);
189    }
190
191    @Override
192    public List<ServerName> createDestinationServersList() {
193      return super.createDestinationServersList(EXCLUDE_SERVERS);
194    }
195  }
196
197  public static final class HMasterForTest extends HMaster {
198
199    public HMasterForTest(Configuration conf) throws IOException {
200      super(conf);
201    }
202
203    @Override
204    protected AssignmentManager createAssignmentManager(MasterServices master,
205      MasterRegion masterRegion) {
206      return new AMForTest(master, masterRegion);
207    }
208
209    @Override
210    protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
211      throws IOException {
212      setupClusterConnection();
213      return new SMForTest(master, storage);
214    }
215  }
216
217  @BeforeAll
218  public static void setUp() throws Exception {
219    UTIL.startMiniCluster(StartTestingClusterOption.builder().numMasters(1)
220      .masterClass(HMasterForTest.class).numRegionServers(3).rsClass(RSForTest.class).build());
221    UTIL.createTable(NAME, CF);
222    // Here the test region must not be hosted on the same rs with meta region.
223    // We have 3 RSes and only two regions(meta and the test region), so they will not likely to be
224    // hosted on the same RS.
225    UTIL.waitTableAvailable(NAME);
226    UTIL.getAdmin().balancerSwitch(false, true);
227  }
228
229  @AfterAll
230  public static void tearDown() throws Exception {
231    UTIL.shutdownMiniCluster();
232  }
233
234  @SuppressWarnings("FutureReturnValueIgnored")
235  @Test
236  public void test() throws Exception {
237    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
238    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
239    RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
240
241    ServerName sn = rsn.getRegionLocation();
242    RESUME_EXEC_PROC = new CountDownLatch(1);
243    ARRIVE_EXEC_PROC = new CountDownLatch(1);
244    RESUME_IS_SERVER_ONLINE = new CountDownLatch(1);
245
246    // reopen the region, and halt the executeProcedures method at RS side
247    am.moveAsync(new RegionPlan(region, sn, sn));
248    ARRIVE_EXEC_PROC.await();
249
250    RESUME_REPORT = new CountDownLatch(1);
251    ARRIVE_REPORT = new CountDownLatch(1);
252
253    // kill the region server
254    ServerName serverToKill = SERVER_TO_KILL;
255    UTIL.getMiniHBaseCluster().stopRegionServer(serverToKill);
256    RESUME_EXEC_PROC.countDown();
257
258    // wait until we are going to open the region on a new rs
259    ARRIVE_REPORT.await();
260
261    // resume the isServerOnline check, to let the rs procedure
262    RESUME_IS_SERVER_ONLINE.countDown();
263
264    // before HBASE-20811 the state could become OPEN, and this is why later the region will be
265    // assigned to two regionservers.
266    for (int i = 0; i < 15; i++) {
267      if (rsn.getState() == RegionState.State.OPEN) {
268        break;
269      }
270      Thread.sleep(1000);
271    }
272
273    // resume the old report
274    RESUME_REPORT.countDown();
275
276    // wait a bit to let the region to be online, it is not easy to write a condition for this so
277    // just sleep a while.
278    Thread.sleep(10000);
279
280    // confirm that the region is only on one rs
281    int count = 0;
282    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
283      if (!t.getRegionServer().getRegions(NAME).isEmpty()) {
284        LOG.info("{} is on {}", region, t.getRegionServer().getServerName());
285        count++;
286      }
287    }
288    assertEquals(1, count);
289  }
290}