001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.Mockito.doAnswer;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.Arrays;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.Cell.Type;
034import org.apache.hadoop.hbase.CellBuilderFactory;
035import org.apache.hadoop.hbase.CellBuilderType;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.Server;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.RegionInfoBuilder;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.master.RegionState;
046import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
047import org.apache.hadoop.hbase.replication.ReplicationException;
048import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
049import org.apache.hadoop.hbase.replication.ReplicationQueueId;
050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.testclassification.ReplicationTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.wal.WAL.Entry;
057import org.apache.hadoop.hbase.wal.WALKeyImpl;
058import org.junit.jupiter.api.AfterAll;
059import org.junit.jupiter.api.BeforeAll;
060import org.junit.jupiter.api.BeforeEach;
061import org.junit.jupiter.api.Tag;
062import org.junit.jupiter.api.Test;
063import org.junit.jupiter.api.TestInfo;
064import org.mockito.invocation.InvocationOnMock;
065import org.mockito.stubbing.Answer;
066
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
068
069@Tag(ReplicationTests.TAG)
070@Tag(MediumTests.TAG)
071public class TestSerialReplicationChecker {
072
073  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
074
075  private static String PEER_ID = "1";
076
077  private static ReplicationQueueStorage QUEUE_STORAGE;
078
079  private static String WAL_FILE_NAME = "test.wal";
080
081  private Connection conn;
082
083  private SerialReplicationChecker checker;
084
085  private String testName;
086
087  private TableName tableName;
088
089  @BeforeAll
090  public static void setUpBeforeClass() throws Exception {
091    UTIL.startMiniCluster(1);
092    TableName repTable = TableName.valueOf("test_serial_rep");
093    UTIL.getAdmin()
094      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable));
095    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(),
096      UTIL.getConfiguration(), repTable);
097  }
098
099  @AfterAll
100  public static void tearDownAfterClass() throws Exception {
101    UTIL.shutdownMiniCluster();
102  }
103
104  @BeforeEach
105  public void setUp(TestInfo testInfo) throws IOException {
106    testName = testInfo.getTestMethod().get().getName();
107    ReplicationSource source = mock(ReplicationSource.class);
108    when(source.getPeerId()).thenReturn(PEER_ID);
109    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
110    conn = mock(Connection.class);
111    when(conn.isClosed()).thenReturn(false);
112    doAnswer(new Answer<Table>() {
113
114      @Override
115      public Table answer(InvocationOnMock invocation) throws Throwable {
116        return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
117      }
118
119    }).when(conn).getTable(any(TableName.class));
120    Server server = mock(Server.class);
121    when(server.getConnection()).thenReturn(conn);
122    when(source.getServer()).thenReturn(server);
123    checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
124    tableName = TableName.valueOf(testName);
125  }
126
127  private Entry createEntry(RegionInfo region, long seqId) {
128    WALKeyImpl key = mock(WALKeyImpl.class);
129    when(key.getTableName()).thenReturn(tableName);
130    when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
131    when(key.getSequenceId()).thenReturn(seqId);
132    Entry entry = mock(Entry.class);
133    when(entry.getKey()).thenReturn(key);
134    return entry;
135  }
136
137  private Cell createCell(RegionInfo region) {
138    return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
139      .setType(Type.Put).build();
140  }
141
142  @Test
143  public void testNoBarrierCanPush() throws IOException {
144    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
145    assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
146  }
147
148  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
149    throws IOException {
150    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
151    if (state != null) {
152      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
153        Bytes.toBytes(state.name()));
154    }
155    for (int i = 0; i < barriers.length; i++) {
156      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
157        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
158    }
159    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
160      table.put(put);
161    }
162  }
163
164  private void setState(RegionInfo region, RegionState.State state) throws IOException {
165    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
166    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
167      Bytes.toBytes(state.name()));
168    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
169      table.put(put);
170    }
171  }
172
173  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
174    ReplicationQueueId queueId = new ReplicationQueueId(
175      UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID);
176    QUEUE_STORAGE.setOffset(queueId, "", new ReplicationGroupOffset(WAL_FILE_NAME, 10),
177      ImmutableMap.of(region.getEncodedName(), seqId));
178  }
179
180  private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
181    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
182    put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
183      ReplicationBarrierFamilyFormat.REPLICATION_PARENT_QUALIFIER,
184      ReplicationBarrierFamilyFormat.getParentsBytes(parents));
185    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
186      table.put(put);
187    }
188  }
189
190  @Test
191  public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
192    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
193    addStateAndBarrier(region, RegionState.State.OPEN, 10);
194    Cell cell = createCell(region);
195    // can push since we are in the first range
196    assertTrue(checker.canPush(createEntry(region, 100), cell));
197    setState(region, RegionState.State.OPENING);
198    // can not push since we are in the last range and the state is OPENING
199    assertFalse(checker.canPush(createEntry(region, 102), cell));
200    addStateAndBarrier(region, RegionState.State.OPEN, 50);
201    // can not push since the previous range has not been finished yet
202    assertFalse(checker.canPush(createEntry(region, 102), cell));
203    updatePushedSeqId(region, 49);
204    // can push since the previous range has been finished
205    assertTrue(checker.canPush(createEntry(region, 102), cell));
206    setState(region, RegionState.State.OPENING);
207    assertFalse(checker.canPush(createEntry(region, 104), cell));
208  }
209
210  @Test
211  public void testCanPushUnder() throws IOException, ReplicationException {
212    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
213    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
214    updatePushedSeqId(region, 9);
215    Cell cell = createCell(region);
216    assertTrue(checker.canPush(createEntry(region, 20), cell));
217    verify(conn, times(1)).getTable(any(TableName.class));
218    // not continuous
219    for (int i = 22; i < 100; i += 2) {
220      assertTrue(checker.canPush(createEntry(region, i), cell));
221    }
222    // verify that we do not go to meta table
223    verify(conn, times(1)).getTable(any(TableName.class));
224  }
225
226  @Test
227  public void testCanPushIfContinuous() throws IOException, ReplicationException {
228    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
229    addStateAndBarrier(region, RegionState.State.OPEN, 10);
230    updatePushedSeqId(region, 9);
231    Cell cell = createCell(region);
232    assertTrue(checker.canPush(createEntry(region, 20), cell));
233    verify(conn, times(1)).getTable(any(TableName.class));
234    // continuous
235    for (int i = 21; i < 100; i++) {
236      assertTrue(checker.canPush(createEntry(region, i), cell));
237    }
238    // verify that we do not go to meta table
239    verify(conn, times(1)).getTable(any(TableName.class));
240  }
241
242  @Test
243  public void testCanPushAfterMerge() throws IOException, ReplicationException {
244    // 0xFF is the escape byte when storing region name so let's make sure it can work.
245    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
246    RegionInfo regionA =
247      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
248    RegionInfo regionB =
249      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
250    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
251    addStateAndBarrier(regionA, null, 10, 100);
252    addStateAndBarrier(regionB, null, 20, 200);
253    addStateAndBarrier(region, RegionState.State.OPEN, 200);
254    addParents(region, Arrays.asList(regionA, regionB));
255    Cell cell = createCell(region);
256    // can not push since both parents have not been finished yet
257    assertFalse(checker.canPush(createEntry(region, 300), cell));
258    updatePushedSeqId(regionB, 199);
259    // can not push since regionA has not been finished yet
260    assertFalse(checker.canPush(createEntry(region, 300), cell));
261    updatePushedSeqId(regionA, 99);
262    // can push since all parents have been finished
263    assertTrue(checker.canPush(createEntry(region, 300), cell));
264  }
265
266  @Test
267  public void testCanPushAfterSplit() throws IOException, ReplicationException {
268    // 0xFF is the escape byte when storing region name so let's make sure it can work.
269    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
270    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
271    RegionInfo regionA =
272      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
273    RegionInfo regionB =
274      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
275    addStateAndBarrier(region, null, 10, 100);
276    addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
277    addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
278    addParents(regionA, Arrays.asList(region));
279    addParents(regionB, Arrays.asList(region));
280    Cell cellA = createCell(regionA);
281    Cell cellB = createCell(regionB);
282    // can not push since parent has not been finished yet
283    assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
284    assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
285    updatePushedSeqId(region, 99);
286    // can push since parent has been finished
287    assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
288    assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
289  }
290
291  @Test
292  public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
293    // For binary search, equals to an element will result to a positive value, let's test whether
294    // it works.
295    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
296    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
297    Cell cell = createCell(region);
298    assertTrue(checker.canPush(createEntry(region, 10), cell));
299    assertFalse(checker.canPush(createEntry(region, 100), cell));
300    updatePushedSeqId(region, 99);
301    assertTrue(checker.canPush(createEntry(region, 100), cell));
302  }
303}