/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

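/**
 * Tests that serial replication pushes WAL entries in the order the source regions produced them
 * across a region move, split and merge, and that the recorded last pushed sequence ids are
 * cleaned up when the peer is removed or its serial flag is cleared.
 */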
@Category({ ReplicationTests.class, MediumTests.class })
public class TestSerialReplication extends SerialReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSerialReplication.class);

  @Before
  public void setUp() throws IOException, StreamLacksCapabilityException {
    setupWALWriter();
    // add the peer in disabled state, so that all sources start pushing together once it is
    // enabled later.
    addPeer(false);
  }

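  /**
   * Write rows, move the single region to another region server while the peer is disabled, write
   * more rows, then enable the peer and verify all 200 entries arrive in order.
   */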
  @Test
  public void testRegionMove() throws Exception {
    TableName tableName = createTable();
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
    moveRegion(region, rs);
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 100; i < 200; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    enablePeerAndWaitUntilReplicationDone(200);
    checkOrder(200);
  }

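  /**
   * Split the region while the peer is disabled and verify that every entry from the parent region
   * is replicated before any entry from the two daughter regions.
   */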
  @Test
  public void testRegionSplit() throws Exception {
    TableName tableName = createTable();
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    UTIL.flush(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30,
      TimeUnit.SECONDS);
    UTIL.waitUntilNoRegionsInTransition(30000);
    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
    assertEquals(2, regions.size());
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    enablePeerAndWaitUntilReplicationDone(200);
    Map<String, Long> regionsToSeqId = new HashMap<>();
    regionsToSeqId.put(region.getEncodedName(), -1L);
    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
    try (WAL.Reader reader =
      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
      int count = 0;
      for (Entry entry;;) {
        entry = reader.next();
        if (entry == null) {
          break;
        }
        String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
        Long seqId = regionsToSeqId.get(encodedName);
        assertNotNull(
          "Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
        assertTrue("Sequence id goes backwards from " + seqId + " to "
          + entry.getKey().getSequenceId() + " for " + encodedName,
          entry.getKey().getSequenceId() >= seqId.longValue());
        // record the sequence id just seen, so the check above compares against the previous entry
        // for this region rather than the initial -1
        regionsToSeqId.put(encodedName, entry.getKey().getSequenceId());
        if (count < 100) {
          assertEquals(encodedName + " is pushed before parent " + region.getEncodedName(),
            region.getEncodedName(), encodedName);
        } else {
          assertNotEquals(region.getEncodedName(), encodedName);
        }
        count++;
      }
      assertEquals(200, count);
    }
  }

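  /**
   * Merge the two regions while the peer is disabled and verify that every entry from the parent
   * regions is replicated before any entry from the merged region.
   */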
  @Test
  public void testRegionMerge() throws Exception {
    byte[] splitKey = Bytes.toBytes(50);
    TableName tableName = TableName.valueOf(name.getMethodName());
    UTIL.getAdmin().createTable(
      TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
          .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build(),
      new byte[][] { splitKey });
    UTIL.waitTableAvailable(tableName);
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
    UTIL.getAdmin()
      .mergeRegionsAsync(
        regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false)
      .get(30, TimeUnit.SECONDS);
    UTIL.waitUntilNoRegionsInTransition(30000);
    List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName);
    assertEquals(1, regionsAfterMerge.size());
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    enablePeerAndWaitUntilReplicationDone(200);
    Map<String, Long> regionsToSeqId = new HashMap<>();
    RegionInfo region = regionsAfterMerge.get(0);
    regionsToSeqId.put(region.getEncodedName(), -1L);
    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
    try (WAL.Reader reader =
      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
      int count = 0;
      for (Entry entry;;) {
        entry = reader.next();
        if (entry == null) {
          break;
        }
        String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
        Long seqId = regionsToSeqId.get(encodedName);
        assertNotNull(
          "Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
        assertTrue("Sequence id goes backwards from " + seqId + " to "
          + entry.getKey().getSequenceId() + " for " + encodedName,
          entry.getKey().getSequenceId() >= seqId.longValue());
        // record the sequence id just seen, so the check above compares against the previous entry
        // for this region rather than the initial -1
        regionsToSeqId.put(encodedName, entry.getKey().getSequenceId());
        if (count < 100) {
          assertNotEquals(
            encodedName + " is pushed before parents " + regions.stream()
              .map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")),
            region.getEncodedName(), encodedName);
        } else {
          assertEquals(region.getEncodedName(), encodedName);
        }
        count++;
      }
      assertEquals(200, count);
    }
  }

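  /**
   * Remove the peer before anything is replicated and confirm no last pushed sequence id is
   * recorded for the region, either before or after the removal.
   */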
  @Test
  public void testRemovePeerNothingReplicated() throws Exception {
    TableName tableName = createTable();
    String encodedRegionName =
      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
    ReplicationQueueStorage queueStorage =
      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
  }

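  /**
   * Replicate some data, then remove the peer and confirm the recorded last pushed sequence id for
   * the region is deleted along with it.
   */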
  @Test
  public void testRemovePeer() throws Exception {
    TableName tableName = createTable();
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    enablePeerAndWaitUntilReplicationDone(100);
    checkOrder(100);
    String encodedRegionName =
      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
    ReplicationQueueStorage queueStorage =
      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
    // confirm that we delete the last pushed sequence id
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
  }

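  /**
   * Replicate some data, then clear the serial flag on the peer and confirm the recorded last
   * pushed sequence id for the region is deleted.
   */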
  @Test
  public void testRemoveSerialFlag() throws Exception {
    TableName tableName = createTable();
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    enablePeerAndWaitUntilReplicationDone(100);
    checkOrder(100);
    String encodedRegionName =
      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
    ReplicationQueueStorage queueStorage =
      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
    ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID);
    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build());
    // confirm that we delete the last pushed sequence id
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
  }
}