001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.Mockito.doAnswer;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.Arrays;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.Cell.Type;
034import org.apache.hadoop.hbase.CellBuilderFactory;
035import org.apache.hadoop.hbase.CellBuilderType;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.master.RegionState;
047import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
048import org.apache.hadoop.hbase.replication.ReplicationException;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.ReplicationTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.wal.WAL.Entry;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.junit.AfterClass;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.TestName;
065import org.mockito.invocation.InvocationOnMock;
066import org.mockito.stubbing.Answer;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
069
070@Category({ ReplicationTests.class, MediumTests.class })
071public class TestSerialReplicationChecker {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
076
077  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
078
079  private static String PEER_ID = "1";
080
081  private static ReplicationQueueStorage QUEUE_STORAGE;
082
083  private static String WAL_FILE_NAME = "test.wal";
084
085  private Connection conn;
086
087  private SerialReplicationChecker checker;
088
089  @Rule
090  public final TestName name = new TestName();
091
092  private TableName tableName;
093
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    UTIL.startMiniCluster(1);
097    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
098      UTIL.getConfiguration());
099    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID,
100      WAL_FILE_NAME);
101  }
102
103  @AfterClass
104  public static void tearDownAfterClass() throws Exception {
105    UTIL.shutdownMiniCluster();
106  }
107
108  @Before
109  public void setUp() throws IOException {
110    ReplicationSource source = mock(ReplicationSource.class);
111    when(source.getPeerId()).thenReturn(PEER_ID);
112    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
113    conn = mock(Connection.class);
114    when(conn.isClosed()).thenReturn(false);
115    doAnswer(new Answer<Table>() {
116
117      @Override
118      public Table answer(InvocationOnMock invocation) throws Throwable {
119        return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
120      }
121
122    }).when(conn).getTable(any(TableName.class));
123    Server server = mock(Server.class);
124    when(server.getConnection()).thenReturn(conn);
125    when(source.getServer()).thenReturn(server);
126    checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
127    tableName = TableName.valueOf(name.getMethodName());
128  }
129
130  private Entry createEntry(RegionInfo region, long seqId) {
131    WALKeyImpl key = mock(WALKeyImpl.class);
132    when(key.getTableName()).thenReturn(tableName);
133    when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
134    when(key.getSequenceId()).thenReturn(seqId);
135    Entry entry = mock(Entry.class);
136    when(entry.getKey()).thenReturn(key);
137    return entry;
138  }
139
140  private Cell createCell(RegionInfo region) {
141    return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
142      .setType(Type.Put).build();
143  }
144
145  @Test
146  public void testNoBarrierCanPush() throws IOException {
147    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
148    assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
149  }
150
151  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
152      throws IOException {
153    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
154    if (state != null) {
155      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
156        Bytes.toBytes(state.name()));
157    }
158    for (int i = 0; i < barriers.length; i++) {
159      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
160        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
161    }
162    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
163      table.put(put);
164    }
165  }
166
167  private void setState(RegionInfo region, RegionState.State state) throws IOException {
168    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
169    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
170      Bytes.toBytes(state.name()));
171    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
172      table.put(put);
173    }
174  }
175
176  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
177    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
178      PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
179  }
180
181  private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
182    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
183    put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
184      ReplicationBarrierFamilyFormat.REPLICATION_PARENT_QUALIFIER,
185      ReplicationBarrierFamilyFormat.getParentsBytes(parents));
186    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
187      table.put(put);
188    }
189  }
190
191  @Test
192  public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
193    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
194    addStateAndBarrier(region, RegionState.State.OPEN, 10);
195    Cell cell = createCell(region);
196    // can push since we are in the first range
197    assertTrue(checker.canPush(createEntry(region, 100), cell));
198    setState(region, RegionState.State.OPENING);
199    // can not push since we are in the last range and the state is OPENING
200    assertFalse(checker.canPush(createEntry(region, 102), cell));
201    addStateAndBarrier(region, RegionState.State.OPEN, 50);
202    // can not push since the previous range has not been finished yet
203    assertFalse(checker.canPush(createEntry(region, 102), cell));
204    updatePushedSeqId(region, 49);
205    // can push since the previous range has been finished
206    assertTrue(checker.canPush(createEntry(region, 102), cell));
207    setState(region, RegionState.State.OPENING);
208    assertFalse(checker.canPush(createEntry(region, 104), cell));
209  }
210
211  @Test
212  public void testCanPushUnder() throws IOException, ReplicationException {
213    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
214    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
215    updatePushedSeqId(region, 9);
216    Cell cell = createCell(region);
217    assertTrue(checker.canPush(createEntry(region, 20), cell));
218    verify(conn, times(1)).getTable(any(TableName.class));
219    // not continuous
220    for (int i = 22; i < 100; i += 2) {
221      assertTrue(checker.canPush(createEntry(region, i), cell));
222    }
223    // verify that we do not go to meta table
224    verify(conn, times(1)).getTable(any(TableName.class));
225  }
226
227  @Test
228  public void testCanPushIfContinuous() throws IOException, ReplicationException {
229    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
230    addStateAndBarrier(region, RegionState.State.OPEN, 10);
231    updatePushedSeqId(region, 9);
232    Cell cell = createCell(region);
233    assertTrue(checker.canPush(createEntry(region, 20), cell));
234    verify(conn, times(1)).getTable(any(TableName.class));
235    // continuous
236    for (int i = 21; i < 100; i++) {
237      assertTrue(checker.canPush(createEntry(region, i), cell));
238    }
239    // verify that we do not go to meta table
240    verify(conn, times(1)).getTable(any(TableName.class));
241  }
242
243  @Test
244  public void testCanPushAfterMerge() throws IOException, ReplicationException {
245    // 0xFF is the escape byte when storing region name so let's make sure it can work.
246    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
247    RegionInfo regionA =
248      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
249    RegionInfo regionB =
250      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
251    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
252    addStateAndBarrier(regionA, null, 10, 100);
253    addStateAndBarrier(regionB, null, 20, 200);
254    addStateAndBarrier(region, RegionState.State.OPEN, 200);
255    addParents(region, Arrays.asList(regionA, regionB));
256    Cell cell = createCell(region);
257    // can not push since both parents have not been finished yet
258    assertFalse(checker.canPush(createEntry(region, 300), cell));
259    updatePushedSeqId(regionB, 199);
260    // can not push since regionA has not been finished yet
261    assertFalse(checker.canPush(createEntry(region, 300), cell));
262    updatePushedSeqId(regionA, 99);
263    // can push since all parents have been finished
264    assertTrue(checker.canPush(createEntry(region, 300), cell));
265  }
266
267  @Test
268  public void testCanPushAfterSplit() throws IOException, ReplicationException {
269    // 0xFF is the escape byte when storing region name so let's make sure it can work.
270    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
271    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
272    RegionInfo regionA =
273      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
274    RegionInfo regionB =
275      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
276    addStateAndBarrier(region, null, 10, 100);
277    addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
278    addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
279    addParents(regionA, Arrays.asList(region));
280    addParents(regionB, Arrays.asList(region));
281    Cell cellA = createCell(regionA);
282    Cell cellB = createCell(regionB);
283    // can not push since parent has not been finished yet
284    assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
285    assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
286    updatePushedSeqId(region, 99);
287    // can push since parent has been finished
288    assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
289    assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
290  }
291
292  @Test
293  public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
294    // For binary search, equals to an element will result to a positive value, let's test whether
295    // it works.
296    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
297    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
298    Cell cell = createCell(region);
299    assertTrue(checker.canPush(createEntry(region, 10), cell));
300    assertFalse(checker.canPush(createEntry(region, 100), cell));
301    updatePushedSeqId(region, 99);
302    assertTrue(checker.canPush(createEntry(region, 100), cell));
303  }
304}