001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotEquals; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024 025import java.io.IOException; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.TimeUnit; 030import java.util.stream.Collectors; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 038import org.apache.hadoop.hbase.regionserver.HRegionServer; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.testclassification.ReplicationTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 043import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 044import org.apache.hadoop.hbase.wal.WAL.Entry; 045import org.apache.hadoop.hbase.wal.WALStreamReader; 046import org.junit.jupiter.api.BeforeEach; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049 050@Tag(ReplicationTests.TAG) 051@Tag(MediumTests.TAG) 052public class TestSerialReplication extends SerialReplicationTestBase { 053 054 @BeforeEach 055 public void setUp() throws IOException, StreamLacksCapabilityException { 056 setupWALWriter(); 057 // add in disable state, so later when enabling it all sources will start push together. 058 addPeer(false); 059 } 060 061 @Test 062 public void testRegionMove() throws Exception { 063 TableName tableName = createTable(); 064 try (Table table = UTIL.getConnection().getTable(tableName)) { 065 for (int i = 0; i < 100; i++) { 066 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 067 } 068 } 069 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 070 HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); 071 moveRegion(region, rs); 072 try (Table table = UTIL.getConnection().getTable(tableName)) { 073 for (int i = 100; i < 200; i++) { 074 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 075 } 076 } 077 enablePeerAndWaitUntilReplicationDone(200); 078 checkOrder(200); 079 } 080 081 @Test 082 public void testRegionSplit() throws Exception { 083 TableName tableName = createTable(); 084 try (Table table = UTIL.getConnection().getTable(tableName)) { 085 for (int i = 0; i < 100; i++) { 086 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 087 } 088 } 089 UTIL.flush(tableName); 090 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 091 UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30, 092 TimeUnit.SECONDS); 093 UTIL.waitUntilNoRegionsInTransition(30000); 094 List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName); 095 assertEquals(2, regions.size()); 096 try (Table table = UTIL.getConnection().getTable(tableName)) { 097 for (int i = 0; i < 100; i++) { 098 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 099 } 100 } 101 enablePeerAndWaitUntilReplicationDone(200); 102 Map<String, Long> regionsToSeqId = new HashMap<>(); 103 regionsToSeqId.put(region.getEncodedName(), -1L); 104 regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); 105 try (WALStreamReader reader = 106 NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 107 int count = 0; 108 for (Entry entry;;) { 109 entry = reader.next(); 110 if (entry == null) { 111 break; 112 } 113 String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName()); 114 Long seqId = regionsToSeqId.get(encodedName); 115 assertNotNull(seqId, 116 "Unexcepted entry " + entry + ", expected regions " + region + ", or " + regions); 117 assertTrue(entry.getKey().getSequenceId() >= seqId.longValue(), 118 "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId() 119 + " for " + encodedName); 120 if (count < 100) { 121 assertEquals(region.getEncodedName(), encodedName, 122 encodedName + " is pushed before parent " + region.getEncodedName()); 123 } else { 124 assertNotEquals(region.getEncodedName(), encodedName); 125 } 126 count++; 127 } 128 assertEquals(200, count); 129 } 130 } 131 132 @Test 133 public void testRegionMerge() throws Exception { 134 byte[] splitKey = Bytes.toBytes(50); 135 TableName tableName = tableNameExt.getTableName(); 136 UTIL.getAdmin().createTable( 137 TableDescriptorBuilder.newBuilder(tableName) 138 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF) 139 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 140 .build(), 141 new byte[][] { splitKey }); 142 UTIL.waitTableAvailable(tableName); 143 try (Table table = UTIL.getConnection().getTable(tableName)) { 144 for (int i = 0; i < 100; i++) { 145 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 146 } 147 } 148 List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName); 149 UTIL.getAdmin() 150 .mergeRegionsAsync( 151 regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false) 152 .get(30, TimeUnit.SECONDS); 153 UTIL.waitUntilNoRegionsInTransition(30000); 154 List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName); 155 assertEquals(1, regionsAfterMerge.size()); 156 try (Table table = UTIL.getConnection().getTable(tableName)) { 157 for (int i = 0; i < 100; i++) { 158 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 159 } 160 } 161 enablePeerAndWaitUntilReplicationDone(200); 162 Map<String, Long> regionsToSeqId = new HashMap<>(); 163 RegionInfo region = regionsAfterMerge.get(0); 164 regionsToSeqId.put(region.getEncodedName(), -1L); 165 regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); 166 try (WALStreamReader reader = 167 NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 168 int count = 0; 169 for (Entry entry;;) { 170 entry = reader.next(); 171 if (entry == null) { 172 break; 173 } 174 String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName()); 175 Long seqId = regionsToSeqId.get(encodedName); 176 assertNotNull(seqId, 177 "Unexcepted entry " + entry + ", expected regions " + region + ", or " + regions); 178 assertTrue(entry.getKey().getSequenceId() >= seqId.longValue(), 179 "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId() 180 + " for " + encodedName); 181 if (count < 100) { 182 assertNotEquals( 183 encodedName + " is pushed before parents " + regions.stream() 184 .map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")), 185 region.getEncodedName(), encodedName); 186 } else { 187 assertEquals(region.getEncodedName(), encodedName); 188 } 189 count++; 190 } 191 assertEquals(200, count); 192 } 193 } 194 195 @Test 196 public void testRemovePeerNothingReplicated() throws Exception { 197 TableName tableName = createTable(); 198 String encodedRegionName = 199 UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); 200 ReplicationQueueStorage queueStorage = 201 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 202 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 203 UTIL.getAdmin().removeReplicationPeer(PEER_ID); 204 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 205 } 206 207 @Test 208 public void testRemovePeer() throws Exception { 209 TableName tableName = createTable(); 210 try (Table table = UTIL.getConnection().getTable(tableName)) { 211 for (int i = 0; i < 100; i++) { 212 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 213 } 214 } 215 enablePeerAndWaitUntilReplicationDone(100); 216 checkOrder(100); 217 String encodedRegionName = 218 UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); 219 ReplicationQueueStorage queueStorage = 220 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 221 assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0); 222 UTIL.getAdmin().removeReplicationPeer(PEER_ID); 223 // confirm that we delete the last pushed sequence id 224 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 225 } 226 227 @Test 228 public void testRemoveSerialFlag() throws Exception { 229 TableName tableName = createTable(); 230 try (Table table = UTIL.getConnection().getTable(tableName)) { 231 for (int i = 0; i < 100; i++) { 232 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 233 } 234 } 235 enablePeerAndWaitUntilReplicationDone(100); 236 checkOrder(100); 237 String encodedRegionName = 238 UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); 239 ReplicationQueueStorage queueStorage = 240 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 241 assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0); 242 ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID); 243 UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, 244 ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build()); 245 // confirm that we delete the last pushed sequence id 246 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 247 } 248}