001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.endsWith;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023
024import org.apache.hadoop.fs.FileStatus;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.client.RegionInfo;
028import org.apache.hadoop.hbase.master.MasterFileSystem;
029import org.apache.hadoop.hbase.regionserver.HRegionServer;
030import org.apache.hadoop.hbase.regionserver.LogRoller;
031import org.apache.hadoop.hbase.testclassification.LargeTests;
032import org.apache.hadoop.hbase.testclassification.ReplicationTests;
033import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
039
040/**
041 * Testcase to confirm that serial replication will not be stuck when using along with synchronous
042 * replication. See HBASE-21486 for more details.
043 */
044@Category({ ReplicationTests.class, LargeTests.class })
045public class TestSerialSyncReplication extends SyncReplicationTestBase {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestSerialSyncReplication.class);
050
051  @Test
052  public void test() throws Exception {
053    // change to serial
054    UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
055      .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
056    UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
057      .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
058
059    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
060      SyncReplicationState.STANDBY);
061    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
062      SyncReplicationState.ACTIVE);
063
064    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
065
066    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
067
068    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
069    Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
070      new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
071    FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
072    assertEquals(1, remoteWALStatus.length);
073    Path remoteWAL = remoteWALStatus[0].getPath();
074    assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
075    // roll the wal writer, so that we will delete the remore wal. This is used to make sure that we
076    // will not replay this wal when transiting to DA.
077    for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) {
078      LogRoller roller = t.getRegionServer().getWalRoller();
079      roller.requestRollAll();
080      roller.waitUntilWalRollFinished();
081    }
082    waitUntilDeleted(UTIL2, remoteWAL);
083
084    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
085      SyncReplicationState.DOWNGRADE_ACTIVE);
086    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
087      SyncReplicationState.STANDBY);
088    // let's reopen the region
089    RegionInfo region = Iterables.getOnlyElement(UTIL2.getAdmin().getRegions(TABLE_NAME));
090    HRegionServer target = UTIL2.getOtherRegionServer(UTIL2.getRSForFirstRegionInTable(TABLE_NAME));
091    UTIL2.getAdmin().move(region.getEncodedNameAsBytes(), target.getServerName());
092    // here we will remove all the pending wals. This is not a normal operation sequence but anyway,
093    // user could do this.
094    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
095      SyncReplicationState.STANDBY);
096    // transit back to DA
097    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
098      SyncReplicationState.DOWNGRADE_ACTIVE);
099
100    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
101    // make sure that the async replication still works
102    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
103  }
104}