001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
048
049@Category({ ReplicationTests.class, LargeTests.class })
050public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054    HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
055
056  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
057
058  /**
059   * Add a row to a table in each cluster, check it's replicated, delete it, check's gone Also check
060   * the puts and deletes are not replicated back to the originating cluster.
061   */
062  @Test
063  public void testSyncUpTool() throws Exception {
064    // Set up Replication: on Master and one Slave
065    // Table: t1_syncup and t2_syncup
066    // columnfamily:
067    // 'cf1' : replicated
068    // 'norep': not replicated
069    setupReplication();
070
071    //
072    // at Master:
073    // t1_syncup: put 100 rows into cf1, and 1 rows into norep
074    // t2_syncup: put 200 rows into cf1, and 1 rows into norep
075    //
076    // verify correctly replicated to slave
077    putAndReplicateRows();
078
079    // Verify delete works
080    //
081    // step 1: stop hbase on Slave
082    //
083    // step 2: at Master:
084    // t1_syncup: delete 50 rows from cf1
085    // t2_syncup: delete 100 rows from cf1
086    // no change on 'norep'
087    //
088    // step 3: stop hbase on master, restart hbase on Slave
089    //
090    // step 4: verify Slave still have the rows before delete
091    // t1_syncup: 100 rows from cf1
092    // t2_syncup: 200 rows from cf1
093    //
094    // step 5: run syncup tool on Master
095    //
096    // step 6: verify that delete show up on Slave
097    // t1_syncup: 50 rows from cf1
098    // t2_syncup: 100 rows from cf1
099    //
100    // verify correctly replicated to Slave
101    mimicSyncUpAfterDelete();
102
103    // Verify put works
104    //
105    // step 1: stop hbase on Slave
106    //
107    // step 2: at Master:
108    // t1_syncup: put 100 rows from cf1
109    // t2_syncup: put 200 rows from cf1
110    // and put another row on 'norep'
111    // ATTN:
112    // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively
113    // put to 'norep' will add a new row.
114    //
115    // step 3: stop hbase on master, restart hbase on Slave
116    //
117    // step 4: verify Slave still has the rows before put
118    // t1_syncup: 50 rows from cf1
119    // t2_syncup: 100 rows from cf1
120    //
121    // step 5: run syncup tool on Master
122    //
123    // step 6: verify that put show up on Slave and 'norep' does not
124    // t1_syncup: 100 rows from cf1
125    // t2_syncup: 200 rows from cf1
126    //
127    // verify correctly replicated to Slave
128    mimicSyncUpAfterPut();
129  }
130
131  private void putAndReplicateRows() throws Exception {
132    LOG.debug("putAndReplicateRows");
133    // add rows to Master cluster,
134    Put p;
135
136    // 100 + 1 row to t1_syncup
137    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
138      p = new Put(Bytes.toBytes("row" + i));
139      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
140      ht1Source.put(p);
141    }
142    p = new Put(Bytes.toBytes("row" + 9999));
143    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
144    ht1Source.put(p);
145
146    // 200 + 1 row to t2_syncup
147    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
148      p = new Put(Bytes.toBytes("row" + i));
149      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
150      ht2Source.put(p);
151    }
152    p = new Put(Bytes.toBytes("row" + 9999));
153    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
154    ht2Source.put(p);
155
156    // ensure replication completed
157    Thread.sleep(SLEEP_TIME);
158    int rowCountHt1Source = countRows(ht1Source);
159    for (int i = 0; i < NB_RETRIES; i++) {
160      int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
161      if (i == NB_RETRIES - 1) {
162        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
163          rowCountHt1TargetAtPeer1);
164      }
165      if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
166        break;
167      }
168      Thread.sleep(SLEEP_TIME);
169    }
170
171    int rowCountHt2Source = countRows(ht2Source);
172    for (int i = 0; i < NB_RETRIES; i++) {
173      int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
174      if (i == NB_RETRIES - 1) {
175        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
176          rowCountHt2TargetAtPeer1);
177      }
178      if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
179        break;
180      }
181      Thread.sleep(SLEEP_TIME);
182    }
183  }
184
185  private void mimicSyncUpAfterDelete() throws Exception {
186    LOG.debug("mimicSyncUpAfterDelete");
187    shutDownTargetHBaseCluster();
188
189    List<Delete> list = new ArrayList<>();
190    // delete half of the rows
191    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
192      String rowKey = "row" + i;
193      Delete del = new Delete(Bytes.toBytes(rowKey));
194      list.add(del);
195    }
196    ht1Source.delete(list);
197
198    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
199      String rowKey = "row" + i;
200      Delete del = new Delete(Bytes.toBytes(rowKey));
201      list.add(del);
202    }
203    ht2Source.delete(list);
204
205    int rowCount_ht1Source = countRows(ht1Source);
206    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
207      rowCount_ht1Source);
208
209    int rowCount_ht2Source = countRows(ht2Source);
210    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
211      rowCount_ht2Source);
212    List<ServerName> sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream()
213      .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList());
214    shutDownSourceHBaseCluster();
215    restartTargetHBaseCluster(1);
216
217    Thread.sleep(SLEEP_TIME);
218
219    // before sync up
220    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
221    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
222    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
223    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
224
225    syncUp(UTIL1);
226
227    // After sync up
228    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
229    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
230    assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
231      rowCountHt1TargetAtPeer1);
232    assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
233      rowCountHt2TargetAtPeer1);
234
235    // check we have recorded the dead region servers and also have an info file
236    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
237    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
238    FileSystem fs = UTIL1.getTestFileSystem();
239    for (ServerName sn : sourceRses) {
240      assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName())));
241    }
242    assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE)));
243    assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length);
244
245    restartSourceHBaseCluster(1);
246    // should finally removed all the records after restart
247    UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0);
248  }
249
250  private void mimicSyncUpAfterPut() throws Exception {
251    LOG.debug("mimicSyncUpAfterPut");
252    shutDownTargetHBaseCluster();
253
254    Put p;
255    // another 100 + 1 row to t1_syncup
256    // we should see 100 + 2 rows now
257    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
258      p = new Put(Bytes.toBytes("row" + i));
259      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
260      ht1Source.put(p);
261    }
262    p = new Put(Bytes.toBytes("row" + 9998));
263    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
264    ht1Source.put(p);
265
266    // another 200 + 1 row to t1_syncup
267    // we should see 200 + 2 rows now
268    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
269      p = new Put(Bytes.toBytes("row" + i));
270      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
271      ht2Source.put(p);
272    }
273    p = new Put(Bytes.toBytes("row" + 9998));
274    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
275    ht2Source.put(p);
276
277    int rowCount_ht1Source = countRows(ht1Source);
278    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
279    int rowCount_ht2Source = countRows(ht2Source);
280    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
281
282    shutDownSourceHBaseCluster();
283    restartTargetHBaseCluster(1);
284
285    Thread.sleep(SLEEP_TIME);
286
287    // before sync up
288    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
289    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
290    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
291      rowCountHt1TargetAtPeer1);
292    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
293      rowCountHt2TargetAtPeer1);
294
295    syncUp(UTIL1);
296
297    // after sync up
298    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
299    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
300    assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
301      rowCountHt1TargetAtPeer1);
302    assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
303      rowCountHt2TargetAtPeer1);
304  }
305
306  /**
307   * test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details.
308   */
309  @Test
310  public void testStartANewSyncUpToolAfterFailed() throws Exception {
311    // Start syncUpTool for the first time with non-force mode,
312    // let's assume that this will fail in sync data,
313    // this does not affect our test results
314    syncUp(UTIL1);
315    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
316    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
317    Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
318    FileSystem fs = UTIL1.getTestFileSystem();
319    assertTrue(fs.exists(replicationInfoPath));
320    FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);
321
322    // Start syncUpTool for the second time with non-force mode,
323    // startup will fail because replication info file already exists
324    try {
325      syncUp(UTIL1);
326    } catch (Exception e) {
327      assertTrue("e should be a FileAlreadyExistsException",
328        (e instanceof FileAlreadyExistsException));
329    }
330    FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
331    assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());
332
333    // Start syncUpTool for the third time with force mode,
334    // startup will success and create a new replication info file
335    syncUp(UTIL1, new String[] { "-f" });
336    FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
337    assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
338  }
339}