001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;
021import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT_WAIT_INTERVAL;
022import static org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertNotEquals;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026
027import java.io.IOException;
028import java.io.UncheckedIOException;
029import java.util.List;
030import java.util.Map;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
035import org.apache.hadoop.hbase.StartTestingClusterOption;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.testclassification.MasterTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.JVMClusterUtil;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.api.Test;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Tag(MasterTests.TAG)
047@Tag(MediumTests.TAG)
048public class TestRetainAssignmentOnRestart extends AbstractTestRestartCluster {
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestRetainAssignmentOnRestart.class);
051
052  private static int NUM_OF_RS = 3;
053
054  public static final class HMasterForTest extends HMaster {
055
056    public HMasterForTest(Configuration conf) throws IOException {
057      super(conf);
058    }
059
060    @Override
061    protected void startProcedureExecutor() throws IOException {
062      // only start procedure executor when we have all the regionservers ready to take regions
063      new Thread(() -> {
064        for (;;) {
065          if (getServerManager().createDestinationServersList().size() == NUM_OF_RS) {
066            try {
067              HMasterForTest.super.startProcedureExecutor();
068            } catch (IOException e) {
069              throw new UncheckedIOException(e);
070            }
071            break;
072          }
073          try {
074            Thread.sleep(1000);
075          } catch (InterruptedException e) {
076          }
077        }
078      }).start();
079    }
080  }
081
082  @Override
083  protected boolean splitWALCoordinatedByZk() {
084    return true;
085  }
086
087  /**
088   * This tests retaining assignments on a cluster restart
089   */
090  @Test
091  public void testRetainAssignmentOnClusterRestart() throws Exception {
092    setupCluster();
093    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
094    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
095    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
096    assertEquals(NUM_OF_RS, threads.size());
097    int[] rsPorts = new int[NUM_OF_RS];
098    for (int i = 0; i < NUM_OF_RS; i++) {
099      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
100    }
101
102    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
103    // use it to load all user region placements
104    SnapshotOfRegionAssignmentFromMeta snapshot =
105      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
106    snapshot.initialize();
107    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
108    for (ServerName serverName : regionToRegionServerMap.values()) {
109      boolean found = false; // Test only, no need to optimize
110      for (int k = 0; k < NUM_OF_RS && !found; k++) {
111        found = serverName.getPort() == rsPorts[k];
112      }
113      assertTrue(found);
114    }
115
116    LOG.info("\n\nShutting down HBase cluster");
117    cluster.stopMaster(0);
118    cluster.shutdown();
119    cluster.waitUntilShutDown();
120
121    LOG.info("\n\nSleeping a bit");
122    Thread.sleep(2000);
123
124    LOG.info("\n\nStarting cluster the second time with the same ports");
125    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
126    master = cluster.startMaster().getMaster();
127    for (int i = 0; i < NUM_OF_RS; i++) {
128      cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, rsPorts[i]);
129      cluster.startRegionServer();
130    }
131
132    ensureServersWithSamePort(master, rsPorts);
133
134    // Wait till master is initialized and all regions are assigned
135    for (TableName TABLE : TABLES) {
136      UTIL.waitTableAvailable(TABLE);
137    }
138    UTIL.waitUntilNoRegionsInTransition(60000);
139
140    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
141    snapshot.initialize();
142    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
143    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
144    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
145      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
146      ServerName currentServer = entry.getValue();
147      LOG.info(
148        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
149      assertEquals(oldServer.getAddress(), currentServer.getAddress(), entry.getKey().toString());
150      assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
151    }
152  }
153
154  /**
155   * This tests retaining assignments on a single node restart
156   */
157  @Test
158  public void testRetainAssignmentOnSingleRSRestart() throws Exception {
159    setupCluster();
160    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
161    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
162    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
163    assertEquals(NUM_OF_RS, threads.size());
164    int[] rsPorts = new int[NUM_OF_RS];
165    for (int i = 0; i < NUM_OF_RS; i++) {
166      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
167    }
168
169    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
170    // use it to load all user region placements
171    SnapshotOfRegionAssignmentFromMeta snapshot =
172      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
173    snapshot.initialize();
174    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
175    for (ServerName serverName : regionToRegionServerMap.values()) {
176      boolean found = false; // Test only, no need to optimize
177      for (int k = 0; k < NUM_OF_RS && !found; k++) {
178        found = serverName.getPort() == rsPorts[k];
179      }
180      assertTrue(found);
181    }
182
183    // Server to be restarted
184    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
185    LOG.info("\n\nStopping HMaster and {} server", deadRS);
186    // Stopping master first so that region server SCP will not be initiated
187    cluster.stopMaster(0);
188    cluster.waitForMasterToStop(master.getServerName(), 5000);
189    cluster.stopRegionServer(deadRS);
190    cluster.waitForRegionServerToStop(deadRS, 5000);
191
192    LOG.info("\n\nSleeping a bit");
193    Thread.sleep(2000);
194
195    LOG.info("\n\nStarting HMaster and region server {} second time with the same port", deadRS);
196    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
197    master = cluster.startMaster().getMaster();
198    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
199    cluster.startRegionServer();
200
201    ensureServersWithSamePort(master, rsPorts);
202
203    // Wait till master is initialized and all regions are assigned
204    for (TableName TABLE : TABLES) {
205      UTIL.waitTableAvailable(TABLE);
206    }
207    UTIL.waitUntilNoRegionsInTransition(60000);
208
209    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
210    snapshot.initialize();
211    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
212    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
213    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
214      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
215      ServerName currentServer = entry.getValue();
216      LOG.info(
217        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
218      assertEquals(oldServer.getAddress(), currentServer.getAddress(), entry.getKey().toString());
219
220      if (deadRS.getPort() == oldServer.getPort()) {
221        // Restarted RS start code wont be same
222        assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
223      } else {
224        assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
225      }
226    }
227  }
228
229  /**
230   * This tests the force retaining assignments upon an RS restart, even when master triggers an SCP
231   */
232  @Test
233  public void testForceRetainAssignment() throws Exception {
234    UTIL.getConfiguration().setBoolean(FORCE_REGION_RETAINMENT, true);
235    UTIL.getConfiguration().setLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 50);
236    setupCluster();
237    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
238    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
239    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
240    assertEquals(NUM_OF_RS, threads.size());
241    int[] rsPorts = new int[NUM_OF_RS];
242    for (int i = 0; i < NUM_OF_RS; i++) {
243      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
244    }
245
246    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
247    // use it to load all user region placements
248    SnapshotOfRegionAssignmentFromMeta snapshot =
249      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
250    snapshot.initialize();
251    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
252    for (ServerName serverName : regionToRegionServerMap.values()) {
253      boolean found = false; // Test only, no need to optimize
254      for (int k = 0; k < NUM_OF_RS && !found; k++) {
255        found = serverName.getPort() == rsPorts[k];
256      }
257      LOG.info("Server {} has regions? {}", serverName, found);
258      assertTrue(found);
259    }
260
261    // Server to be restarted
262    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
263    LOG.info("\n\nStopping {} server", deadRS);
264    cluster.stopRegionServer(deadRS);
265
266    LOG.info("\n\nSleeping a bit");
267    Thread.sleep(2000);
268
269    LOG.info("\n\nStarting region server {} second time with the same port", deadRS);
270    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
271    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
272    cluster.startRegionServer();
273
274    ensureServersWithSamePort(master, rsPorts);
275
276    // Wait till master is initialized and all regions are assigned
277    for (TableName TABLE : TABLES) {
278      UTIL.waitTableAvailable(TABLE);
279    }
280    UTIL.waitUntilNoRegionsInTransition(60000);
281    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
282    snapshot.initialize();
283    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
284    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
285    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
286      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
287      ServerName currentServer = entry.getValue();
288      LOG.info(
289        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
290      assertEquals(oldServer.getAddress(), currentServer.getAddress(), entry.getKey().toString());
291
292      if (deadRS.getPort() == oldServer.getPort()) {
293        // Restarted RS start code wont be same
294        assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
295      } else {
296        assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
297      }
298    }
299  }
300
301  private void setupCluster() throws Exception, IOException, InterruptedException {
302    // Set Zookeeper based connection registry since we will stop master and start a new master
303    // without populating the underlying config for the connection.
304    UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
305      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
306    // Enable retain assignment during ServerCrashProcedure
307    UTIL.getConfiguration().setBoolean(MASTER_SCP_RETAIN_ASSIGNMENT, true);
308    UTIL.startMiniCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
309      .numRegionServers(NUM_OF_RS).build());
310
311    // Turn off balancer
312    UTIL.getMiniHBaseCluster().getMaster().getMasterRpcServices().synchronousBalanceSwitch(false);
313
314    LOG.info("\n\nCreating tables");
315    for (TableName TABLE : TABLES) {
316      UTIL.createTable(TABLE, FAMILY);
317    }
318    for (TableName TABLE : TABLES) {
319      UTIL.waitTableEnabled(TABLE);
320    }
321
322    UTIL.getMiniHBaseCluster().getMaster();
323    UTIL.waitUntilNoRegionsInTransition(60000);
324  }
325
326  private void ensureServersWithSamePort(HMaster master, int[] rsPorts) {
327    // Make sure live regionservers are on the same host/port
328    List<ServerName> localServers = master.getServerManager().getOnlineServersList();
329    assertEquals(NUM_OF_RS, localServers.size());
330    for (int i = 0; i < NUM_OF_RS; i++) {
331      boolean found = false;
332      for (ServerName serverName : localServers) {
333        if (serverName.getPort() == rsPorts[i]) {
334          found = true;
335          break;
336        }
337      }
338      assertTrue(found);
339    }
340  }
341}