001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.TimeUnit;
030import java.util.stream.Collectors;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
038import org.apache.hadoop.hbase.regionserver.HRegionServer;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.testclassification.ReplicationTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
043import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
044import org.apache.hadoop.hbase.wal.WAL.Entry;
045import org.apache.hadoop.hbase.wal.WALStreamReader;
046import org.junit.jupiter.api.BeforeEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049
050@Tag(ReplicationTests.TAG)
051@Tag(MediumTests.TAG)
052public class TestSerialReplication extends SerialReplicationTestBase {
053
054  @BeforeEach
055  public void setUp() throws IOException, StreamLacksCapabilityException {
056    setupWALWriter();
057    // add in disable state, so later when enabling it all sources will start push together.
058    addPeer(false);
059  }
060
061  @Test
062  public void testRegionMove() throws Exception {
063    TableName tableName = createTable();
064    try (Table table = UTIL.getConnection().getTable(tableName)) {
065      for (int i = 0; i < 100; i++) {
066        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
067      }
068    }
069    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
070    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
071    moveRegion(region, rs);
072    try (Table table = UTIL.getConnection().getTable(tableName)) {
073      for (int i = 100; i < 200; i++) {
074        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
075      }
076    }
077    enablePeerAndWaitUntilReplicationDone(200);
078    checkOrder(200);
079  }
080
081  @Test
082  public void testRegionSplit() throws Exception {
083    TableName tableName = createTable();
084    try (Table table = UTIL.getConnection().getTable(tableName)) {
085      for (int i = 0; i < 100; i++) {
086        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
087      }
088    }
089    UTIL.flush(tableName);
090    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
091    UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30,
092      TimeUnit.SECONDS);
093    UTIL.waitUntilNoRegionsInTransition(30000);
094    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
095    assertEquals(2, regions.size());
096    try (Table table = UTIL.getConnection().getTable(tableName)) {
097      for (int i = 0; i < 100; i++) {
098        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
099      }
100    }
101    enablePeerAndWaitUntilReplicationDone(200);
102    Map<String, Long> regionsToSeqId = new HashMap<>();
103    regionsToSeqId.put(region.getEncodedName(), -1L);
104    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
105    try (WALStreamReader reader =
106      NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
107      int count = 0;
108      for (Entry entry;;) {
109        entry = reader.next();
110        if (entry == null) {
111          break;
112        }
113        String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
114        Long seqId = regionsToSeqId.get(encodedName);
115        assertNotNull(seqId,
116          "Unexcepted entry " + entry + ", expected regions " + region + ", or " + regions);
117        assertTrue(entry.getKey().getSequenceId() >= seqId.longValue(),
118          "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId()
119            + " for " + encodedName);
120        if (count < 100) {
121          assertEquals(region.getEncodedName(), encodedName,
122            encodedName + " is pushed before parent " + region.getEncodedName());
123        } else {
124          assertNotEquals(region.getEncodedName(), encodedName);
125        }
126        count++;
127      }
128      assertEquals(200, count);
129    }
130  }
131
132  @Test
133  public void testRegionMerge() throws Exception {
134    byte[] splitKey = Bytes.toBytes(50);
135    TableName tableName = tableNameExt.getTableName();
136    UTIL.getAdmin().createTable(
137      TableDescriptorBuilder.newBuilder(tableName)
138        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
139          .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
140        .build(),
141      new byte[][] { splitKey });
142    UTIL.waitTableAvailable(tableName);
143    try (Table table = UTIL.getConnection().getTable(tableName)) {
144      for (int i = 0; i < 100; i++) {
145        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
146      }
147    }
148    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
149    UTIL.getAdmin()
150      .mergeRegionsAsync(
151        regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false)
152      .get(30, TimeUnit.SECONDS);
153    UTIL.waitUntilNoRegionsInTransition(30000);
154    List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName);
155    assertEquals(1, regionsAfterMerge.size());
156    try (Table table = UTIL.getConnection().getTable(tableName)) {
157      for (int i = 0; i < 100; i++) {
158        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
159      }
160    }
161    enablePeerAndWaitUntilReplicationDone(200);
162    Map<String, Long> regionsToSeqId = new HashMap<>();
163    RegionInfo region = regionsAfterMerge.get(0);
164    regionsToSeqId.put(region.getEncodedName(), -1L);
165    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
166    try (WALStreamReader reader =
167      NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
168      int count = 0;
169      for (Entry entry;;) {
170        entry = reader.next();
171        if (entry == null) {
172          break;
173        }
174        String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
175        Long seqId = regionsToSeqId.get(encodedName);
176        assertNotNull(seqId,
177          "Unexcepted entry " + entry + ", expected regions " + region + ", or " + regions);
178        assertTrue(entry.getKey().getSequenceId() >= seqId.longValue(),
179          "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId()
180            + " for " + encodedName);
181        if (count < 100) {
182          assertNotEquals(
183            encodedName + " is pushed before parents " + regions.stream()
184              .map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")),
185            region.getEncodedName(), encodedName);
186        } else {
187          assertEquals(region.getEncodedName(), encodedName);
188        }
189        count++;
190      }
191      assertEquals(200, count);
192    }
193  }
194
195  @Test
196  public void testRemovePeerNothingReplicated() throws Exception {
197    TableName tableName = createTable();
198    String encodedRegionName =
199      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
200    ReplicationQueueStorage queueStorage =
201      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
202    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
203    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
204    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
205  }
206
207  @Test
208  public void testRemovePeer() throws Exception {
209    TableName tableName = createTable();
210    try (Table table = UTIL.getConnection().getTable(tableName)) {
211      for (int i = 0; i < 100; i++) {
212        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
213      }
214    }
215    enablePeerAndWaitUntilReplicationDone(100);
216    checkOrder(100);
217    String encodedRegionName =
218      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
219    ReplicationQueueStorage queueStorage =
220      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
221    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
222    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
223    // confirm that we delete the last pushed sequence id
224    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
225  }
226
227  @Test
228  public void testRemoveSerialFlag() throws Exception {
229    TableName tableName = createTable();
230    try (Table table = UTIL.getConnection().getTable(tableName)) {
231      for (int i = 0; i < 100; i++) {
232        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
233      }
234    }
235    enablePeerAndWaitUntilReplicationDone(100);
236    checkOrder(100);
237    String encodedRegionName =
238      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
239    ReplicationQueueStorage queueStorage =
240      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
241    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
242    ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID);
243    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
244      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build());
245    // confirm that we delete the last pushed sequence id
246    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
247  }
248}