001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.Mockito.doAnswer; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.times; 026import static org.mockito.Mockito.verify; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.Arrays; 031import java.util.List; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.Cell.Type; 034import org.apache.hadoop.hbase.CellBuilderFactory; 035import org.apache.hadoop.hbase.CellBuilderType; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.MetaTableAccessor; 040import org.apache.hadoop.hbase.Server; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.master.RegionState; 048import org.apache.hadoop.hbase.replication.ReplicationException; 049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.ReplicationTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.hadoop.hbase.wal.WAL.Entry; 056import org.apache.hadoop.hbase.wal.WALKeyImpl; 057import org.junit.AfterClass; 058import org.junit.Before; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Rule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.rules.TestName; 065import org.mockito.invocation.InvocationOnMock; 066import org.mockito.stubbing.Answer; 067 068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 069 070@Category({ ReplicationTests.class, MediumTests.class }) 071public class TestSerialReplicationChecker { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestSerialReplicationChecker.class); 076 077 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 078 079 private static String PEER_ID = "1"; 080 081 private static ReplicationQueueStorage QUEUE_STORAGE; 082 083 private static String WAL_FILE_NAME = "test.wal"; 084 085 private Connection conn; 086 087 private SerialReplicationChecker checker; 088 089 @Rule 090 public final TestName name = new TestName(); 091 092 private TableName tableName; 093 094 @BeforeClass 095 public static void setUpBeforeClass() throws Exception { 096 UTIL.startMiniCluster(1); 097 QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), 098 UTIL.getConfiguration()); 099 QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID, 100 WAL_FILE_NAME); 101 } 102 103 @AfterClass 104 public static void tearDownAfterClass() throws Exception { 105 UTIL.shutdownMiniCluster(); 106 } 107 108 @Before 109 public void setUp() throws IOException { 110 ReplicationSource source = mock(ReplicationSource.class); 111 when(source.getPeerId()).thenReturn(PEER_ID); 112 when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE); 113 conn = mock(Connection.class); 114 when(conn.isClosed()).thenReturn(false); 115 doAnswer(new Answer<Table>() { 116 117 @Override 118 public Table answer(InvocationOnMock invocation) throws Throwable { 119 return UTIL.getConnection().getTable((TableName) invocation.getArgument(0)); 120 } 121 122 }).when(conn).getTable(any(TableName.class)); 123 Server server = mock(Server.class); 124 when(server.getConnection()).thenReturn(conn); 125 when(source.getServer()).thenReturn(server); 126 checker = new SerialReplicationChecker(UTIL.getConfiguration(), source); 127 tableName = TableName.valueOf(name.getMethodName()); 128 } 129 130 private Entry createEntry(RegionInfo region, long seqId) { 131 WALKeyImpl key = mock(WALKeyImpl.class); 132 when(key.getTableName()).thenReturn(tableName); 133 when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes()); 134 when(key.getSequenceId()).thenReturn(seqId); 135 Entry entry = mock(Entry.class); 136 when(entry.getKey()).thenReturn(key); 137 return entry; 138 } 139 140 private Cell createCell(RegionInfo region) { 141 return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey()) 142 .setType(Type.Put).build(); 143 } 144 145 @Test 146 public void testNoBarrierCanPush() throws IOException { 147 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 148 assertTrue(checker.canPush(createEntry(region, 100), createCell(region))); 149 } 150 151 private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) 152 throws IOException { 153 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 154 if (state != null) { 155 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, 156 Bytes.toBytes(state.name())); 157 } 158 for (int i = 0; i < barriers.length; i++) { 159 put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, 160 put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i])); 161 } 162 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 163 table.put(put); 164 } 165 } 166 167 private void setState(RegionInfo region, RegionState.State state) throws IOException { 168 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 169 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, 170 Bytes.toBytes(state.name())); 171 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 172 table.put(put); 173 } 174 } 175 176 private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { 177 QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), 178 PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); 179 } 180 181 private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException { 182 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 183 put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, 184 MetaTableAccessor.REPLICATION_PARENT_QUALIFIER, MetaTableAccessor.getParentsBytes(parents)); 185 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 186 table.put(put); 187 } 188 } 189 190 @Test 191 public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException { 192 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 193 addStateAndBarrier(region, RegionState.State.OPEN, 10); 194 Cell cell = createCell(region); 195 // can push since we are in the first range 196 assertTrue(checker.canPush(createEntry(region, 100), cell)); 197 setState(region, RegionState.State.OPENING); 198 // can not push since we are in the last range and the state is OPENING 199 assertFalse(checker.canPush(createEntry(region, 102), cell)); 200 addStateAndBarrier(region, RegionState.State.OPEN, 50); 201 // can not push since the previous range has not been finished yet 202 assertFalse(checker.canPush(createEntry(region, 102), cell)); 203 updatePushedSeqId(region, 49); 204 // can push since the previous range has been finished 205 assertTrue(checker.canPush(createEntry(region, 102), cell)); 206 setState(region, RegionState.State.OPENING); 207 assertFalse(checker.canPush(createEntry(region, 104), cell)); 208 } 209 210 @Test 211 public void testCanPushUnder() throws IOException, ReplicationException { 212 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 213 addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); 214 updatePushedSeqId(region, 9); 215 Cell cell = createCell(region); 216 assertTrue(checker.canPush(createEntry(region, 20), cell)); 217 verify(conn, times(1)).getTable(any(TableName.class)); 218 // not continuous 219 for (int i = 22; i < 100; i += 2) { 220 assertTrue(checker.canPush(createEntry(region, i), cell)); 221 } 222 // verify that we do not go to meta table 223 verify(conn, times(1)).getTable(any(TableName.class)); 224 } 225 226 @Test 227 public void testCanPushIfContinuous() throws IOException, ReplicationException { 228 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 229 addStateAndBarrier(region, RegionState.State.OPEN, 10); 230 updatePushedSeqId(region, 9); 231 Cell cell = createCell(region); 232 assertTrue(checker.canPush(createEntry(region, 20), cell)); 233 verify(conn, times(1)).getTable(any(TableName.class)); 234 // continuous 235 for (int i = 21; i < 100; i++) { 236 assertTrue(checker.canPush(createEntry(region, i), cell)); 237 } 238 // verify that we do not go to meta table 239 verify(conn, times(1)).getTable(any(TableName.class)); 240 } 241 242 @Test 243 public void testCanPushAfterMerge() throws IOException, ReplicationException { 244 // 0xFF is the escape byte when storing region name so let's make sure it can work. 245 byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 }; 246 RegionInfo regionA = 247 RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build(); 248 RegionInfo regionB = 249 RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build(); 250 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build(); 251 addStateAndBarrier(regionA, null, 10, 100); 252 addStateAndBarrier(regionB, null, 20, 200); 253 addStateAndBarrier(region, RegionState.State.OPEN, 200); 254 addParents(region, Arrays.asList(regionA, regionB)); 255 Cell cell = createCell(region); 256 // can not push since both parents have not been finished yet 257 assertFalse(checker.canPush(createEntry(region, 300), cell)); 258 updatePushedSeqId(regionB, 199); 259 // can not push since regionA has not been finished yet 260 assertFalse(checker.canPush(createEntry(region, 300), cell)); 261 updatePushedSeqId(regionA, 99); 262 // can push since all parents have been finished 263 assertTrue(checker.canPush(createEntry(region, 300), cell)); 264 } 265 266 @Test 267 public void testCanPushAfterSplit() throws IOException, ReplicationException { 268 // 0xFF is the escape byte when storing region name so let's make sure it can work. 269 byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 }; 270 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build(); 271 RegionInfo regionA = 272 RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build(); 273 RegionInfo regionB = 274 RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build(); 275 addStateAndBarrier(region, null, 10, 100); 276 addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200); 277 addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300); 278 addParents(regionA, Arrays.asList(region)); 279 addParents(regionB, Arrays.asList(region)); 280 Cell cellA = createCell(regionA); 281 Cell cellB = createCell(regionB); 282 // can not push since parent has not been finished yet 283 assertFalse(checker.canPush(createEntry(regionA, 150), cellA)); 284 assertFalse(checker.canPush(createEntry(regionB, 200), cellB)); 285 updatePushedSeqId(region, 99); 286 // can push since parent has been finished 287 assertTrue(checker.canPush(createEntry(regionA, 150), cellA)); 288 assertTrue(checker.canPush(createEntry(regionB, 200), cellB)); 289 } 290 291 @Test 292 public void testCanPushEqualsToBarrier() throws IOException, ReplicationException { 293 // For binary search, equals to an element will result to a positive value, let's test whether 294 // it works. 295 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 296 addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); 297 Cell cell = createCell(region); 298 assertTrue(checker.canPush(createEntry(region, 10), cell)); 299 assertFalse(checker.canPush(createEntry(region, 100), cell)); 300 updatePushedSeqId(region, 99); 301 assertTrue(checker.canPush(createEntry(region, 100), cell)); 302 } 303}