001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.containsString; 022import static org.hamcrest.Matchers.either; 023import static org.hamcrest.Matchers.instanceOf; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertFalse; 026import static org.junit.jupiter.api.Assertions.assertThrows; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.hbase.DoNotRetryIOException; 032import org.apache.hadoop.hbase.client.AsyncConnection; 033import org.apache.hadoop.hbase.client.AsyncTable; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.client.RetriesExhaustedException; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.regionserver.HRegion; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.apache.hadoop.hbase.testclassification.ReplicationTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.wal.WALFactory; 046import org.apache.hadoop.hbase.wal.WALProvider; 047import org.junit.jupiter.api.BeforeAll; 048import org.junit.jupiter.api.Tag; 049import org.junit.jupiter.api.Test; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053@Tag(ReplicationTests.TAG) 054@Tag(LargeTests.TAG) 055public class TestSyncReplicationMoreLogsInLocalGiveUpSplitting 056 extends SyncReplicationTestBaseNoBeforeAll { 057 058 private static final Logger LOG = 059 LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class); 060 061 @BeforeAll 062 public static void setUp() throws Exception { 063 UTIL1.getConfiguration().setClass(WALFactory.WAL_PROVIDER, BrokenRemoteAsyncFSWALProvider.class, 064 WALProvider.class); 065 UTIL2.getConfiguration().setClass(WALFactory.WAL_PROVIDER, BrokenRemoteAsyncFSWALProvider.class, 066 WALProvider.class); 067 startClusters(); 068 } 069 070 @Test 071 public void testSplitLog() throws Exception { 072 UTIL1.getAdmin().disableReplicationPeer(PEER_ID); 073 UTIL2.getAdmin().disableReplicationPeer(PEER_ID); 074 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 075 SyncReplicationState.STANDBY); 076 UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 077 SyncReplicationState.ACTIVE); 078 try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) { 079 table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))); 080 } 081 HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME); 082 BrokenRemoteAsyncFSWALProvider.BrokenRemoteAsyncFSWAL wal = 083 (BrokenRemoteAsyncFSWALProvider.BrokenRemoteAsyncFSWAL) rs.getWalFactory() 084 .getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build()); 085 wal.setRemoteBroken(); 086 wal.suspendLogRoll(); 087 try (AsyncConnection conn = 088 ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) { 089 AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1) 090 .setWriteRpcTimeout(5, TimeUnit.SECONDS).build(); 091 ExecutionException error = assertThrows(ExecutionException.class, 092 () -> table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))).get()); 093 LOG.info("Expected error:", error); 094 } 095 wal.waitUntilArrive(); 096 UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 097 SyncReplicationState.DOWNGRADE_ACTIVE); 098 wal.resumeLogRoll(); 099 try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) { 100 assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ))); 101 // we failed to write this entry to remote so it should not exist 102 assertFalse(table.exists(new Get(Bytes.toBytes(1)))); 103 } 104 UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, 105 SyncReplicationState.STANDBY); 106 // make sure that the region is online. We can not use waitTableAvailable since the table in 107 // stand by state can not be read from client. 108 try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) { 109 Exception error = 110 assertThrows(Exception.class, () -> table.exists(new Get(Bytes.toBytes(0)))); 111 assertThat(error, either(instanceOf(DoNotRetryIOException.class)) 112 .or(instanceOf(RetriesExhaustedException.class))); 113 assertThat(error.getMessage(), containsString("STANDBY")); 114 } 115 HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 116 // we give up splitting the whole wal file so this record will also be gone. 117 assertTrue(region.get(new Get(Bytes.toBytes(0))).isEmpty()); 118 UTIL2.getAdmin().enableReplicationPeer(PEER_ID); 119 // finally it should be replicated back 120 waitUntilReplicationDone(UTIL1, 1); 121 } 122}