001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseTestingUtility;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.LocalHBaseCluster;
029import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.master.LoadBalancer;
032import org.apache.hadoop.hbase.master.ServerManager;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
035import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
036import org.apache.zookeeper.KeeperException;
037import org.junit.After;
038import org.junit.Before;
039import org.junit.ClassRule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@Category(MediumTests.class)
046public class TestRegionServerReportForDuty {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);
051
052  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);
053
054  private static final long SLEEP_INTERVAL = 500;
055
056  private HBaseTestingUtility testUtil;
057  private LocalHBaseCluster cluster;
058  private RegionServerThread rs;
059  private RegionServerThread rs2;
060  private MasterThread master;
061  private MasterThread backupMaster;
062
063  @Before
064  public void setUp() throws Exception {
065    testUtil = new HBaseTestingUtility();
066    testUtil.startMiniDFSCluster(1);
067    testUtil.startMiniZKCluster(1);
068    testUtil.createRootDir();
069    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
070  }
071
072  @After
073  public void tearDown() throws Exception {
074    cluster.shutdown();
075    cluster.join();
076    testUtil.shutdownMiniZKCluster();
077    testUtil.shutdownMiniDFSCluster();
078  }
079
080  /**
081   * Tests region sever reportForDuty with backup master becomes primary master after
082   * the first master goes away.
083   */
084  @Test
085  public void testReportForDutyWithMasterChange() throws Exception {
086
087    // Start a master and wait for it to become the active/primary master.
088    // Use a random unique port
089    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
090    // master has a rs. defaultMinToStart = 2
091    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
092    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, tablesOnMaster? 2: 1);
093    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, tablesOnMaster? 2: 1);
094    master = cluster.addMaster();
095    rs = cluster.addRegionServer();
096    LOG.debug("Starting master: " + master.getMaster().getServerName());
097    master.start();
098    rs.start();
099
100    waitForClusterOnline(master);
101
102    // Add a 2nd region server
103    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
104    rs2 = cluster.addRegionServer();
105    // Start the region server. This region server will refresh RPC connection
106    // from the current active master to the next active master before completing
107    // reportForDuty
108    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
109    rs2.start();
110
111    waitForSecondRsStarted();
112
113    // Stop the current master.
114    master.getMaster().stop("Stopping master");
115
116    // Start a new master and use another random unique port
117    // Also let it wait for exactly 2 region severs to report in.
118    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
119    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
120      tablesOnMaster? 3: 2);
121    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
122      tablesOnMaster? 3: 2);
123    backupMaster = cluster.addMaster();
124    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
125    backupMaster.start();
126
127    waitForClusterOnline(backupMaster);
128
129    // Do some checking/asserts here.
130    assertTrue(backupMaster.getMaster().isActiveMaster());
131    assertTrue(backupMaster.getMaster().isInitialized());
132    assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(),
133      tablesOnMaster? 3: 2);
134
135  }
136
137  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
138    while (true) {
139      if (master.getMaster().isInitialized()) {
140        break;
141      }
142      Thread.sleep(SLEEP_INTERVAL);
143      LOG.debug("Waiting for master to come online ...");
144    }
145    rs.waitForServerOnline();
146  }
147
148  private void waitForSecondRsStarted() throws InterruptedException {
149    while (true) {
150      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
151        break;
152      }
153      Thread.sleep(SLEEP_INTERVAL);
154      LOG.debug("Waiting 2nd RS to be started ...");
155    }
156  }
157
158  // Create a Region Server that provide a hook so that we can wait for the master switch over
159  // before continuing reportForDuty to the mater.
160  // The idea is that we get a RPC connection to the first active master, then we wait.
161  // The first master goes down, the second master becomes the active master. The region
162  // server continues reportForDuty. It should succeed with the new master.
163  public static class MyRegionServer extends MiniHBaseClusterRegionServer {
164
165    private ServerName sn;
166    // This flag is to make sure this rs has obtained the rpcStub to the first master.
167    // The first master will go down after this.
168    private boolean rpcStubCreatedFlag = false;
169    private boolean masterChanged = false;
170
171    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
172        InterruptedException {
173      super(conf);
174    }
175
176    @Override
177    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
178      sn = super.createRegionServerStatusStub(refresh);
179      rpcStubCreatedFlag = true;
180
181      // Wait for master switch over. Only do this for the second region server.
182      while (!masterChanged) {
183        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
184        if (newSn != null && !newSn.equals(sn)) {
185          masterChanged = true;
186          break;
187        }
188        try {
189          Thread.sleep(SLEEP_INTERVAL);
190        } catch (InterruptedException e) {
191          return null;
192        }
193        LOG.debug("Waiting for master switch over ... ");
194      }
195      return sn;
196    }
197
198    public boolean getRpcStubCreatedFlag() {
199      return rpcStubCreatedFlag;
200    }
201  }
202}