001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.Mockito.doAnswer;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.Arrays;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.Cell.Type;
034import org.apache.hadoop.hbase.CellBuilderFactory;
035import org.apache.hadoop.hbase.CellBuilderType;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.master.RegionState;
047import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
048import org.apache.hadoop.hbase.replication.ReplicationException;
049import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
050import org.apache.hadoop.hbase.replication.ReplicationQueueId;
051import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.testclassification.ReplicationTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.wal.WAL.Entry;
058import org.apache.hadoop.hbase.wal.WALKeyImpl;
059import org.junit.AfterClass;
060import org.junit.Before;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Rule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.junit.rules.TestName;
067import org.mockito.invocation.InvocationOnMock;
068import org.mockito.stubbing.Answer;
069
070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
071
072@Category({ ReplicationTests.class, MediumTests.class })
073public class TestSerialReplicationChecker {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
078
079  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
080
081  private static String PEER_ID = "1";
082
083  private static ReplicationQueueStorage QUEUE_STORAGE;
084
085  private static String WAL_FILE_NAME = "test.wal";
086
087  private Connection conn;
088
089  private SerialReplicationChecker checker;
090
091  @Rule
092  public final TestName name = new TestName();
093
094  private TableName tableName;
095
096  @BeforeClass
097  public static void setUpBeforeClass() throws Exception {
098    UTIL.startMiniCluster(1);
099    TableName repTable = TableName.valueOf("test_serial_rep");
100    UTIL.getAdmin()
101      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable));
102    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(),
103      UTIL.getConfiguration(), repTable);
104  }
105
106  @AfterClass
107  public static void tearDownAfterClass() throws Exception {
108    UTIL.shutdownMiniCluster();
109  }
110
111  @Before
112  public void setUp() throws IOException {
113    ReplicationSource source = mock(ReplicationSource.class);
114    when(source.getPeerId()).thenReturn(PEER_ID);
115    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
116    conn = mock(Connection.class);
117    when(conn.isClosed()).thenReturn(false);
118    doAnswer(new Answer<Table>() {
119
120      @Override
121      public Table answer(InvocationOnMock invocation) throws Throwable {
122        return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
123      }
124
125    }).when(conn).getTable(any(TableName.class));
126    Server server = mock(Server.class);
127    when(server.getConnection()).thenReturn(conn);
128    when(source.getServer()).thenReturn(server);
129    checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
130    tableName = TableName.valueOf(name.getMethodName());
131  }
132
133  private Entry createEntry(RegionInfo region, long seqId) {
134    WALKeyImpl key = mock(WALKeyImpl.class);
135    when(key.getTableName()).thenReturn(tableName);
136    when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
137    when(key.getSequenceId()).thenReturn(seqId);
138    Entry entry = mock(Entry.class);
139    when(entry.getKey()).thenReturn(key);
140    return entry;
141  }
142
143  private Cell createCell(RegionInfo region) {
144    return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
145      .setType(Type.Put).build();
146  }
147
148  @Test
149  public void testNoBarrierCanPush() throws IOException {
150    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
151    assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
152  }
153
154  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
155    throws IOException {
156    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
157    if (state != null) {
158      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
159        Bytes.toBytes(state.name()));
160    }
161    for (int i = 0; i < barriers.length; i++) {
162      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
163        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
164    }
165    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
166      table.put(put);
167    }
168  }
169
170  private void setState(RegionInfo region, RegionState.State state) throws IOException {
171    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
172    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
173      Bytes.toBytes(state.name()));
174    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
175      table.put(put);
176    }
177  }
178
179  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
180    ReplicationQueueId queueId = new ReplicationQueueId(
181      UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID);
182    QUEUE_STORAGE.setOffset(queueId, "", new ReplicationGroupOffset(WAL_FILE_NAME, 10),
183      ImmutableMap.of(region.getEncodedName(), seqId));
184  }
185
186  private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
187    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
188    put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
189      ReplicationBarrierFamilyFormat.REPLICATION_PARENT_QUALIFIER,
190      ReplicationBarrierFamilyFormat.getParentsBytes(parents));
191    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
192      table.put(put);
193    }
194  }
195
196  @Test
197  public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
198    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
199    addStateAndBarrier(region, RegionState.State.OPEN, 10);
200    Cell cell = createCell(region);
201    // can push since we are in the first range
202    assertTrue(checker.canPush(createEntry(region, 100), cell));
203    setState(region, RegionState.State.OPENING);
204    // can not push since we are in the last range and the state is OPENING
205    assertFalse(checker.canPush(createEntry(region, 102), cell));
206    addStateAndBarrier(region, RegionState.State.OPEN, 50);
207    // can not push since the previous range has not been finished yet
208    assertFalse(checker.canPush(createEntry(region, 102), cell));
209    updatePushedSeqId(region, 49);
210    // can push since the previous range has been finished
211    assertTrue(checker.canPush(createEntry(region, 102), cell));
212    setState(region, RegionState.State.OPENING);
213    assertFalse(checker.canPush(createEntry(region, 104), cell));
214  }
215
216  @Test
217  public void testCanPushUnder() throws IOException, ReplicationException {
218    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
219    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
220    updatePushedSeqId(region, 9);
221    Cell cell = createCell(region);
222    assertTrue(checker.canPush(createEntry(region, 20), cell));
223    verify(conn, times(1)).getTable(any(TableName.class));
224    // not continuous
225    for (int i = 22; i < 100; i += 2) {
226      assertTrue(checker.canPush(createEntry(region, i), cell));
227    }
228    // verify that we do not go to meta table
229    verify(conn, times(1)).getTable(any(TableName.class));
230  }
231
232  @Test
233  public void testCanPushIfContinuous() throws IOException, ReplicationException {
234    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
235    addStateAndBarrier(region, RegionState.State.OPEN, 10);
236    updatePushedSeqId(region, 9);
237    Cell cell = createCell(region);
238    assertTrue(checker.canPush(createEntry(region, 20), cell));
239    verify(conn, times(1)).getTable(any(TableName.class));
240    // continuous
241    for (int i = 21; i < 100; i++) {
242      assertTrue(checker.canPush(createEntry(region, i), cell));
243    }
244    // verify that we do not go to meta table
245    verify(conn, times(1)).getTable(any(TableName.class));
246  }
247
248  @Test
249  public void testCanPushAfterMerge() throws IOException, ReplicationException {
250    // 0xFF is the escape byte when storing region name so let's make sure it can work.
251    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
252    RegionInfo regionA =
253      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
254    RegionInfo regionB =
255      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
256    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
257    addStateAndBarrier(regionA, null, 10, 100);
258    addStateAndBarrier(regionB, null, 20, 200);
259    addStateAndBarrier(region, RegionState.State.OPEN, 200);
260    addParents(region, Arrays.asList(regionA, regionB));
261    Cell cell = createCell(region);
262    // can not push since both parents have not been finished yet
263    assertFalse(checker.canPush(createEntry(region, 300), cell));
264    updatePushedSeqId(regionB, 199);
265    // can not push since regionA has not been finished yet
266    assertFalse(checker.canPush(createEntry(region, 300), cell));
267    updatePushedSeqId(regionA, 99);
268    // can push since all parents have been finished
269    assertTrue(checker.canPush(createEntry(region, 300), cell));
270  }
271
272  @Test
273  public void testCanPushAfterSplit() throws IOException, ReplicationException {
274    // 0xFF is the escape byte when storing region name so let's make sure it can work.
275    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
276    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
277    RegionInfo regionA =
278      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
279    RegionInfo regionB =
280      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
281    addStateAndBarrier(region, null, 10, 100);
282    addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
283    addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
284    addParents(regionA, Arrays.asList(region));
285    addParents(regionB, Arrays.asList(region));
286    Cell cellA = createCell(regionA);
287    Cell cellB = createCell(regionB);
288    // can not push since parent has not been finished yet
289    assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
290    assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
291    updatePushedSeqId(region, 99);
292    // can push since parent has been finished
293    assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
294    assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
295  }
296
297  @Test
298  public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
299    // For binary search, equals to an element will result to a positive value, let's test whether
300    // it works.
301    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
302    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
303    Cell cell = createCell(region);
304    assertTrue(checker.canPush(createEntry(region, 10), cell));
305    assertFalse(checker.canPush(createEntry(region, 100), cell));
306    updatePushedSeqId(region, 99);
307    assertTrue(checker.canPush(createEntry(region, 100), cell));
308  }
309}