001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.Collections;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.client.TableState;
032import org.apache.hadoop.hbase.master.TableStateManager;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.replication.regionserver.Replication;
036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
041import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
048
049/**
050 * Testcase for HBASE-20147.
051 */
052@Category({ ReplicationTests.class, MediumTests.class })
053public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);
058
059  @Before
060  public void setUp() throws IOException, StreamLacksCapabilityException {
061    setupWALWriter();
062  }
063
064  // make sure that we will start replication for the sequence id after move, that's what we want to
065  // test here.
066  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
067    moveRegion(region, rs);
068    rollAllWALs();
069  }
070
071  private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs) throws Exception {
072    Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
073    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
074    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
075
076      @Override
077      public boolean evaluate() throws Exception {
078        ReplicationSourceManager manager =
079          ((Replication) rs.getReplicationSourceService()).getReplicationManager();
080        return manager.getWALs().get(PEER_ID).get(logPrefix).size() == 1;
081      }
082
083      @Override
084      public String explainFailure() throws Exception {
085        return "Still not replicated to the current WAL file yet";
086      }
087    });
088  }
089
090  @Test
091  public void testAddPeer() throws Exception {
092    TableName tableName = createTable();
093    try (Table table = UTIL.getConnection().getTable(tableName)) {
094      for (int i = 0; i < 100; i++) {
095        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
096      }
097    }
098    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
099    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
100    moveRegionAndArchiveOldWals(region, rs);
101    addPeer(true);
102    try (Table table = UTIL.getConnection().getTable(tableName)) {
103      for (int i = 0; i < 100; i++) {
104        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
105      }
106    }
107    waitUntilReplicationDone(100);
108    checkOrder(100);
109  }
110
111  @Test
112  public void testChangeToSerial() throws Exception {
113    ReplicationPeerConfig peerConfig =
114      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
115        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
116    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
117
118    TableName tableName = createTable();
119    try (Table table = UTIL.getConnection().getTable(tableName)) {
120      for (int i = 0; i < 100; i++) {
121        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
122      }
123    }
124
125    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
126    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
127    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
128    moveRegionAndArchiveOldWals(region, rs);
129    waitUntilReplicationDone(100);
130    waitUntilReplicatedToTheCurrentWALFile(srcRs);
131
132    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
133    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
134      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
135    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
136
137    try (Table table = UTIL.getConnection().getTable(tableName)) {
138      for (int i = 0; i < 100; i++) {
139        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
140      }
141    }
142    waitUntilReplicationDone(200);
143    checkOrder(200);
144  }
145
146  @Test
147  public void testAddToSerialPeer() throws Exception {
148    ReplicationPeerConfig peerConfig =
149      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
150        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
151        .setReplicateAllUserTables(false).setSerial(true).build();
152    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
153
154    TableName tableName = createTable();
155    try (Table table = UTIL.getConnection().getTable(tableName)) {
156      for (int i = 0; i < 100; i++) {
157        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
158      }
159    }
160    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
161    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
162    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
163    moveRegionAndArchiveOldWals(region, rs);
164    waitUntilReplicatedToTheCurrentWALFile(rs);
165    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
166    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
167      ReplicationPeerConfig.newBuilder(peerConfig)
168        .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
169    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
170    try (Table table = UTIL.getConnection().getTable(tableName)) {
171      for (int i = 0; i < 100; i++) {
172        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
173      }
174    }
175    waitUntilReplicationDone(100);
176    checkOrder(100);
177  }
178
179  @Test
180  public void testDisabledTable() throws Exception {
181    TableName tableName = createTable();
182    try (Table table = UTIL.getConnection().getTable(tableName)) {
183      for (int i = 0; i < 100; i++) {
184        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
185      }
186    }
187    UTIL.getAdmin().disableTable(tableName);
188    rollAllWALs();
189    addPeer(true);
190    UTIL.getAdmin().enableTable(tableName);
191    try (Table table = UTIL.getConnection().getTable(tableName)) {
192      for (int i = 0; i < 100; i++) {
193        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
194      }
195    }
196    waitUntilReplicationDone(100);
197    checkOrder(100);
198  }
199
200  @Test
201  public void testDisablingTable() throws Exception {
202    TableName tableName = createTable();
203    try (Table table = UTIL.getConnection().getTable(tableName)) {
204      for (int i = 0; i < 100; i++) {
205        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
206      }
207    }
208    UTIL.getAdmin().disableTable(tableName);
209    rollAllWALs();
210    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
211    tsm.setTableState(tableName, TableState.State.DISABLING);
212    Thread t = new Thread(() -> {
213      try {
214        addPeer(true);
215      } catch (IOException e) {
216        throw new RuntimeException(e);
217      }
218    });
219    t.start();
220    Thread.sleep(5000);
221    // we will wait on the disabling table so the thread should still be alive.
222    assertTrue(t.isAlive());
223    tsm.setTableState(tableName, TableState.State.DISABLED);
224    t.join();
225    UTIL.getAdmin().enableTable(tableName);
226    try (Table table = UTIL.getConnection().getTable(tableName)) {
227      for (int i = 0; i < 100; i++) {
228        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
229      }
230    }
231    waitUntilReplicationDone(100);
232    checkOrder(100);
233  }
234
235  @Test
236  public void testEnablingTable() throws Exception {
237    TableName tableName = createTable();
238    try (Table table = UTIL.getConnection().getTable(tableName)) {
239      for (int i = 0; i < 100; i++) {
240        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
241      }
242    }
243    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
244    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
245    moveRegionAndArchiveOldWals(region, rs);
246    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
247    tsm.setTableState(tableName, TableState.State.ENABLING);
248    Thread t = new Thread(() -> {
249      try {
250        addPeer(true);
251      } catch (IOException e) {
252        throw new RuntimeException(e);
253      }
254    });
255    t.start();
256    Thread.sleep(5000);
257    // we will wait on the disabling table so the thread should still be alive.
258    assertTrue(t.isAlive());
259    tsm.setTableState(tableName, TableState.State.ENABLED);
260    t.join();
261    try (Table table = UTIL.getConnection().getTable(tableName)) {
262      for (int i = 0; i < 100; i++) {
263        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
264      }
265    }
266    waitUntilReplicationDone(100);
267    checkOrder(100);
268  }
269}