001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.ArgumentMatchers.anyList;
024import static org.mockito.ArgumentMatchers.anyString;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.never;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.Stoppable;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.RegionInfoBuilder;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
049import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
050import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
051import org.apache.hadoop.hbase.replication.ReplicationException;
052import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
053import org.apache.hadoop.hbase.testclassification.MasterTests;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.junit.jupiter.api.AfterAll;
058import org.junit.jupiter.api.AfterEach;
059import org.junit.jupiter.api.BeforeAll;
060import org.junit.jupiter.api.Tag;
061import org.junit.jupiter.api.Test;
062import org.junit.jupiter.api.TestInfo;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
067
068@Tag(MasterTests.TAG)
069@Tag(MediumTests.TAG)
070public class TestReplicationBarrierCleaner {
071
072  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBarrierCleaner.class);
073
074  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
075
076  @BeforeAll
077  public static void setUpBeforeClass() throws Exception {
078    UTIL.startMiniCluster(1);
079  }
080
081  @AfterAll
082  public static void tearDownAfterClass() throws Exception {
083    UTIL.shutdownMiniCluster();
084  }
085
086  @AfterEach
087  public void tearDown() throws IOException {
088    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
089      ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
090        .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
091      for (;;) {
092        Result result = scanner.next();
093        if (result == null) {
094          break;
095        }
096        TableName tableName = RegionInfo.getTable(result.getRow());
097        if (!tableName.isSystemTable()) {
098          table.delete(new Delete(result.getRow()));
099        }
100      }
101    }
102  }
103
104  private ReplicationPeerManager create(ReplicationQueueStorage queueStorage,
105    List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>... peerIds) {
106    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
107    if (queueStorage != null) {
108      when(peerManager.getQueueStorage()).thenReturn(queueStorage);
109    }
110    if (peerIds.length == 0) {
111      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds);
112    } else {
113      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds,
114        peerIds);
115    }
116    return peerManager;
117  }
118
119  private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
120    throws ReplicationException {
121    ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
122    if (lastPushedSeqIds.length == 0) {
123      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
124    } else {
125      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
126        lastPushedSeqIds);
127    }
128    return queueStorage;
129  }
130
131  private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException {
132    return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(),
133      UTIL.getConnection(), peerManager);
134  }
135
136  private void addBarrier(RegionInfo region, long... barriers) throws IOException {
137    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
138    for (int i = 0; i < barriers.length; i++) {
139      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
140        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
141    }
142    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
143      table.put(put);
144    }
145  }
146
147  private void fillCatalogFamily(RegionInfo region) throws IOException {
148    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
149      table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY,
150        Bytes.toBytes("whatever"), Bytes.toBytes("whatever")));
151    }
152  }
153
154  private void clearCatalogFamily(RegionInfo region) throws IOException {
155    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
156      table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY));
157    }
158  }
159
160  @Test
161  public void testNothing() throws IOException {
162    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
163    ReplicationBarrierCleaner cleaner = create(peerManager);
164    cleaner.chore();
165    verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class));
166    verify(peerManager, never()).getQueueStorage();
167  }
168
169  @Test
170  public void testCleanNoPeers(TestInfo testInfo) throws IOException {
171    TableName tableName1 = TableName.valueOf(testInfo.getTestMethod().get().getName() + "_1");
172    RegionInfo region11 =
173      RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build();
174    addBarrier(region11, 10, 20, 30, 40, 50, 60);
175    fillCatalogFamily(region11);
176    RegionInfo region12 =
177      RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build();
178    addBarrier(region12, 20, 30, 40, 50, 60, 70);
179    fillCatalogFamily(region12);
180
181    TableName tableName2 = TableName.valueOf(testInfo.getTestMethod().get().getName() + "_2");
182    RegionInfo region21 =
183      RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build();
184    addBarrier(region21, 100, 200, 300, 400);
185    fillCatalogFamily(region21);
186    RegionInfo region22 =
187      RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build();
188    addBarrier(region22, 200, 300, 400, 500, 600);
189    fillCatalogFamily(region22);
190
191    @SuppressWarnings("unchecked")
192    ReplicationPeerManager peerManager =
193      create(null, Collections.emptyList(), Collections.emptyList());
194    ReplicationBarrierCleaner cleaner = create(peerManager);
195    cleaner.chore();
196
197    // should never call this method
198    verify(peerManager, never()).getQueueStorage();
199    // should only be called twice although we have 4 regions to clean
200    verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class));
201
202    assertArrayEquals(new long[] { 60 }, ReplicationBarrierFamilyFormat
203      .getReplicationBarriers(UTIL.getConnection(), region11.getRegionName()));
204    assertArrayEquals(new long[] { 70 }, ReplicationBarrierFamilyFormat
205      .getReplicationBarriers(UTIL.getConnection(), region12.getRegionName()));
206
207    assertArrayEquals(new long[] { 400 }, ReplicationBarrierFamilyFormat
208      .getReplicationBarriers(UTIL.getConnection(), region21.getRegionName()));
209    assertArrayEquals(new long[] { 600 }, ReplicationBarrierFamilyFormat
210      .getReplicationBarriers(UTIL.getConnection(), region22.getRegionName()));
211  }
212
213  @Test
214  public void testDeleteBarriers(TestInfo testInfo) throws IOException, ReplicationException {
215    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
216    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
217    addBarrier(region, 10, 20, 30, 40, 50, 60);
218    // two peers
219    ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L);
220    List<String> peerIds = Lists.newArrayList("1", "2");
221
222    @SuppressWarnings("unchecked")
223    ReplicationPeerManager peerManager =
224      create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds);
225    ReplicationBarrierCleaner cleaner = create(peerManager);
226
227    // beyond the first barrier, no deletion
228    cleaner.chore();
229    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
230      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
231
232    // in the first range, still no deletion
233    cleaner.chore();
234    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
235      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
236
237    // in the second range, 10 is deleted
238    cleaner.chore();
239    assertArrayEquals(new long[] { 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
240      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
241
242    // between 50 and 60, so the barriers before 50 will be deleted
243    cleaner.chore();
244    assertArrayEquals(new long[] { 50, 60 }, ReplicationBarrierFamilyFormat
245      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
246
247    // in the last open range, 50 is deleted
248    cleaner.chore();
249    assertArrayEquals(new long[] { 60 }, ReplicationBarrierFamilyFormat
250      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
251  }
252
253  @Test
254  public void testDeleteRowForDeletedRegion(TestInfo testInfo)
255    throws IOException, ReplicationException {
256    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
257    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
258    addBarrier(region, 40, 50, 60);
259    fillCatalogFamily(region);
260
261    String peerId = "1";
262    ReplicationQueueStorage queueStorage = create(59L);
263    @SuppressWarnings("unchecked")
264    ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList(peerId));
265    ReplicationBarrierCleaner cleaner = create(peerManager);
266
267    // we have something in catalog family, so only delete 40
268    cleaner.chore();
269    assertArrayEquals(new long[] { 50, 60 }, ReplicationBarrierFamilyFormat
270      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
271    verify(queueStorage, never()).removeLastSequenceIds(anyString(), anyList());
272
273    // No catalog family, then we should remove the whole row
274    clearCatalogFamily(region);
275    cleaner.chore();
276    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
277      assertFalse(table
278        .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
279    }
280    verify(queueStorage, times(1)).removeLastSequenceIds(peerId,
281      Arrays.asList(region.getEncodedName()));
282  }
283
284  @Test
285  public void testDeleteRowForDeletedRegionNoPeers(TestInfo testInfo) throws IOException {
286    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
287    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
288    addBarrier(region, 40, 50, 60);
289
290    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
291    ReplicationBarrierCleaner cleaner = create(peerManager);
292    cleaner.chore();
293
294    verify(peerManager, times(1)).getSerialPeerIdsBelongsTo(tableName);
295    // There are no peers, and no catalog family for this region either, so we should remove the
296    // barriers. And since there is no catalog family, after we delete the barrier family, the whole
297    // row is deleted.
298    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
299      assertFalse(table.exists(new Get(region.getRegionName())));
300    }
301  }
302
303  private static class WarnOnlyStoppable implements Stoppable {
304    @Override
305    public void stop(String why) {
306      LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: " + why);
307    }
308
309    @Override
310    public boolean isStopped() {
311      return false;
312    }
313  }
314}