001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import org.apache.hadoop.hbase.client.RegionInfoBuilder;
024import org.apache.hadoop.hbase.regionserver.HRegionServer;
025import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
026import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
027import org.apache.hadoop.hbase.replication.SyncReplicationState;
028import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
029import org.apache.hadoop.hbase.testclassification.LargeTests;
030import org.apache.hadoop.hbase.testclassification.ReplicationTests;
031import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.api.Test;
034
035/**
036 * Testcase for HBASE-20456.
037 */
038@Tag(ReplicationTests.TAG)
039@Tag(LargeTests.TAG)
040public class TestSyncReplicationShipperQuit extends SyncReplicationTestBase {
041
042  @Test
043  public void testShipperQuitWhenDA() throws Exception {
044    // set to serial replication
045    UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
046      .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
047    UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
048      .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
049    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
050      SyncReplicationState.STANDBY);
051    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
052      SyncReplicationState.ACTIVE);
053
054    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
055    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
056    AbstractFSWAL<?> wal =
057      (AbstractFSWAL<?>) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
058    String walGroupId =
059      AbstractFSWALProvider.getWALPrefixFromWALName(wal.getCurrentFileName().getName());
060    ReplicationSourceShipper shipper =
061      ((ReplicationSource) ((Replication) rs.getReplicationSourceService()).getReplicationManager()
062        .getSource(PEER_ID)).workerThreads.get(walGroupId);
063    assertFalse(shipper.isFinished());
064
065    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
066      SyncReplicationState.DOWNGRADE_ACTIVE);
067    writeAndVerifyReplication(UTIL1, UTIL2, 100, 200);
068
069    ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
070      .getReplicationManager().getSource(PEER_ID);
071    // the peer is serial so here we can make sure that the previous wals have already been
072    // replicated, and finally the shipper should be removed from the worker pool
073    UTIL1.waitFor(10000, () -> !source.workerThreads.containsKey(walGroupId));
074    assertTrue(shipper.isFinished());
075  }
076}