/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationMoreLogsInLocalGiveUpSplitting extends SyncReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);

  @BeforeClass
  public static void setUp() throws Exception {
    // Install the test dual WAL implementation on both clusters so the test can break the
    // remote WAL and control log rolling.
    UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
    UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
    SyncReplicationTestBase.setUp();
  }

  @Test
  public void testSplitLog() throws Exception {
    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
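    // Transit the peer: cluster 2 becomes STANDBY and cluster 1 becomes ACTIVE.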
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);
    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
      table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0)));
    }
    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
    DualAsyncFSWALForTest wal =
      (DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
    // Break the remote WAL and hold log rolling so the next write only reaches the local WAL.
    wal.setRemoteBroken();
    wal.suspendLogRoll();
    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1)
        .setWriteRpcTimeout(5, TimeUnit.SECONDS).build();
      try {
        table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))).get();
        fail("Should fail since the rs will hang and we will get an rpc timeout");
      } catch (ExecutionException e) {
        // expected
        LOG.info("Expected error:", e);
      }
    }
    wal.waitUntilArrive();
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    wal.resumeLogRoll();
    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
      assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
      // we failed to write this entry to the remote cluster so it should not exist
      assertFalse(table.exists(new Get(Bytes.toBytes(1)))); 
    }
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    // Make sure that the region is online. We cannot use waitTableAvailable since a table in
    // STANDBY state cannot be read from the client side.
    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
      try {
        table.exists(new Get(Bytes.toBytes(0)));
      } catch (DoNotRetryIOException | RetriesExhaustedException e) {
        // expected
        assertThat(e.getMessage(), containsString("STANDBY"));
      }
    }
    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    // We give up splitting the whole WAL file, so this record will also be gone.
    assertTrue(region.get(new Get(Bytes.toBytes(0))).isEmpty());
    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
    // finally it should be replicated back
    waitUntilReplicationDone(UTIL1, 1);
  }
}