/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests for {@link ReplicationBarrierCleaner}. The cleaner is run by calling {@code chore()}
 * directly against the meta table of a mini cluster, while {@link ReplicationPeerManager} and
 * {@link ReplicationQueueStorage} are Mockito mocks whose answers are scripted per test.
 */
@Tag(MasterTests.TAG)
@Tag(MediumTests.TAG)
public class TestReplicationBarrierCleaner {

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBarrierCleaner.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /** Starts the mini cluster shared by all tests in this class. */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
  }

  /** Shuts down the shared mini cluster. */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Removes every meta row that does not belong to a system table, so rows written by one test
   * (catalog or replication barrier family) are not seen by the next one.
   */
  @AfterEach
  public void tearDown() throws IOException {
    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
        .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        TableName tableName = RegionInfo.getTable(result.getRow());
        if (!tableName.isSystemTable()) {
          table.delete(new Delete(result.getRow()));
        }
      }
    }
  }

  /**
   * Creates a mocked {@link ReplicationPeerManager}.
   * @param queueStorage returned from {@code getQueueStorage()}; when {@code null} that method is
   *                     left unstubbed
   * @param firstPeerIds value returned by the first {@code getSerialPeerIdsBelongsTo} call
   * @param peerIds      values returned by the following {@code getSerialPeerIdsBelongsTo} calls,
   *                     in order (Mockito consecutive stubbing)
   * @return the configured mock
   */
  private ReplicationPeerManager create(ReplicationQueueStorage queueStorage,
    List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>... peerIds) {
    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
    if (queueStorage != null) {
      when(peerManager.getQueueStorage()).thenReturn(queueStorage);
    }
    if (peerIds.length == 0) {
      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds);
    } else {
      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds,
        peerIds);
    }
    return peerManager;
  }

  /**
   * Creates a mocked {@link ReplicationQueueStorage} whose {@code getLastSequenceId} answers are
   * scripted.
   * @param lastPushedSeqId  value returned by the first {@code getLastSequenceId} call
   * @param lastPushedSeqIds values returned by the following calls, in order (Mockito consecutive
   *                         stubbing)
   * @return the configured mock
   */
  private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
    throws ReplicationException {
    ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
    if (lastPushedSeqIds.length == 0) {
      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
    } else {
      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
        lastPushedSeqIds);
    }
    return queueStorage;
  }

  /**
   * Creates the cleaner under test, wired to the mini cluster's configuration and connection, the
   * given peer manager and a {@link WarnOnlyStoppable}.
   */
  private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException {
    return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(),
      UTIL.getConnection(), peerManager);
  }

  /**
   * Writes the given barriers into the replication barrier family of the region's meta row. Each
   * barrier is stored as its own cell version, with strictly increasing timestamps that all lie
   * before the put's own timestamp.
   */
  private void addBarrier(RegionInfo region, long... barriers) throws IOException {
    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
    for (int i = 0; i < barriers.length; i++) {
      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
    }
    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
      table.put(put);
    }
  }

  /** Puts a dummy cell into the catalog family of the region's meta row. */
  private void fillCatalogFamily(RegionInfo region) throws IOException {
    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
      table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY,
        Bytes.toBytes("whatever"), Bytes.toBytes("whatever")));
    }
  }

  /** Deletes the whole catalog family from the region's meta row. */
  private void clearCatalogFamily(RegionInfo region) throws IOException {
    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
      table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY));
    }
  }

  /** With no barriers written, the chore must not consult the peer manager at all. */
  @Test
  public void testNothing() throws IOException {
    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
    ReplicationBarrierCleaner cleaner = create(peerManager);
    cleaner.chore();
    verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class));
    verify(peerManager, never()).getQueueStorage();
  }

  /**
   * When no serial peers exist for the tables, only the last barrier of each region is expected
   * to remain after the chore runs.
   */
  @Test
  public void testCleanNoPeers(TestInfo testInfo) throws IOException {
    TableName tableName1 = TableName.valueOf(testInfo.getTestMethod().get().getName() + "_1");
    RegionInfo region11 =
      RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build();
    addBarrier(region11, 10, 20, 30, 40, 50, 60);
    fillCatalogFamily(region11);
    RegionInfo region12 =
      RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build();
    addBarrier(region12, 20, 30, 40, 50, 60, 70);
    fillCatalogFamily(region12);

    TableName tableName2 = TableName.valueOf(testInfo.getTestMethod().get().getName() + "_2");
    RegionInfo region21 =
      RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build();
    addBarrier(region21, 100, 200, 300, 400);
    fillCatalogFamily(region21);
    RegionInfo region22 =
      RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build();
    addBarrier(region22, 200, 300, 400, 500, 600);
    fillCatalogFamily(region22);

    @SuppressWarnings("unchecked")
    ReplicationPeerManager peerManager =
      create(null, Collections.emptyList(), Collections.emptyList());
    ReplicationBarrierCleaner cleaner = create(peerManager);
    cleaner.chore();

    // should never call this method
    verify(peerManager, never()).getQueueStorage();
    // should only be called twice although we have 4 regions to clean
    verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class));

    assertArrayEquals(new long[] { 60 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region11.getRegionName()));
    assertArrayEquals(new long[] { 70 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region12.getRegionName()));

    assertArrayEquals(new long[] { 400 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region21.getRegionName()));
    assertArrayEquals(new long[] { 600 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region22.getRegionName()));
  }

  /**
   * Runs the chore five times against one region with two serial peers and checks which barriers
   * survive each run. The scripted last-pushed sequence ids are consumed in order by successive
   * {@code getLastSequenceId} calls.
   */
  @Test
  public void testDeleteBarriers(TestInfo testInfo) throws IOException, ReplicationException {
    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
    addBarrier(region, 10, 20, 30, 40, 50, 60);
    // two peers
    // NOTE(review): presumably one value is consumed per peer per chore run, i.e. the pairs are
    // (-1, 2), (15, 25), (20, 25), (65, 55), (70, 70) - confirm against ReplicationBarrierCleaner.
    ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L);
    List<String> peerIds = Lists.newArrayList("1", "2");

    @SuppressWarnings("unchecked")
    ReplicationPeerManager peerManager =
      create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds);
    ReplicationBarrierCleaner cleaner = create(peerManager);

    // beyond the first barrier, no deletion
    cleaner.chore();
    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));

    // in the first range, still no deletion
    cleaner.chore();
    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));

    // in the second range, 10 is deleted
    cleaner.chore();
    assertArrayEquals(new long[] { 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));

    // between 50 and 60, so the barriers before 50 will be deleted
    cleaner.chore();
    assertArrayEquals(new long[] { 50, 60 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));

    // in the last open range, 50 is deleted
    cleaner.chore();
    assertArrayEquals(new long[] { 60 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
  }

  /**
   * While the region still has catalog family data only old barriers are trimmed; once the
   * catalog family is gone the barrier row is removed and the peer's last sequence id for the
   * region is removed from the queue storage.
   */
  @Test
  public void testDeleteRowForDeletedRegion(TestInfo testInfo)
    throws IOException, ReplicationException {
    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
    addBarrier(region, 40, 50, 60);
    fillCatalogFamily(region);

    String peerId = "1";
    ReplicationQueueStorage queueStorage = create(59L);
    @SuppressWarnings("unchecked")
    ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList(peerId));
    ReplicationBarrierCleaner cleaner = create(peerManager);

    // we have something in catalog family, so only delete 40
    cleaner.chore();
    assertArrayEquals(new long[] { 50, 60 }, ReplicationBarrierFamilyFormat
      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
    verify(queueStorage, never()).removeLastSequenceIds(anyString(), anyList());

    // No catalog family, then we should remove the whole row
    clearCatalogFamily(region);
    cleaner.chore();
    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
      assertFalse(table
        .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
    }
    verify(queueStorage, times(1)).removeLastSequenceIds(peerId,
      Arrays.asList(region.getEncodedName()));
  }

  /**
   * A region with barriers but neither catalog family data nor serial peers must have its whole
   * meta row removed by a single chore run.
   */
  @Test
  public void testDeleteRowForDeletedRegionNoPeers(TestInfo testInfo) throws IOException {
    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
    addBarrier(region, 40, 50, 60);

    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
    ReplicationBarrierCleaner cleaner = create(peerManager);
    cleaner.chore();

    verify(peerManager, times(1)).getSerialPeerIdsBelongsTo(tableName);
    // There are no peers, and no catalog family for this region either, so we should remove the
    // barriers. And since there is no catalog family, after we delete the barrier family, the whole
    // row is deleted.
    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
      assertFalse(table.exists(new Get(region.getRegionName())));
    }
  }

  /**
   * {@link Stoppable} handed to the cleaner under test: a stop request is only logged, and the
   * instance never reports itself as stopped.
   */
  private static class WarnOnlyStoppable implements Stoppable {
    @Override
    public void stop(String why) {
      // Parameterized logging; the emitted message is unchanged.
      LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: {}", why);
    }

    @Override
    public boolean isStopped() {
      return false;
    }
  }
}