001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.endsWith; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023 024import org.apache.hadoop.fs.FileStatus; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.client.RegionInfo; 027import org.apache.hadoop.hbase.master.MasterFileSystem; 028import org.apache.hadoop.hbase.regionserver.HRegionServer; 029import org.apache.hadoop.hbase.regionserver.LogRoller; 030import org.apache.hadoop.hbase.testclassification.LargeTests; 031import org.apache.hadoop.hbase.testclassification.ReplicationTests; 032import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 033import org.junit.jupiter.api.Tag; 034import org.junit.jupiter.api.Test; 035 036import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 037 038/** 039 * Testcase to confirm that serial replication will not be stuck when using along with synchronous 040 * replication. See HBASE-21486 for more details. 041 */ 042@Tag(ReplicationTests.TAG) 043@Tag(LargeTests.TAG) 044public class TestSerialSyncReplication extends SyncReplicationTestBase { 045 046 @Test 047 public void test() throws Exception { 048 // change to serial 049 UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig 050 .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build()); 051 UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig 052 .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build()); 053 054 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 055 SyncReplicationState.STANDBY); 056 UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 057 SyncReplicationState.ACTIVE); 058 059 UTIL2.getAdmin().disableReplicationPeer(PEER_ID); 060 061 writeAndVerifyReplication(UTIL1, UTIL2, 0, 100); 062 063 MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem(); 064 Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir( 065 new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID); 066 FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir); 067 assertEquals(1, remoteWALStatus.length); 068 Path remoteWAL = remoteWALStatus[0].getPath(); 069 assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)); 070 // roll the wal writer, so that we will delete the remore wal. This is used to make sure that we 071 // will not replay this wal when transiting to DA. 072 for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) { 073 LogRoller roller = t.getRegionServer().getWalRoller(); 074 roller.requestRollAll(); 075 roller.waitUntilWalRollFinished(); 076 } 077 waitUntilDeleted(UTIL2, remoteWAL); 078 079 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 080 SyncReplicationState.DOWNGRADE_ACTIVE); 081 UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 082 SyncReplicationState.STANDBY); 083 // let's reopen the region 084 RegionInfo region = Iterables.getOnlyElement(UTIL2.getAdmin().getRegions(TABLE_NAME)); 085 HRegionServer target = UTIL2.getOtherRegionServer(UTIL2.getRSForFirstRegionInTable(TABLE_NAME)); 086 UTIL2.getAdmin().move(region.getEncodedNameAsBytes(), target.getServerName()); 087 // here we will remove all the pending wals. This is not a normal operation sequence but anyway, 088 // user could do this. 089 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 090 SyncReplicationState.STANDBY); 091 // transit back to DA 092 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 093 SyncReplicationState.DOWNGRADE_ACTIVE); 094 095 UTIL2.getAdmin().enableReplicationPeer(PEER_ID); 096 // make sure that the async replication still works 097 writeAndVerifyReplication(UTIL2, UTIL1, 100, 200); 098 } 099}