001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.endsWith;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023
024import org.apache.hadoop.fs.FileStatus;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.master.MasterFileSystem;
028import org.apache.hadoop.hbase.regionserver.HRegionServer;
029import org.apache.hadoop.hbase.regionserver.LogRoller;
030import org.apache.hadoop.hbase.testclassification.LargeTests;
031import org.apache.hadoop.hbase.testclassification.ReplicationTests;
032import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
033import org.junit.jupiter.api.Tag;
034import org.junit.jupiter.api.Test;
035
036import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
037
038/**
039 * Testcase to confirm that serial replication will not be stuck when using along with synchronous
040 * replication. See HBASE-21486 for more details.
041 */
042@Tag(ReplicationTests.TAG)
043@Tag(LargeTests.TAG)
044public class TestSerialSyncReplication extends SyncReplicationTestBase {
045
046  @Test
047  public void test() throws Exception {
048    // change to serial
049    UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
050      .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
051    UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
052      .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
053
054    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
055      SyncReplicationState.STANDBY);
056    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
057      SyncReplicationState.ACTIVE);
058
059    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
060
061    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
062
063    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
064    Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
065      new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
066    FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
067    assertEquals(1, remoteWALStatus.length);
068    Path remoteWAL = remoteWALStatus[0].getPath();
069    assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
070    // roll the wal writer, so that we will delete the remore wal. This is used to make sure that we
071    // will not replay this wal when transiting to DA.
072    for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) {
073      LogRoller roller = t.getRegionServer().getWalRoller();
074      roller.requestRollAll();
075      roller.waitUntilWalRollFinished();
076    }
077    waitUntilDeleted(UTIL2, remoteWAL);
078
079    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
080      SyncReplicationState.DOWNGRADE_ACTIVE);
081    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
082      SyncReplicationState.STANDBY);
083    // let's reopen the region
084    RegionInfo region = Iterables.getOnlyElement(UTIL2.getAdmin().getRegions(TABLE_NAME));
085    HRegionServer target = UTIL2.getOtherRegionServer(UTIL2.getRSForFirstRegionInTable(TABLE_NAME));
086    UTIL2.getAdmin().move(region.getEncodedNameAsBytes(), target.getServerName());
087    // here we will remove all the pending wals. This is not a normal operation sequence but anyway,
088    // user could do this.
089    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
090      SyncReplicationState.STANDBY);
091    // transit back to DA
092    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
093      SyncReplicationState.DOWNGRADE_ACTIVE);
094
095    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
096    // make sure that the async replication still works
097    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
098  }
099}