/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

@Category(LargeTests.class)
public class TestRegionServerReportForDuty {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);

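  // Polling interval, in milliseconds, used by the wait loops in this test.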
  private static final long SLEEP_INTERVAL = 500;

  private HBaseTestingUtil testUtil;
  private LocalHBaseCluster cluster;
  private RegionServerThread rs;
  private RegionServerThread rs2;
  private MasterThread master;
  private MasterThread backupMaster;

  @Before
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtil();
    testUtil.startMiniDFSCluster(1);
    testUtil.startMiniZKCluster(1);
    testUtil.createRootDir();
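    // Create a local cluster with no masters and no region servers; each test adds the ones it
    // needs.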
    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
  }

  @After
  public void tearDown() throws Exception {
    cluster.shutdown();
    cluster.join();
    testUtil.shutdownMiniZKCluster();
    testUtil.shutdownMiniDFSCluster();
  }

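  /**
   * Captures output from a log4j2 logger into an in-memory StringWriter so that tests can make
   * assertions about what was logged.
   */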
  private static class LogCapturer {
    private StringWriter sw = new StringWriter();
    private org.apache.logging.log4j.core.appender.WriterAppender appender;
    private org.apache.logging.log4j.core.Logger logger;

    LogCapturer(org.apache.logging.log4j.core.Logger logger) {
      this.logger = logger;
      this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
        .setName("test").setTarget(sw).build();
      this.logger.addAppender(this.appender);
    }

    String getOutput() {
      return sw.toString();
    }

    public void stopCapturing() {
      this.logger.removeAppender(this.appender);
    }
  }

  /**
   * This test HMaster class always throws ServerNotRunningYetException from checkServiceStarted(),
   * so region servers can never successfully report for duty to it.
   */
  public static class NeverInitializedMaster extends HMaster {
    public NeverInitializedMaster(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected void checkServiceStarted() throws ServerNotRunningYetException {
      throw new ServerNotRunningYetException("Server is not running yet");
    }
  }

  /**
   * Tests that the region server backs off between reportForDuty retries while the master is not
   * ready.
   */
  @Test
  public void testReportForDutyBackoff() throws IOException, InterruptedException {
    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
    master = cluster.addMaster();
    master.start();

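    // Capture HRegionServer log output so we can count how many reportForDuty attempts failed.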
    LogCapturer capturer =
      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
        .getLogger(HRegionServer.class));
    // Set the base sleep interval relatively low so that exponential backoff produces more
    // retries within the test window.
    int msginterval = 100;
    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
    rs = cluster.addRegionServer();
    rs.start();

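    // Let the region server keep retrying reportForDuty against the never-initialized master.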
    int interval = 10_000;
    Thread.sleep(interval);
    capturer.stopCapturing();
    String output = capturer.getOutput();
    LOG.info("{}", output);
    String failMsg = "reportForDuty failed;";
    int count = StringUtils.countMatches(output, failMsg);

    // The following asserts that the actual retry count is in the range
    // (expectedRetry/2, expectedRetry*2). Ideally we would assert the exact retry count, but we
    // relax the bounds here to tolerate scheduling and contention jitter.
    int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count,
      expectedRetry / 2), count > expectedRetry / 2);
    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count,
      expectedRetry * 2), count < expectedRetry * 2);
  }

  /**
   * Tests region server reportForDuty when the backup master becomes the primary master after the
   * first master goes away.
   */
  @Test
  public void testReportForDutyWithMasterChange() throws Exception {

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    rs.start();

    waitForClusterOnline(master);

    // Add a 2nd region server
    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    rs2 = cluster.addRegionServer();
    // Start the region server. This region server will refresh its RPC connection from the
    // current active master to the next active master before completing reportForDuty.
    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
    rs2.start();

    waitForSecondRsStarted();

    // Stop the current master.
    master.getMaster().stop("Stopping master");

    // Start a new master on another random unique port and let it wait for exactly 2 region
    // servers to report in.
    // TODO: Handle BindException. A random port is not enough and can make this test flaky!
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
    backupMaster = cluster.addMaster();
    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
    backupMaster.start();

    waitForClusterOnline(backupMaster);

    // The backup master should now be the active, initialized master with both region servers
    // reported as online.
    assertTrue(backupMaster.getMaster().isActiveMaster());
    assertTrue(backupMaster.getMaster().isInitialized());
    assertEquals(2, backupMaster.getMaster().getServerManager().getOnlineServersList().size());
  }

  /**
   * Tests region server reportForDuty with RS RPC retry.
   */
  @Test
  public void testReportForDutyWithRSRpcRetry() throws Exception {
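    // Single-threaded scheduler used only to delay the region server start below.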
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
    // Override the default RS RPC retry interval of 100ms to 300ms
    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    // Delay the RS start so that the meta assignment fails on the first attempt and goes to the
    // retry block.
    scheduledThreadPoolExecutor.schedule(() -> rs.start(), 1000, TimeUnit.MILLISECONDS);

    waitForClusterOnline(master);
  }

  /**
   * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
   * configured to reject decommissioned hosts and when there is a match for the joining
   * RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
   */
  @Test
  public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
    throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

    // Set the cluster to reject decommissioned hosts
    cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);

    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    master.start();
    rs.start();
    waitForClusterOnline(master);

    // Add a second, decommissioned region server to the cluster and wait for its reportForDuty
    // to fail.
    LogCapturer capturer =
      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
        .getLogger(HRegionServer.class));

    rs2 = cluster.addRegionServer();
    master.getMaster().decommissionRegionServers(
      Collections.singletonList(rs2.getRegionServer().getServerName()), false);
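    // rs2 is decommissioned before it starts, so its reportForDuty should be rejected by the
    // master.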
    rs2.start();

    // Assert that the second region server has aborted.
    testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
      new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));

    // Assert that the log messages for DecommissionedHostRejectedException exist in the logs
    capturer.stopCapturing();

    assertThat(capturer.getOutput(),
      containsString("Master rejected startup because the host is considered decommissioned"));

    /**
     * Assert that the following log message occurred (one line):
     * "org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException:
     * org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: Host localhost exists in the
     * list of decommissioned servers and Master is configured to reject decommissioned hosts"
     */
    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
      hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
        containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
          + " exists in the list of decommissioned servers and Master is configured to reject"
          + " decommissioned hosts"))));

    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
      hasItem(
        allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
          containsString("Unhandled"),
          containsString(DecommissionedHostRejectedException.class.getSimpleName()),
          containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
            + " exists in the list of decommissioned servers and Master is configured to reject"
            + " decommissioned hosts"))));
  }

  /**
   * Tests region server reportForDuty with a non-default environment edge.
   */
  @Test
  public void testReportForDutyWithEnvironmentEdge() throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
    // Set the dispatch and retry delay to 0 since we want the RPC request to be sent immediately
    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);

    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

    // Inject non-default environment edge
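    // IncrementingEnvironmentEdge advances its reported time on every call rather than tracking
    // wall-clock time, exercising code paths that read the time via EnvironmentEdgeManager.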
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    rs.start();
    waitForClusterOnline(master);
  }

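  /**
   * Blocks until the given master reports that it is initialized and the first region server has
   * come online.
   */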
  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
    while (true) {
      if (master.getMaster().isInitialized()) {
        break;
      }
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for master to come online ...");
    }
    rs.waitForServerOnline();
  }

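  /**
   * Blocks until the second region server has created its RPC stub to the currently active
   * master.
   */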
  private void waitForSecondRsStarted() throws InterruptedException {
    while (true) {
      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag()) {
        break;
      }
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for the 2nd RS to start ...");
    }
  }

  // A region server that provides a hook so that we can wait for the master switch over before
  // continuing reportForDuty to the master.
  // The idea is that we get an RPC connection to the first active master, then we wait.
  // The first master goes down and the second master becomes the active master. The region
  // server then continues reportForDuty, which should succeed with the new master.
  public static class MyRegionServer extends MiniHBaseClusterRegionServer {

    private ServerName sn;
    // This flag is to make sure this rs has obtained the rpcStub to the first master.
    // The first master will go down after this.
    private boolean rpcStubCreatedFlag = false;
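    // Set once the master address tracker reports an active master different from the one the
    // stub was created against.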
    private boolean masterChanged = false;

    public MyRegionServer(Configuration conf)
      throws IOException, KeeperException, InterruptedException {
      super(conf);
    }

    @Override
    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
      sn = super.createRegionServerStatusStub(refresh);
      rpcStubCreatedFlag = true;

      // Wait for master switch over. Only do this for the second region server.
      while (!masterChanged) {
        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
        if (newSn != null && !newSn.equals(sn)) {
          masterChanged = true;
          break;
        }
        try {
          Thread.sleep(SLEEP_INTERVAL);
        } catch (InterruptedException e) {
          return null;
        }
        LOG.debug("Waiting for master switch over ... ");
      }
      return sn;
    }

    public boolean getRpcStubCreatedFlag() {
      return rpcStubCreatedFlag;
    }
  }
}