001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.allOf;
023import static org.hamcrest.Matchers.hasItem;
024import static org.hamcrest.Matchers.is;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027
028import java.io.IOException;
029import java.io.StringWriter;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.concurrent.ScheduledThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import org.apache.commons.lang3.StringUtils;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.LocalHBaseCluster;
039import org.apache.hadoop.hbase.MatcherPredicate;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
042import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
043import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
044import org.apache.hadoop.hbase.master.HMaster;
045import org.apache.hadoop.hbase.master.ServerManager;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
049import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
051import org.apache.hadoop.hbase.util.Threads;
052import org.apache.zookeeper.KeeperException;
053import org.junit.jupiter.api.AfterEach;
054import org.junit.jupiter.api.BeforeEach;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.Test;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
062@Tag(LargeTests.TAG)
063public class TestRegionServerReportForDuty {
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);
066
067  private static final long SLEEP_INTERVAL = 500;
068
069  private HBaseTestingUtil testUtil;
070  private LocalHBaseCluster cluster;
071  private RegionServerThread rs;
072  private RegionServerThread rs2;
073  private MasterThread master;
074  private MasterThread backupMaster;
075
076  @BeforeEach
077  public void setUp() throws Exception {
078    testUtil = new HBaseTestingUtil();
079    testUtil.startMiniDFSCluster(1);
080    testUtil.startMiniZKCluster(1);
081    testUtil.createRootDir();
082    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
083  }
084
085  @AfterEach
086  public void tearDown() throws Exception {
087    cluster.shutdown();
088    cluster.join();
089    testUtil.shutdownMiniZKCluster();
090    testUtil.shutdownMiniDFSCluster();
091  }
092
093  private static class LogCapturer {
094    private StringWriter sw = new StringWriter();
095    private org.apache.logging.log4j.core.appender.WriterAppender appender;
096    private org.apache.logging.log4j.core.Logger logger;
097
098    LogCapturer(org.apache.logging.log4j.core.Logger logger) {
099      this.logger = logger;
100      this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
101        .setName("test").setTarget(sw).build();
102      this.logger.addAppender(this.appender);
103    }
104
105    String getOutput() {
106      return sw.toString();
107    }
108
109    public void stopCapturing() {
110      this.logger.removeAppender(this.appender);
111    }
112  }
113
114  /**
115   * This test HMaster class will always throw ServerNotRunningYetException if checked.
116   */
117  public static class NeverInitializedMaster extends HMaster {
118    public NeverInitializedMaster(Configuration conf) throws IOException {
119      super(conf);
120    }
121
122    @Override
123    protected void checkServiceStarted() throws ServerNotRunningYetException {
124      throw new ServerNotRunningYetException("Server is not running yet");
125    }
126  }
127
128  /**
129   * Tests region server should backoff to report for duty if master is not ready.
130   */
131  @Test
132  public void testReportForDutyBackoff() throws IOException, InterruptedException {
133    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
134    master = cluster.addMaster();
135    master.start();
136
137    LogCapturer capturer =
138      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
139        .getLogger(HRegionServer.class));
140    // Set sleep interval relatively low so that exponential backoff is more demanding.
141    int msginterval = 100;
142    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
143    rs = cluster.addRegionServer();
144    rs.start();
145
146    int interval = 10_000;
147    Thread.sleep(interval);
148    capturer.stopCapturing();
149    String output = capturer.getOutput();
150    LOG.info("{}", output);
151    String failMsg = "reportForDuty failed;";
152    int count = StringUtils.countMatches(output, failMsg);
153
154    // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
155    // Ideally we can assert the exact retry count. We relax here to tolerate contention error.
156    int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
157    assertTrue(count > expectedRetry / 2, String.format(
158      "reportForDuty retries %d times, less than expected min %d", count, expectedRetry / 2));
159    assertTrue(count < expectedRetry * 2, String.format(
160      "reportForDuty retries %d times, more than expected max %d", count, expectedRetry * 2));
161  }
162
163  /**
164   * Tests region sever reportForDuty with backup master becomes primary master after the first
165   * master goes away.
166   */
167  @Test
168  public void testReportForDutyWithMasterChange() throws Exception {
169
170    // Start a master and wait for it to become the active/primary master.
171    // Use a random unique port
172    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
173    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
174    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
175    master = cluster.addMaster();
176    rs = cluster.addRegionServer();
177    LOG.debug("Starting master: " + master.getMaster().getServerName());
178    master.start();
179    rs.start();
180
181    waitForClusterOnline(master);
182
183    // Add a 2nd region server
184    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
185    rs2 = cluster.addRegionServer();
186    // Start the region server. This region server will refresh RPC connection
187    // from the current active master to the next active master before completing
188    // reportForDuty
189    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
190    rs2.start();
191
192    waitForSecondRsStarted();
193
194    // Stop the current master.
195    master.getMaster().stop("Stopping master");
196
197    // Start a new master and use another random unique port
198    // Also let it wait for exactly 2 region severs to report in.
199    // TODO: Add handling bindexception. Random port is not enough!!! Flakie test!
200    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
201    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
202    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
203    backupMaster = cluster.addMaster();
204    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
205    backupMaster.start();
206
207    waitForClusterOnline(backupMaster);
208
209    // Do some checking/asserts here.
210    assertTrue(backupMaster.getMaster().isActiveMaster());
211    assertTrue(backupMaster.getMaster().isInitialized());
212    assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2);
213
214  }
215
216  /**
217   * Tests region sever reportForDuty with RS RPC retry
218   */
219  @Test
220  public void testReportForDutyWithRSRpcRetry() throws Exception {
221    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
222      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
223        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
224
225    // Start a master and wait for it to become the active/primary master.
226    // Use a random unique port
227    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
228    // Override the default RS RPC retry interval of 100ms to 300ms
229    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
230    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
231    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
232    master = cluster.addMaster();
233    rs = cluster.addRegionServer();
234    LOG.debug("Starting master: " + master.getMaster().getServerName());
235    master.start();
236    // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block
237    scheduledThreadPoolExecutor.schedule(new Runnable() {
238      @Override
239      public void run() {
240        rs.start();
241      }
242    }, 1000, TimeUnit.MILLISECONDS);
243
244    waitForClusterOnline(master);
245  }
246
247  /**
248   * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
249   * configured to reject decommissioned hosts and when there is a match for the joining
250   * RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
251   */
252  @Test
253  public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
254    throws Exception {
255    // Start a master and wait for it to become the active/primary master.
256    // Use a random unique port
257    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
258    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
259    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
260
261    // Set the cluster to reject decommissioned hosts
262    cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);
263
264    master = cluster.addMaster();
265    rs = cluster.addRegionServer();
266    master.start();
267    rs.start();
268    waitForClusterOnline(master);
269
270    // Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty
271    LogCapturer capturer =
272      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
273        .getLogger(HRegionServer.class));
274
275    rs2 = cluster.addRegionServer();
276    master.getMaster().decommissionRegionServers(
277      Collections.singletonList(rs2.getRegionServer().getServerName()), false);
278    rs2.start();
279
280    // Assert that the second regionserver has aborted
281    testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
282      new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));
283
284    // Assert that the log messages for DecommissionedHostRejectedException exist in the logs
285    capturer.stopCapturing();
286
287    assertThat(capturer.getOutput(),
288      containsString("Master rejected startup because the host is considered decommissioned"));
289
290    /**
291     * Assert that the following log message occurred (one line):
292     * "org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException:
293     * org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: Host localhost exists in the
294     * list of decommissioned servers and Master is configured to reject decommissioned hosts"
295     */
296    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
297      hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
298        containsString(DecommissionedHostRejectedException.class.getSimpleName()),
299        containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
300          + " exists in the list of decommissioned servers and Master is configured to reject"
301          + " decommissioned hosts"))));
302
303    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
304      hasItem(
305        allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
306          containsString("Unhandled"),
307          containsString(DecommissionedHostRejectedException.class.getSimpleName()),
308          containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
309            + " exists in the list of decommissioned servers and Master is configured to reject"
310            + " decommissioned hosts"))));
311  }
312
313  /**
314   * Tests region sever reportForDuty with a non-default environment edge
315   */
316  @Test
317  public void testReportForDutyWithEnvironmentEdge() throws Exception {
318    // Start a master and wait for it to become the active/primary master.
319    // Use a random unique port
320    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
321    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
322    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
323    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
324
325    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
326    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
327
328    // Inject non-default environment edge
329    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
330    EnvironmentEdgeManager.injectEdge(edge);
331    master = cluster.addMaster();
332    rs = cluster.addRegionServer();
333    LOG.debug("Starting master: " + master.getMaster().getServerName());
334    master.start();
335    rs.start();
336    waitForClusterOnline(master);
337  }
338
339  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
340    while (true) {
341      if (master.getMaster().isInitialized()) {
342        break;
343      }
344      Thread.sleep(SLEEP_INTERVAL);
345      LOG.debug("Waiting for master to come online ...");
346    }
347    rs.waitForServerOnline();
348  }
349
350  private void waitForSecondRsStarted() throws InterruptedException {
351    while (true) {
352      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
353        break;
354      }
355      Thread.sleep(SLEEP_INTERVAL);
356      LOG.debug("Waiting 2nd RS to be started ...");
357    }
358  }
359
360  // Create a Region Server that provide a hook so that we can wait for the master switch over
361  // before continuing reportForDuty to the mater.
362  // The idea is that we get a RPC connection to the first active master, then we wait.
363  // The first master goes down, the second master becomes the active master. The region
364  // server continues reportForDuty. It should succeed with the new master.
365  public static class MyRegionServer extends MiniHBaseClusterRegionServer {
366
367    private ServerName sn;
368    // This flag is to make sure this rs has obtained the rpcStub to the first master.
369    // The first master will go down after this.
370    private boolean rpcStubCreatedFlag = false;
371    private boolean masterChanged = false;
372
373    public MyRegionServer(Configuration conf)
374      throws IOException, KeeperException, InterruptedException {
375      super(conf);
376    }
377
378    @Override
379    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
380      sn = super.createRegionServerStatusStub(refresh);
381      rpcStubCreatedFlag = true;
382
383      // Wait for master switch over. Only do this for the second region server.
384      while (!masterChanged) {
385        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
386        if (newSn != null && !newSn.equals(sn)) {
387          masterChanged = true;
388          break;
389        }
390        try {
391          Thread.sleep(SLEEP_INTERVAL);
392        } catch (InterruptedException e) {
393          return null;
394        }
395        LOG.debug("Waiting for master switch over ... ");
396      }
397      return sn;
398    }
399
400    public boolean getRpcStubCreatedFlag() {
401      return rpcStubCreatedFlag;
402    }
403  }
404}