001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.io.UncheckedIOException;
026import java.util.List;
027import java.util.Map;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
033import org.apache.hadoop.hbase.StartTestingClusterOption;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
037import org.apache.hadoop.hbase.testclassification.MasterTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.JVMClusterUtil;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category({ MasterTests.class, MediumTests.class })
047public class TestRetainAssignmentOnRestart extends AbstractTestRestartCluster {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051      HBaseClassTestRule.forClass(TestRetainAssignmentOnRestart.class);
052
053  private static final Logger LOG = LoggerFactory.getLogger(TestRetainAssignmentOnRestart.class);
054
055  private static int NUM_OF_RS = 3;
056
057  public static final class HMasterForTest extends HMaster {
058
059    public HMasterForTest(Configuration conf) throws IOException {
060      super(conf);
061    }
062
063    @Override
064    protected void startProcedureExecutor() throws IOException {
065      // only start procedure executor when we have all the regionservers ready to take regions
066      new Thread(() -> {
067        for (;;) {
068          if (getServerManager().createDestinationServersList().size() == NUM_OF_RS) {
069            try {
070              HMasterForTest.super.startProcedureExecutor();
071            } catch (IOException e) {
072              throw new UncheckedIOException(e);
073            }
074          }
075          try {
076            Thread.sleep(1000);
077          } catch (InterruptedException e) {
078          }
079        }
080      }).start();
081    }
082  }
083
084  @Override
085  protected boolean splitWALCoordinatedByZk() {
086    return true;
087  }
088
089  /**
090   * This tests retaining assignments on a cluster restart
091   */
092  @Test
093  public void testRetainAssignmentOnClusterRestart() throws Exception {
094    setupCluster();
095    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
096    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
097    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
098    assertEquals(NUM_OF_RS, threads.size());
099    int[] rsPorts = new int[NUM_OF_RS];
100    for (int i = 0; i < NUM_OF_RS; i++) {
101      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
102    }
103
104    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
105    // use it to load all user region placements
106    SnapshotOfRegionAssignmentFromMeta snapshot =
107        new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
108    snapshot.initialize();
109    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
110    for (ServerName serverName : regionToRegionServerMap.values()) {
111      boolean found = false; // Test only, no need to optimize
112      for (int k = 0; k < NUM_OF_RS && !found; k++) {
113        found = serverName.getPort() == rsPorts[k];
114      }
115      assertTrue(found);
116    }
117
118    LOG.info("\n\nShutting down HBase cluster");
119    cluster.stopMaster(0);
120    cluster.shutdown();
121    cluster.waitUntilShutDown();
122
123    LOG.info("\n\nSleeping a bit");
124    Thread.sleep(2000);
125
126    LOG.info("\n\nStarting cluster the second time with the same ports");
127    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
128    master = cluster.startMaster().getMaster();
129    for (int i = 0; i < NUM_OF_RS; i++) {
130      cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, rsPorts[i]);
131      cluster.startRegionServer();
132    }
133
134    ensureServersWithSamePort(master, rsPorts);
135
136    // Wait till master is initialized and all regions are assigned
137    for (TableName TABLE : TABLES) {
138      UTIL.waitTableAvailable(TABLE);
139    }
140    UTIL.waitUntilNoRegionsInTransition(60000);
141
142    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
143    snapshot.initialize();
144    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
145    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
146    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
147      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
148      ServerName currentServer = entry.getValue();
149      LOG.info(
150        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
151      assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
152      assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
153    }
154  }
155
156  /**
157   * This tests retaining assignments on a single node restart
158   */
159  @Test
160  public void testRetainAssignmentOnSingleRSRestart() throws Exception {
161    setupCluster();
162    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
163    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
164    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
165    assertEquals(NUM_OF_RS, threads.size());
166    int[] rsPorts = new int[NUM_OF_RS];
167    for (int i = 0; i < NUM_OF_RS; i++) {
168      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
169    }
170
171    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
172    // use it to load all user region placements
173    SnapshotOfRegionAssignmentFromMeta snapshot =
174        new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
175    snapshot.initialize();
176    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
177    for (ServerName serverName : regionToRegionServerMap.values()) {
178      boolean found = false; // Test only, no need to optimize
179      for (int k = 0; k < NUM_OF_RS && !found; k++) {
180        found = serverName.getPort() == rsPorts[k];
181      }
182      assertTrue(found);
183    }
184
185    // Server to be restarted
186    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
187    LOG.info("\n\nStopping HMaster and {} server", deadRS);
188    // Stopping master first so that region server SCP will not be initiated
189    cluster.stopMaster(0);
190    cluster.waitForMasterToStop(master.getServerName(), 5000);
191    cluster.stopRegionServer(deadRS);
192
193    LOG.info("\n\nSleeping a bit");
194    Thread.sleep(2000);
195
196    LOG.info("\n\nStarting HMaster and region server {} second time with the same port", deadRS);
197    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
198    master = cluster.startMaster().getMaster();
199    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
200    cluster.startRegionServer();
201
202    ensureServersWithSamePort(master, rsPorts);
203
204    // Wait till master is initialized and all regions are assigned
205    for (TableName TABLE : TABLES) {
206      UTIL.waitTableAvailable(TABLE);
207    }
208    UTIL.waitUntilNoRegionsInTransition(60000);
209
210    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
211    snapshot.initialize();
212    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
213    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
214    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
215      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
216      ServerName currentServer = entry.getValue();
217      LOG.info(
218        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
219      assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
220
221      if (deadRS.getPort() == oldServer.getPort()) {
222        // Restarted RS start code wont be same
223        assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
224      } else {
225        assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
226      }
227    }
228  }
229
230  private void setupCluster() throws Exception, IOException, InterruptedException {
231    // Set Zookeeper based connection registry since we will stop master and start a new master
232    // without populating the underlying config for the connection.
233    UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
234      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
235    // Enable retain assignment during ServerCrashProcedure
236    UTIL.getConfiguration().setBoolean(ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT, true);
237    UTIL.startMiniCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
238      .numRegionServers(NUM_OF_RS).build());
239
240    // Turn off balancer
241    UTIL.getMiniHBaseCluster().getMaster().getMasterRpcServices().synchronousBalanceSwitch(false);
242
243    LOG.info("\n\nCreating tables");
244    for (TableName TABLE : TABLES) {
245      UTIL.createTable(TABLE, FAMILY);
246    }
247    for (TableName TABLE : TABLES) {
248      UTIL.waitTableEnabled(TABLE);
249    }
250
251    UTIL.getMiniHBaseCluster().getMaster();
252    UTIL.waitUntilNoRegionsInTransition(60000);
253  }
254
255  private void ensureServersWithSamePort(HMaster master, int[] rsPorts) {
256    // Make sure live regionservers are on the same host/port
257    List<ServerName> localServers = master.getServerManager().getOnlineServersList();
258    assertEquals(NUM_OF_RS, localServers.size());
259    for (int i = 0; i < NUM_OF_RS; i++) {
260      boolean found = false;
261      for (ServerName serverName : localServers) {
262        if (serverName.getPort() == rsPorts[i]) {
263          found = true;
264          break;
265        }
266      }
267      assertTrue(found);
268    }
269  }
270}