001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.Collections;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.client.TableState;
032import org.apache.hadoop.hbase.master.TableStateManager;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.replication.regionserver.Replication;
036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
041import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
048
049/**
050 * Testcase for HBASE-20147.
051 */
052@Category({ ReplicationTests.class, LargeTests.class })
053public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);
058
059  @Before
060  public void setUp() throws IOException, StreamLacksCapabilityException {
061    setupWALWriter();
062  }
063
064  // make sure that we will start replication for the sequence id after move, that's what we want to
065  // test here.
066  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
067    moveRegion(region, rs);
068    rollAllWALs();
069  }
070
071  private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs, final String oldWalName)
072    throws Exception {
073    Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
074    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
075    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
076
077      @Override
078      public boolean evaluate() throws Exception {
079        ReplicationSourceManager manager =
080          ((Replication) rs.getReplicationSourceService()).getReplicationManager();
081        // Make sure replication moves to the new file.
082        ReplicationQueueId queueId = new ReplicationQueueId(rs.getServerName(), PEER_ID);
083        return (manager.getWALs().get(queueId).get(logPrefix).size() == 1)
084          && !oldWalName.equals(manager.getWALs().get(queueId).get(logPrefix).first());
085      }
086
087      @Override
088      public String explainFailure() throws Exception {
089        return "Still not replicated to the current WAL file yet";
090      }
091    });
092  }
093
094  @Test
095  public void testAddPeer() throws Exception {
096    TableName tableName = createTable();
097    try (Table table = UTIL.getConnection().getTable(tableName)) {
098      for (int i = 0; i < 100; i++) {
099        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
100      }
101    }
102    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
103    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
104    moveRegionAndArchiveOldWals(region, rs);
105    addPeer(true);
106    try (Table table = UTIL.getConnection().getTable(tableName)) {
107      for (int i = 0; i < 100; i++) {
108        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
109      }
110    }
111    waitUntilReplicationDone(100);
112    checkOrder(100);
113  }
114
115  @Test
116  public void testChangeToSerial() throws Exception {
117    ReplicationPeerConfig peerConfig =
118      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
119        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
120    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
121
122    TableName tableName = createTable();
123    try (Table table = UTIL.getConnection().getTable(tableName)) {
124      for (int i = 0; i < 100; i++) {
125        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
126      }
127    }
128
129    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
130    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
131    // Get the current wal file name
132    String walFileNameBeforeRollover =
133      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();
134
135    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
136    moveRegionAndArchiveOldWals(region, rs);
137    waitUntilReplicationDone(100);
138    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);
139
140    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
141    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
142      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
143    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
144
145    try (Table table = UTIL.getConnection().getTable(tableName)) {
146      for (int i = 0; i < 100; i++) {
147        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
148      }
149    }
150    waitUntilReplicationDone(200);
151    checkOrder(200);
152  }
153
154  @Test
155  public void testAddToSerialPeer() throws Exception {
156    ReplicationPeerConfig peerConfig =
157      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
158        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
159        .setReplicateAllUserTables(false).setSerial(true).build();
160    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
161
162    TableName tableName = createTable();
163    try (Table table = UTIL.getConnection().getTable(tableName)) {
164      for (int i = 0; i < 100; i++) {
165        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
166      }
167    }
168    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
169    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
170    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
171
172    // Get the current wal file name
173    String walFileNameBeforeRollover =
174      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();
175
176    moveRegionAndArchiveOldWals(region, rs);
177
178    // Make sure that the replication done for the oldWal at source rs.
179    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);
180
181    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
182    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
183      ReplicationPeerConfig.newBuilder(peerConfig)
184        .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
185    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
186    try (Table table = UTIL.getConnection().getTable(tableName)) {
187      for (int i = 0; i < 100; i++) {
188        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
189      }
190    }
191    waitUntilReplicationDone(100);
192    checkOrder(100);
193  }
194
195  @Test
196  public void testDisabledTable() throws Exception {
197    TableName tableName = createTable();
198    try (Table table = UTIL.getConnection().getTable(tableName)) {
199      for (int i = 0; i < 100; i++) {
200        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
201      }
202    }
203    UTIL.getAdmin().disableTable(tableName);
204    rollAllWALs();
205    addPeer(true);
206    UTIL.getAdmin().enableTable(tableName);
207    try (Table table = UTIL.getConnection().getTable(tableName)) {
208      for (int i = 0; i < 100; i++) {
209        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
210      }
211    }
212    waitUntilReplicationDone(100);
213    checkOrder(100);
214  }
215
216  @Test
217  public void testDisablingTable() throws Exception {
218    TableName tableName = createTable();
219    try (Table table = UTIL.getConnection().getTable(tableName)) {
220      for (int i = 0; i < 100; i++) {
221        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
222      }
223    }
224    UTIL.getAdmin().disableTable(tableName);
225    rollAllWALs();
226    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
227    tsm.setTableState(tableName, TableState.State.DISABLING);
228    Thread t = new Thread(() -> {
229      try {
230        addPeer(true);
231      } catch (IOException e) {
232        throw new RuntimeException(e);
233      }
234    });
235    t.start();
236    Thread.sleep(5000);
237    // we will wait on the disabling table so the thread should still be alive.
238    assertTrue(t.isAlive());
239    tsm.setTableState(tableName, TableState.State.DISABLED);
240    t.join();
241    UTIL.getAdmin().enableTable(tableName);
242    try (Table table = UTIL.getConnection().getTable(tableName)) {
243      for (int i = 0; i < 100; i++) {
244        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
245      }
246    }
247    waitUntilReplicationDone(100);
248    checkOrder(100);
249  }
250
251  @Test
252  public void testEnablingTable() throws Exception {
253    TableName tableName = createTable();
254    try (Table table = UTIL.getConnection().getTable(tableName)) {
255      for (int i = 0; i < 100; i++) {
256        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
257      }
258    }
259    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
260    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
261    moveRegionAndArchiveOldWals(region, rs);
262    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
263    tsm.setTableState(tableName, TableState.State.ENABLING);
264    Thread t = new Thread(() -> {
265      try {
266        addPeer(true);
267      } catch (IOException e) {
268        throw new RuntimeException(e);
269      }
270    });
271    t.start();
272    Thread.sleep(5000);
273    // we will wait on the disabling table so the thread should still be alive.
274    assertTrue(t.isAlive());
275    tsm.setTableState(tableName, TableState.State.ENABLED);
276    t.join();
277    try (Table table = UTIL.getConnection().getTable(tableName)) {
278      for (int i = 0; i < 100; i++) {
279        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
280      }
281    }
282    waitUntilReplicationDone(100);
283    checkOrder(100);
284  }
285}