001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.Mockito.doAnswer;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.Arrays;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.Cell.Type;
034import org.apache.hadoop.hbase.CellBuilderFactory;
035import org.apache.hadoop.hbase.CellBuilderType;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.MetaTableAccessor;
040import org.apache.hadoop.hbase.Server;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.master.RegionState;
048import org.apache.hadoop.hbase.replication.ReplicationException;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.ReplicationTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.wal.WAL.Entry;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.junit.AfterClass;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.TestName;
065import org.mockito.invocation.InvocationOnMock;
066import org.mockito.stubbing.Answer;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
069
070@Category({ ReplicationTests.class, MediumTests.class })
071public class TestSerialReplicationChecker {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
076
077  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
078
079  private static String PEER_ID = "1";
080
081  private static ReplicationQueueStorage QUEUE_STORAGE;
082
083  private static String WAL_FILE_NAME = "test.wal";
084
085  private Connection conn;
086
087  private SerialReplicationChecker checker;
088
089  @Rule
090  public final TestName name = new TestName();
091
092  private TableName tableName;
093
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    UTIL.startMiniCluster(1);
097    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
098      UTIL.getConfiguration());
099    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID,
100      WAL_FILE_NAME);
101  }
102
103  @AfterClass
104  public static void tearDownAfterClass() throws Exception {
105    UTIL.shutdownMiniCluster();
106  }
107
108  @Before
109  public void setUp() throws IOException {
110    ReplicationSource source = mock(ReplicationSource.class);
111    when(source.getPeerId()).thenReturn(PEER_ID);
112    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
113    conn = mock(Connection.class);
114    when(conn.isClosed()).thenReturn(false);
115    doAnswer(new Answer<Table>() {
116
117      @Override
118      public Table answer(InvocationOnMock invocation) throws Throwable {
119        return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
120      }
121
122    }).when(conn).getTable(any(TableName.class));
123    Server server = mock(Server.class);
124    when(server.getConnection()).thenReturn(conn);
125    when(source.getServer()).thenReturn(server);
126    checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
127    tableName = TableName.valueOf(name.getMethodName());
128  }
129
130  private Entry createEntry(RegionInfo region, long seqId) {
131    WALKeyImpl key = mock(WALKeyImpl.class);
132    when(key.getTableName()).thenReturn(tableName);
133    when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
134    when(key.getSequenceId()).thenReturn(seqId);
135    Entry entry = mock(Entry.class);
136    when(entry.getKey()).thenReturn(key);
137    return entry;
138  }
139
140  private Cell createCell(RegionInfo region) {
141    return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
142      .setType(Type.Put).build();
143  }
144
145  @Test
146  public void testNoBarrierCanPush() throws IOException {
147    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
148    assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
149  }
150
151  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
152      throws IOException {
153    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
154    if (state != null) {
155      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
156        Bytes.toBytes(state.name()));
157    }
158    for (int i = 0; i < barriers.length; i++) {
159      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
160        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
161    }
162    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
163      table.put(put);
164    }
165  }
166
167  private void setState(RegionInfo region, RegionState.State state) throws IOException {
168    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
169    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
170      Bytes.toBytes(state.name()));
171    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
172      table.put(put);
173    }
174  }
175
176  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
177    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
178      PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
179  }
180
181  private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
182    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
183    put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
184      MetaTableAccessor.REPLICATION_PARENT_QUALIFIER, MetaTableAccessor.getParentsBytes(parents));
185    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
186      table.put(put);
187    }
188  }
189
190  @Test
191  public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
192    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
193    addStateAndBarrier(region, RegionState.State.OPEN, 10);
194    Cell cell = createCell(region);
195    // can push since we are in the first range
196    assertTrue(checker.canPush(createEntry(region, 100), cell));
197    setState(region, RegionState.State.OPENING);
198    // can not push since we are in the last range and the state is OPENING
199    assertFalse(checker.canPush(createEntry(region, 102), cell));
200    addStateAndBarrier(region, RegionState.State.OPEN, 50);
201    // can not push since the previous range has not been finished yet
202    assertFalse(checker.canPush(createEntry(region, 102), cell));
203    updatePushedSeqId(region, 49);
204    // can push since the previous range has been finished
205    assertTrue(checker.canPush(createEntry(region, 102), cell));
206    setState(region, RegionState.State.OPENING);
207    assertFalse(checker.canPush(createEntry(region, 104), cell));
208  }
209
210  @Test
211  public void testCanPushUnder() throws IOException, ReplicationException {
212    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
213    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
214    updatePushedSeqId(region, 9);
215    Cell cell = createCell(region);
216    assertTrue(checker.canPush(createEntry(region, 20), cell));
217    verify(conn, times(1)).getTable(any(TableName.class));
218    // not continuous
219    for (int i = 22; i < 100; i += 2) {
220      assertTrue(checker.canPush(createEntry(region, i), cell));
221    }
222    // verify that we do not go to meta table
223    verify(conn, times(1)).getTable(any(TableName.class));
224  }
225
226  @Test
227  public void testCanPushIfContinuous() throws IOException, ReplicationException {
228    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
229    addStateAndBarrier(region, RegionState.State.OPEN, 10);
230    updatePushedSeqId(region, 9);
231    Cell cell = createCell(region);
232    assertTrue(checker.canPush(createEntry(region, 20), cell));
233    verify(conn, times(1)).getTable(any(TableName.class));
234    // continuous
235    for (int i = 21; i < 100; i++) {
236      assertTrue(checker.canPush(createEntry(region, i), cell));
237    }
238    // verify that we do not go to meta table
239    verify(conn, times(1)).getTable(any(TableName.class));
240  }
241
242  @Test
243  public void testCanPushAfterMerge() throws IOException, ReplicationException {
244    // 0xFF is the escape byte when storing region name so let's make sure it can work.
245    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
246    RegionInfo regionA =
247      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
248    RegionInfo regionB =
249      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
250    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
251    addStateAndBarrier(regionA, null, 10, 100);
252    addStateAndBarrier(regionB, null, 20, 200);
253    addStateAndBarrier(region, RegionState.State.OPEN, 200);
254    addParents(region, Arrays.asList(regionA, regionB));
255    Cell cell = createCell(region);
256    // can not push since both parents have not been finished yet
257    assertFalse(checker.canPush(createEntry(region, 300), cell));
258    updatePushedSeqId(regionB, 199);
259    // can not push since regionA has not been finished yet
260    assertFalse(checker.canPush(createEntry(region, 300), cell));
261    updatePushedSeqId(regionA, 99);
262    // can push since all parents have been finished
263    assertTrue(checker.canPush(createEntry(region, 300), cell));
264  }
265
266  @Test
267  public void testCanPushAfterSplit() throws IOException, ReplicationException {
268    // 0xFF is the escape byte when storing region name so let's make sure it can work.
269    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
270    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
271    RegionInfo regionA =
272      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
273    RegionInfo regionB =
274      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
275    addStateAndBarrier(region, null, 10, 100);
276    addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
277    addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
278    addParents(regionA, Arrays.asList(region));
279    addParents(regionB, Arrays.asList(region));
280    Cell cellA = createCell(regionA);
281    Cell cellB = createCell(regionB);
282    // can not push since parent has not been finished yet
283    assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
284    assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
285    updatePushedSeqId(region, 99);
286    // can push since parent has been finished
287    assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
288    assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
289  }
290
291  @Test
292  public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
293    // For binary search, equals to an element will result to a positive value, let's test whether
294    // it works.
295    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
296    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
297    Cell cell = createCell(region);
298    assertTrue(checker.canPush(createEntry(region, 10), cell));
299    assertFalse(checker.canPush(createEntry(region, 100), cell));
300    updatePushedSeqId(region, 99);
301    assertTrue(checker.canPush(createEntry(region, 100), cell));
302  }
303}