001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.StringWriter;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.LocalHBaseCluster;
033import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.LoadBalancer;
038import org.apache.hadoop.hbase.master.ServerManager;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
043import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.log4j.Appender;
046import org.apache.log4j.Layout;
047import org.apache.log4j.PatternLayout;
048import org.apache.log4j.WriterAppender;
049import org.apache.zookeeper.KeeperException;
050import org.junit.After;
051import org.junit.Before;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058@Category(LargeTests.class)
059public class TestRegionServerReportForDuty {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);
066
067  private static final long SLEEP_INTERVAL = 500;
068
069  private HBaseTestingUtility testUtil;
070  private LocalHBaseCluster cluster;
071  private RegionServerThread rs;
072  private RegionServerThread rs2;
073  private MasterThread master;
074  private MasterThread backupMaster;
075
076  @Before
077  public void setUp() throws Exception {
078    testUtil = new HBaseTestingUtility();
079    testUtil.startMiniDFSCluster(1);
080    testUtil.startMiniZKCluster(1);
081    testUtil.createRootDir();
082    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
083  }
084
085  @After
086  public void tearDown() throws Exception {
087    cluster.shutdown();
088    cluster.join();
089    testUtil.shutdownMiniZKCluster();
090    testUtil.shutdownMiniDFSCluster();
091  }
092
093  /**
094   * LogCapturer is similar to {@link org.apache.hadoop.test.GenericTestUtils.LogCapturer}
095   * except that this implementation has a default appender to the root logger.
096   * Hadoop 2.8+ supports the default appender in the LogCapture it ships and this can be replaced.
097   * TODO: This class can be removed after we upgrade Hadoop dependency.
098   */
099  static class LogCapturer {
100    private StringWriter sw = new StringWriter();
101    private WriterAppender appender;
102    private org.apache.log4j.Logger logger;
103
104    LogCapturer(org.apache.log4j.Logger logger) {
105      this.logger = logger;
106      Appender defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("stdout");
107      if (defaultAppender == null) {
108        defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("console");
109      }
110      final Layout layout = (defaultAppender == null) ? new PatternLayout() :
111          defaultAppender.getLayout();
112      this.appender = new WriterAppender(layout, sw);
113      this.logger.addAppender(this.appender);
114    }
115
116    String getOutput() {
117      return sw.toString();
118    }
119
120    public void stopCapturing() {
121      this.logger.removeAppender(this.appender);
122    }
123  }
124
125  /**
126   * This test HMaster class will always throw ServerNotRunningYetException if checked.
127   */
128  public static class NeverInitializedMaster extends HMaster {
129    public NeverInitializedMaster(Configuration conf) throws IOException {
130      super(conf);
131    }
132
133    @Override
134    protected void checkServiceStarted() throws ServerNotRunningYetException {
135      throw new ServerNotRunningYetException("Server is not running yet");
136    }
137  }
138
139  /**
140   * Tests region server should backoff to report for duty if master is not ready.
141   */
142  @Test
143  public void testReportForDutyBackoff() throws IOException, InterruptedException {
144    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
145    master = cluster.addMaster();
146    master.start();
147
148    LogCapturer capturer = new LogCapturer(org.apache.log4j.Logger.getLogger(HRegionServer.class));
149    // Set sleep interval relatively low so that exponential backoff is more demanding.
150    int msginterval = 100;
151    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
152    rs = cluster.addRegionServer();
153    rs.start();
154
155    int interval = 10_000;
156    Thread.sleep(interval);
157    capturer.stopCapturing();
158    String output = capturer.getOutput();
159    LOG.info("{}", output);
160    String failMsg = "reportForDuty failed;";
161    int count = StringUtils.countMatches(output, failMsg);
162
163    // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
164    // Ideally we can assert the exact retry count. We relax here to tolerate contention error.
165    int expectedRetry = (int)Math.ceil(Math.log(interval - msginterval));
166    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d",
167        count, expectedRetry / 2), count > expectedRetry / 2);
168    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d",
169        count, expectedRetry * 2), count < expectedRetry * 2);
170  }
171
172  /**
173   * Tests region sever reportForDuty with backup master becomes primary master after
174   * the first master goes away.
175   */
176  @Test
177  public void testReportForDutyWithMasterChange() throws Exception {
178
179    // Start a master and wait for it to become the active/primary master.
180    // Use a random unique port
181    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
182    // master has a rs. defaultMinToStart = 2
183    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
184    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, tablesOnMaster? 2: 1);
185    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, tablesOnMaster? 2: 1);
186    master = cluster.addMaster();
187    rs = cluster.addRegionServer();
188    LOG.debug("Starting master: " + master.getMaster().getServerName());
189    master.start();
190    rs.start();
191
192    waitForClusterOnline(master);
193
194    // Add a 2nd region server
195    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
196    rs2 = cluster.addRegionServer();
197    // Start the region server. This region server will refresh RPC connection
198    // from the current active master to the next active master before completing
199    // reportForDuty
200    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
201    rs2.start();
202
203    waitForSecondRsStarted();
204
205    // Stop the current master.
206    master.getMaster().stop("Stopping master");
207
208    // Start a new master and use another random unique port
209    // Also let it wait for exactly 2 region severs to report in.
210    // TODO: Add handling bindexception. Random port is not enough!!! Flakie test!
211    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
212    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
213      tablesOnMaster? 3: 2);
214    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
215      tablesOnMaster? 3: 2);
216    backupMaster = cluster.addMaster();
217    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
218    backupMaster.start();
219
220    waitForClusterOnline(backupMaster);
221
222    // Do some checking/asserts here.
223    assertTrue(backupMaster.getMaster().isActiveMaster());
224    assertTrue(backupMaster.getMaster().isInitialized());
225    assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(),
226      tablesOnMaster? 3: 2);
227
228  }
229  
230  /**
231   * Tests region sever reportForDuty with RS RPC retry
232   */
233  @Test
234  public void testReportForDutyWithRSRpcRetry() throws Exception {
235    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
236        new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("RSDelayedStart"));
237
238    // Start a master and wait for it to become the active/primary master.
239    // Use a random unique port
240    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
241    // Override the default RS RPC retry interval of 100ms to 300ms
242    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
243    // master has a rs. defaultMinToStart = 2
244    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
245    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
246      tablesOnMaster ? 2 : 1);
247    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
248      tablesOnMaster ? 2 : 1);
249    master = cluster.addMaster();
250    rs = cluster.addRegionServer();
251    LOG.debug("Starting master: " + master.getMaster().getServerName());
252    master.start();
253    // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block
254    scheduledThreadPoolExecutor.schedule(new Runnable() {
255      @Override
256      public void run() {
257        rs.start();
258      }
259    }, 1000, TimeUnit.MILLISECONDS);
260
261    waitForClusterOnline(master);
262  }
263
264  /**
265   * Tests region sever reportForDuty with manual environment edge
266   */
267  @Test
268  public void testReportForDutyWithEnvironmentEdge() throws Exception {
269    // Start a master and wait for it to become the active/primary master.
270    // Use a random unique port
271    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
272    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
273    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
274    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
275
276    // master has a rs. defaultMinToStart = 2
277    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
278    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
279      tablesOnMaster ? 2 : 1);
280    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
281      tablesOnMaster ? 2 : 1);
282
283    // Inject manual environment edge for clock skew computation between RS and master
284    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
285    EnvironmentEdgeManager.injectEdge(edge);
286    master = cluster.addMaster();
287    rs = cluster.addRegionServer();
288    LOG.debug("Starting master: " + master.getMaster().getServerName());
289    master.start();
290    rs.start();
291
292    waitForClusterOnline(master);
293  }
294
295  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
296    while (true) {
297      if (master.getMaster().isInitialized()) {
298        break;
299      }
300      Thread.sleep(SLEEP_INTERVAL);
301      LOG.debug("Waiting for master to come online ...");
302    }
303    rs.waitForServerOnline();
304  }
305
306  private void waitForSecondRsStarted() throws InterruptedException {
307    while (true) {
308      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
309        break;
310      }
311      Thread.sleep(SLEEP_INTERVAL);
312      LOG.debug("Waiting 2nd RS to be started ...");
313    }
314  }
315
316  // Create a Region Server that provide a hook so that we can wait for the master switch over
317  // before continuing reportForDuty to the mater.
318  // The idea is that we get a RPC connection to the first active master, then we wait.
319  // The first master goes down, the second master becomes the active master. The region
320  // server continues reportForDuty. It should succeed with the new master.
321  public static class MyRegionServer extends MiniHBaseClusterRegionServer {
322
323    private ServerName sn;
324    // This flag is to make sure this rs has obtained the rpcStub to the first master.
325    // The first master will go down after this.
326    private boolean rpcStubCreatedFlag = false;
327    private boolean masterChanged = false;
328
329    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
330        InterruptedException {
331      super(conf);
332    }
333
334    @Override
335    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
336      sn = super.createRegionServerStatusStub(refresh);
337      rpcStubCreatedFlag = true;
338
339      // Wait for master switch over. Only do this for the second region server.
340      while (!masterChanged) {
341        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
342        if (newSn != null && !newSn.equals(sn)) {
343          masterChanged = true;
344          break;
345        }
346        try {
347          Thread.sleep(SLEEP_INTERVAL);
348        } catch (InterruptedException e) {
349          return null;
350        }
351        LOG.debug("Waiting for master switch over ... ");
352      }
353      return sn;
354    }
355
356    public boolean getRpcStubCreatedFlag() {
357      return rpcStubCreatedFlag;
358    }
359  }
360}