001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertFalse; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.Mockito.doAnswer; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.times; 026import static org.mockito.Mockito.verify; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.Arrays; 031import java.util.List; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.Cell.Type; 034import org.apache.hadoop.hbase.CellBuilderFactory; 035import org.apache.hadoop.hbase.CellBuilderType; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.Server; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionInfoBuilder; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.master.RegionState; 046import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat; 047import org.apache.hadoop.hbase.replication.ReplicationException; 048import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 049import org.apache.hadoop.hbase.replication.ReplicationQueueId; 050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.testclassification.ReplicationTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.wal.WAL.Entry; 057import org.apache.hadoop.hbase.wal.WALKeyImpl; 058import org.junit.jupiter.api.AfterAll; 059import org.junit.jupiter.api.BeforeAll; 060import org.junit.jupiter.api.BeforeEach; 061import org.junit.jupiter.api.Tag; 062import org.junit.jupiter.api.Test; 063import org.junit.jupiter.api.TestInfo; 064import org.mockito.invocation.InvocationOnMock; 065import org.mockito.stubbing.Answer; 066 067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 068 069@Tag(ReplicationTests.TAG) 070@Tag(MediumTests.TAG) 071public class TestSerialReplicationChecker { 072 073 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 074 075 private static String PEER_ID = "1"; 076 077 private static ReplicationQueueStorage QUEUE_STORAGE; 078 079 private static String WAL_FILE_NAME = "test.wal"; 080 081 private Connection conn; 082 083 private SerialReplicationChecker checker; 084 085 private String testName; 086 087 private TableName tableName; 088 089 @BeforeAll 090 public static void setUpBeforeClass() throws Exception { 091 UTIL.startMiniCluster(1); 092 TableName repTable = TableName.valueOf("test_serial_rep"); 093 UTIL.getAdmin() 094 .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable)); 095 QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), 096 UTIL.getConfiguration(), repTable); 097 } 098 099 @AfterAll 100 public static void tearDownAfterClass() throws Exception { 101 UTIL.shutdownMiniCluster(); 102 } 103 104 @BeforeEach 105 public void setUp(TestInfo testInfo) throws IOException { 106 testName = testInfo.getTestMethod().get().getName(); 107 ReplicationSource source = mock(ReplicationSource.class); 108 when(source.getPeerId()).thenReturn(PEER_ID); 109 when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE); 110 conn = mock(Connection.class); 111 when(conn.isClosed()).thenReturn(false); 112 doAnswer(new Answer<Table>() { 113 114 @Override 115 public Table answer(InvocationOnMock invocation) throws Throwable { 116 return UTIL.getConnection().getTable((TableName) invocation.getArgument(0)); 117 } 118 119 }).when(conn).getTable(any(TableName.class)); 120 Server server = mock(Server.class); 121 when(server.getConnection()).thenReturn(conn); 122 when(source.getServer()).thenReturn(server); 123 checker = new SerialReplicationChecker(UTIL.getConfiguration(), source); 124 tableName = TableName.valueOf(testName); 125 } 126 127 private Entry createEntry(RegionInfo region, long seqId) { 128 WALKeyImpl key = mock(WALKeyImpl.class); 129 when(key.getTableName()).thenReturn(tableName); 130 when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes()); 131 when(key.getSequenceId()).thenReturn(seqId); 132 Entry entry = mock(Entry.class); 133 when(entry.getKey()).thenReturn(key); 134 return entry; 135 } 136 137 private Cell createCell(RegionInfo region) { 138 return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey()) 139 .setType(Type.Put).build(); 140 } 141 142 @Test 143 public void testNoBarrierCanPush() throws IOException { 144 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 145 assertTrue(checker.canPush(createEntry(region, 100), createCell(region))); 146 } 147 148 private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) 149 throws IOException { 150 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 151 if (state != null) { 152 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, 153 Bytes.toBytes(state.name())); 154 } 155 for (int i = 0; i < barriers.length; i++) { 156 put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, 157 put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i])); 158 } 159 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 160 table.put(put); 161 } 162 } 163 164 private void setState(RegionInfo region, RegionState.State state) throws IOException { 165 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 166 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, 167 Bytes.toBytes(state.name())); 168 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 169 table.put(put); 170 } 171 } 172 173 private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { 174 ReplicationQueueId queueId = new ReplicationQueueId( 175 UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID); 176 QUEUE_STORAGE.setOffset(queueId, "", new ReplicationGroupOffset(WAL_FILE_NAME, 10), 177 ImmutableMap.of(region.getEncodedName(), seqId)); 178 } 179 180 private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException { 181 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 182 put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, 183 ReplicationBarrierFamilyFormat.REPLICATION_PARENT_QUALIFIER, 184 ReplicationBarrierFamilyFormat.getParentsBytes(parents)); 185 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 186 table.put(put); 187 } 188 } 189 190 @Test 191 public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException { 192 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 193 addStateAndBarrier(region, RegionState.State.OPEN, 10); 194 Cell cell = createCell(region); 195 // can push since we are in the first range 196 assertTrue(checker.canPush(createEntry(region, 100), cell)); 197 setState(region, RegionState.State.OPENING); 198 // can not push since we are in the last range and the state is OPENING 199 assertFalse(checker.canPush(createEntry(region, 102), cell)); 200 addStateAndBarrier(region, RegionState.State.OPEN, 50); 201 // can not push since the previous range has not been finished yet 202 assertFalse(checker.canPush(createEntry(region, 102), cell)); 203 updatePushedSeqId(region, 49); 204 // can push since the previous range has been finished 205 assertTrue(checker.canPush(createEntry(region, 102), cell)); 206 setState(region, RegionState.State.OPENING); 207 assertFalse(checker.canPush(createEntry(region, 104), cell)); 208 } 209 210 @Test 211 public void testCanPushUnder() throws IOException, ReplicationException { 212 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 213 addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); 214 updatePushedSeqId(region, 9); 215 Cell cell = createCell(region); 216 assertTrue(checker.canPush(createEntry(region, 20), cell)); 217 verify(conn, times(1)).getTable(any(TableName.class)); 218 // not continuous 219 for (int i = 22; i < 100; i += 2) { 220 assertTrue(checker.canPush(createEntry(region, i), cell)); 221 } 222 // verify that we do not go to meta table 223 verify(conn, times(1)).getTable(any(TableName.class)); 224 } 225 226 @Test 227 public void testCanPushIfContinuous() throws IOException, ReplicationException { 228 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 229 addStateAndBarrier(region, RegionState.State.OPEN, 10); 230 updatePushedSeqId(region, 9); 231 Cell cell = createCell(region); 232 assertTrue(checker.canPush(createEntry(region, 20), cell)); 233 verify(conn, times(1)).getTable(any(TableName.class)); 234 // continuous 235 for (int i = 21; i < 100; i++) { 236 assertTrue(checker.canPush(createEntry(region, i), cell)); 237 } 238 // verify that we do not go to meta table 239 verify(conn, times(1)).getTable(any(TableName.class)); 240 } 241 242 @Test 243 public void testCanPushAfterMerge() throws IOException, ReplicationException { 244 // 0xFF is the escape byte when storing region name so let's make sure it can work. 245 byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 }; 246 RegionInfo regionA = 247 RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build(); 248 RegionInfo regionB = 249 RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build(); 250 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build(); 251 addStateAndBarrier(regionA, null, 10, 100); 252 addStateAndBarrier(regionB, null, 20, 200); 253 addStateAndBarrier(region, RegionState.State.OPEN, 200); 254 addParents(region, Arrays.asList(regionA, regionB)); 255 Cell cell = createCell(region); 256 // can not push since both parents have not been finished yet 257 assertFalse(checker.canPush(createEntry(region, 300), cell)); 258 updatePushedSeqId(regionB, 199); 259 // can not push since regionA has not been finished yet 260 assertFalse(checker.canPush(createEntry(region, 300), cell)); 261 updatePushedSeqId(regionA, 99); 262 // can push since all parents have been finished 263 assertTrue(checker.canPush(createEntry(region, 300), cell)); 264 } 265 266 @Test 267 public void testCanPushAfterSplit() throws IOException, ReplicationException { 268 // 0xFF is the escape byte when storing region name so let's make sure it can work. 269 byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 }; 270 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build(); 271 RegionInfo regionA = 272 RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build(); 273 RegionInfo regionB = 274 RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build(); 275 addStateAndBarrier(region, null, 10, 100); 276 addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200); 277 addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300); 278 addParents(regionA, Arrays.asList(region)); 279 addParents(regionB, Arrays.asList(region)); 280 Cell cellA = createCell(regionA); 281 Cell cellB = createCell(regionB); 282 // can not push since parent has not been finished yet 283 assertFalse(checker.canPush(createEntry(regionA, 150), cellA)); 284 assertFalse(checker.canPush(createEntry(regionB, 200), cellB)); 285 updatePushedSeqId(region, 99); 286 // can push since parent has been finished 287 assertTrue(checker.canPush(createEntry(regionA, 150), cellA)); 288 assertTrue(checker.canPush(createEntry(regionB, 200), cellB)); 289 } 290 291 @Test 292 public void testCanPushEqualsToBarrier() throws IOException, ReplicationException { 293 // For binary search, equals to an element will result to a positive value, let's test whether 294 // it works. 295 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 296 addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); 297 Cell cell = createCell(region); 298 assertTrue(checker.canPush(createEntry(region, 10), cell)); 299 assertFalse(checker.canPush(createEntry(region, 100), cell)); 300 updatePushedSeqId(region, 99); 301 assertTrue(checker.canPush(createEntry(region, 100), cell)); 302 } 303}