001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.Collections;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.client.TableState;
031import org.apache.hadoop.hbase.master.TableStateManager;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
034import org.apache.hadoop.hbase.replication.regionserver.Replication;
035import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.ReplicationTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
040import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
041import org.junit.jupiter.api.BeforeEach;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.Test;
044
045import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
046
047/**
048 * Testcase for HBASE-20147.
049 */
050@Tag(ReplicationTests.TAG)
051@Tag(LargeTests.TAG)
052public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
053
054  @BeforeEach
055  public void setUp() throws IOException, StreamLacksCapabilityException {
056    setupWALWriter();
057  }
058
059  // make sure that we will start replication for the sequence id after move, that's what we want to
060  // test here.
061  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
062    moveRegion(region, rs);
063    rollAllWALs();
064  }
065
066  private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs, final String oldWalName)
067    throws Exception {
068    Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
069    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
070    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
071
072      @Override
073      public boolean evaluate() throws Exception {
074        ReplicationSourceManager manager =
075          ((Replication) rs.getReplicationSourceService()).getReplicationManager();
076        // Make sure replication moves to the new file.
077        ReplicationQueueId queueId = new ReplicationQueueId(rs.getServerName(), PEER_ID);
078        return (manager.getWALs().get(queueId).get(logPrefix).size() == 1)
079          && !oldWalName.equals(manager.getWALs().get(queueId).get(logPrefix).first());
080      }
081
082      @Override
083      public String explainFailure() throws Exception {
084        return "Still not replicated to the current WAL file yet";
085      }
086    });
087  }
088
089  @Test
090  public void testAddPeer() throws Exception {
091    TableName tableName = createTable();
092    try (Table table = UTIL.getConnection().getTable(tableName)) {
093      for (int i = 0; i < 100; i++) {
094        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
095      }
096    }
097    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
098    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
099    moveRegionAndArchiveOldWals(region, rs);
100    addPeer(true);
101    try (Table table = UTIL.getConnection().getTable(tableName)) {
102      for (int i = 0; i < 100; i++) {
103        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
104      }
105    }
106    waitUntilReplicationDone(100);
107    checkOrder(100);
108  }
109
110  @Test
111  public void testChangeToSerial() throws Exception {
112    ReplicationPeerConfig peerConfig =
113      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
114        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
115    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
116
117    TableName tableName = createTable();
118    try (Table table = UTIL.getConnection().getTable(tableName)) {
119      for (int i = 0; i < 100; i++) {
120        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
121      }
122    }
123
124    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
125    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
126    // Get the current wal file name
127    String walFileNameBeforeRollover =
128      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();
129
130    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
131    moveRegionAndArchiveOldWals(region, rs);
132    waitUntilReplicationDone(100);
133    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);
134
135    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
136    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
137      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
138    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
139
140    try (Table table = UTIL.getConnection().getTable(tableName)) {
141      for (int i = 0; i < 100; i++) {
142        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
143      }
144    }
145    waitUntilReplicationDone(200);
146    checkOrder(200);
147  }
148
149  @Test
150  public void testAddToSerialPeer() throws Exception {
151    ReplicationPeerConfig peerConfig =
152      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
153        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
154        .setReplicateAllUserTables(false).setSerial(true).build();
155    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
156
157    TableName tableName = createTable();
158    try (Table table = UTIL.getConnection().getTable(tableName)) {
159      for (int i = 0; i < 100; i++) {
160        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
161      }
162    }
163    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
164    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
165    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
166
167    // Get the current wal file name
168    String walFileNameBeforeRollover =
169      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();
170
171    moveRegionAndArchiveOldWals(region, rs);
172
173    // Make sure that the replication done for the oldWal at source rs.
174    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);
175
176    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
177    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
178      ReplicationPeerConfig.newBuilder(peerConfig)
179        .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
180    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
181    try (Table table = UTIL.getConnection().getTable(tableName)) {
182      for (int i = 0; i < 100; i++) {
183        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
184      }
185    }
186    waitUntilReplicationDone(100);
187    checkOrder(100);
188  }
189
190  @Test
191  public void testDisabledTable() throws Exception {
192    TableName tableName = createTable();
193    try (Table table = UTIL.getConnection().getTable(tableName)) {
194      for (int i = 0; i < 100; i++) {
195        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
196      }
197    }
198    UTIL.getAdmin().disableTable(tableName);
199    rollAllWALs();
200    addPeer(true);
201    UTIL.getAdmin().enableTable(tableName);
202    try (Table table = UTIL.getConnection().getTable(tableName)) {
203      for (int i = 0; i < 100; i++) {
204        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
205      }
206    }
207    waitUntilReplicationDone(100);
208    checkOrder(100);
209  }
210
211  @Test
212  public void testDisablingTable() throws Exception {
213    TableName tableName = createTable();
214    try (Table table = UTIL.getConnection().getTable(tableName)) {
215      for (int i = 0; i < 100; i++) {
216        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
217      }
218    }
219    UTIL.getAdmin().disableTable(tableName);
220    rollAllWALs();
221    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
222    tsm.setTableState(tableName, TableState.State.DISABLING);
223    Thread t = new Thread(() -> {
224      try {
225        addPeer(true);
226      } catch (IOException e) {
227        throw new RuntimeException(e);
228      }
229    });
230    t.start();
231    Thread.sleep(5000);
232    // we will wait on the disabling table so the thread should still be alive.
233    assertTrue(t.isAlive());
234    tsm.setTableState(tableName, TableState.State.DISABLED);
235    t.join();
236    UTIL.getAdmin().enableTable(tableName);
237    try (Table table = UTIL.getConnection().getTable(tableName)) {
238      for (int i = 0; i < 100; i++) {
239        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
240      }
241    }
242    waitUntilReplicationDone(100);
243    checkOrder(100);
244  }
245
246  @Test
247  public void testEnablingTable() throws Exception {
248    TableName tableName = createTable();
249    try (Table table = UTIL.getConnection().getTable(tableName)) {
250      for (int i = 0; i < 100; i++) {
251        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
252      }
253    }
254    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
255    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
256    moveRegionAndArchiveOldWals(region, rs);
257    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
258    tsm.setTableState(tableName, TableState.State.ENABLING);
259    Thread t = new Thread(() -> {
260      try {
261        addPeer(true);
262      } catch (IOException e) {
263        throw new RuntimeException(e);
264      }
265    });
266    t.start();
267    Thread.sleep(5000);
268    // we will wait on the disabling table so the thread should still be alive.
269    assertTrue(t.isAlive());
270    tsm.setTableState(tableName, TableState.State.ENABLED);
271    t.join();
272    try (Table table = UTIL.getConnection().getTable(tableName)) {
273      for (int i = 0; i < 100; i++) {
274        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
275      }
276    }
277    waitUntilReplicationDone(100);
278    checkOrder(100);
279  }
280}