001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertFalse; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.junit.jupiter.api.Assertions.fail; 026 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.ExecutionException; 029import org.apache.hadoop.fs.FileStatus; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.client.AsyncConnection; 034import org.apache.hadoop.hbase.client.AsyncTable; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Get; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.testclassification.ReplicationTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 043import org.apache.hadoop.hbase.wal.WAL.Entry; 044import org.apache.hadoop.hbase.wal.WALStreamReader; 045import org.junit.jupiter.api.Tag; 046import org.junit.jupiter.api.Test; 047 048@Tag(ReplicationTests.TAG) 049@Tag(LargeTests.TAG) 050public class SyncReplicationActiveTestBase extends SyncReplicationTestBaseNoBeforeAll { 051 052 @Test 053 public void testActive() throws Exception { 054 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 055 SyncReplicationState.STANDBY); 056 UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 057 SyncReplicationState.ACTIVE); 058 059 // confirm that peer with state A will reject replication request. 060 verifyReplicationRequestRejection(UTIL1, true); 061 verifyReplicationRequestRejection(UTIL2, false); 062 063 UTIL1.getAdmin().disableReplicationPeer(PEER_ID); 064 write(UTIL1, 0, 100); 065 Thread.sleep(2000); 066 // peer is disabled so no data have been replicated 067 verifyNotReplicatedThroughRegion(UTIL2, 0, 100); 068 069 // Ensure that there's no cluster id in remote log entries. 070 verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID); 071 072 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 073 SyncReplicationState.DOWNGRADE_ACTIVE); 074 // confirm that peer with state DA will reject replication request. 075 verifyReplicationRequestRejection(UTIL2, true); 076 // confirm that the data is there after we convert the peer to DA 077 verify(UTIL2, 0, 100); 078 079 try (AsyncConnection conn = 080 ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) { 081 AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build(); 082 CompletableFuture<Void> future = 083 table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000))); 084 Thread.sleep(2000); 085 // should hang on rolling 086 assertFalse(future.isDone()); 087 UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 088 SyncReplicationState.STANDBY); 089 try { 090 future.get(); 091 fail("should fail because of the wal is closing"); 092 } catch (ExecutionException e) { 093 // expected 094 assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed")); 095 } 096 } 097 // confirm that the data has not been persisted 098 HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 099 assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty()); 100 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 101 SyncReplicationState.ACTIVE); 102 103 writeAndVerifyReplication(UTIL2, UTIL1, 100, 200); 104 105 // shutdown the cluster completely 106 UTIL1.shutdownMiniCluster(); 107 // confirm that we can convert to DA even if the remote slave cluster is down 108 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 109 SyncReplicationState.DOWNGRADE_ACTIVE); 110 // confirm that peer with state DA will reject replication request. 111 verifyReplicationRequestRejection(UTIL2, true); 112 write(UTIL2, 200, 300); 113 } 114 115 private void verifyNoClusterIdInRemoteLog(HBaseTestingUtil utility, Path remoteDir, String peerId) 116 throws Exception { 117 FileSystem fs2 = utility.getTestFileSystem(); 118 FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId)); 119 assertTrue(files.length > 0); 120 for (FileStatus file : files) { 121 try (WALStreamReader reader = 122 NoEOFWALStreamReader.create(fs2, file.getPath(), utility.getConfiguration())) { 123 Entry entry = reader.next(); 124 assertTrue(entry != null); 125 while (entry != null) { 126 assertEquals(entry.getKey().getClusterIds().size(), 0); 127 entry = reader.next(); 128 } 129 } 130 } 131 } 132}