001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.SLEEP_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
046
047@Tag(ReplicationTests.TAG)
048@Tag(LargeTests.TAG)
049public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
050
051  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
052
053  /**
054   * Add a row to a table in each cluster, check it's replicated, delete it, check's gone Also check
055   * the puts and deletes are not replicated back to the originating cluster.
056   */
057  @Test
058  public void testSyncUpTool() throws Exception {
059    // Set up Replication: on Master and one Slave
060    // Table: t1_syncup and t2_syncup
061    // columnfamily:
062    // 'cf1' : replicated
063    // 'norep': not replicated
064    setupReplication();
065
066    //
067    // at Master:
068    // t1_syncup: put 100 rows into cf1, and 1 rows into norep
069    // t2_syncup: put 200 rows into cf1, and 1 rows into norep
070    //
071    // verify correctly replicated to slave
072    putAndReplicateRows();
073
074    // Verify delete works
075    //
076    // step 1: stop hbase on Slave
077    //
078    // step 2: at Master:
079    // t1_syncup: delete 50 rows from cf1
080    // t2_syncup: delete 100 rows from cf1
081    // no change on 'norep'
082    //
083    // step 3: stop hbase on master, restart hbase on Slave
084    //
085    // step 4: verify Slave still have the rows before delete
086    // t1_syncup: 100 rows from cf1
087    // t2_syncup: 200 rows from cf1
088    //
089    // step 5: run syncup tool on Master
090    //
091    // step 6: verify that delete show up on Slave
092    // t1_syncup: 50 rows from cf1
093    // t2_syncup: 100 rows from cf1
094    //
095    // verify correctly replicated to Slave
096    mimicSyncUpAfterDelete();
097
098    // Verify put works
099    //
100    // step 1: stop hbase on Slave
101    //
102    // step 2: at Master:
103    // t1_syncup: put 100 rows from cf1
104    // t2_syncup: put 200 rows from cf1
105    // and put another row on 'norep'
106    // ATTN:
107    // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively
108    // put to 'norep' will add a new row.
109    //
110    // step 3: stop hbase on master, restart hbase on Slave
111    //
112    // step 4: verify Slave still has the rows before put
113    // t1_syncup: 50 rows from cf1
114    // t2_syncup: 100 rows from cf1
115    //
116    // step 5: run syncup tool on Master
117    //
118    // step 6: verify that put show up on Slave and 'norep' does not
119    // t1_syncup: 100 rows from cf1
120    // t2_syncup: 200 rows from cf1
121    //
122    // verify correctly replicated to Slave
123    mimicSyncUpAfterPut();
124  }
125
126  private void putAndReplicateRows() throws Exception {
127    LOG.debug("putAndReplicateRows");
128    // add rows to Master cluster,
129    Put p;
130
131    // 100 + 1 row to t1_syncup
132    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
133      p = new Put(Bytes.toBytes("row" + i));
134      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
135      ht1Source.put(p);
136    }
137    p = new Put(Bytes.toBytes("row" + 9999));
138    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
139    ht1Source.put(p);
140
141    // 200 + 1 row to t2_syncup
142    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
143      p = new Put(Bytes.toBytes("row" + i));
144      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
145      ht2Source.put(p);
146    }
147    p = new Put(Bytes.toBytes("row" + 9999));
148    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
149    ht2Source.put(p);
150
151    // ensure replication completed
152    Thread.sleep(SLEEP_TIME);
153    int rowCountHt1Source = countRows(ht1Source);
154    for (int i = 0; i < NB_RETRIES; i++) {
155      int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
156      if (i == NB_RETRIES - 1) {
157        assertEquals(rowCountHt1Source - 1, rowCountHt1TargetAtPeer1,
158          "t1_syncup has 101 rows on source, and 100 on slave1");
159      }
160      if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
161        break;
162      }
163      Thread.sleep(SLEEP_TIME);
164    }
165
166    int rowCountHt2Source = countRows(ht2Source);
167    for (int i = 0; i < NB_RETRIES; i++) {
168      int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
169      if (i == NB_RETRIES - 1) {
170        assertEquals(rowCountHt2Source - 1, rowCountHt2TargetAtPeer1,
171          "t2_syncup has 201 rows on source, and 200 on slave1");
172      }
173      if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
174        break;
175      }
176      Thread.sleep(SLEEP_TIME);
177    }
178  }
179
180  private void mimicSyncUpAfterDelete() throws Exception {
181    LOG.debug("mimicSyncUpAfterDelete");
182    shutDownTargetHBaseCluster();
183
184    List<Delete> list = new ArrayList<>();
185    // delete half of the rows
186    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
187      String rowKey = "row" + i;
188      Delete del = new Delete(Bytes.toBytes(rowKey));
189      list.add(del);
190    }
191    ht1Source.delete(list);
192
193    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
194      String rowKey = "row" + i;
195      Delete del = new Delete(Bytes.toBytes(rowKey));
196      list.add(del);
197    }
198    ht2Source.delete(list);
199
200    int rowCount_ht1Source = countRows(ht1Source);
201    assertEquals(51, rowCount_ht1Source,
202      "t1_syncup has 51 rows on source, after remove 50 of the replicated colfam");
203
204    int rowCount_ht2Source = countRows(ht2Source);
205    assertEquals(101, rowCount_ht2Source,
206      "t2_syncup has 101 rows on source, after remove 100 of the replicated colfam");
207    List<ServerName> sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream()
208      .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList());
209    shutDownSourceHBaseCluster();
210    restartTargetHBaseCluster(1);
211
212    Thread.sleep(SLEEP_TIME);
213
214    // before sync up
215    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
216    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
217    assertEquals(100, rowCountHt1TargetAtPeer1, "@Peer1 t1_syncup should still have 100 rows");
218    assertEquals(200, rowCountHt2TargetAtPeer1, "@Peer1 t2_syncup should still have 200 rows");
219
220    syncUp(UTIL1);
221
222    // After sync up
223    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
224    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
225    assertEquals(50, rowCountHt1TargetAtPeer1,
226      "@Peer1 t1_syncup should be sync up and have 50 rows");
227    assertEquals(100, rowCountHt2TargetAtPeer1,
228      "@Peer1 t2_syncup should be sync up and have 100 rows");
229
230    // check we have recorded the dead region servers and also have an info file
231    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
232    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
233    FileSystem fs = UTIL1.getTestFileSystem();
234    for (ServerName sn : sourceRses) {
235      assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName())));
236    }
237    assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE)));
238    assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length);
239
240    restartSourceHBaseCluster(1);
241    // should finally removed all the records after restart
242    UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0);
243  }
244
245  private void mimicSyncUpAfterPut() throws Exception {
246    LOG.debug("mimicSyncUpAfterPut");
247    shutDownTargetHBaseCluster();
248
249    Put p;
250    // another 100 + 1 row to t1_syncup
251    // we should see 100 + 2 rows now
252    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
253      p = new Put(Bytes.toBytes("row" + i));
254      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
255      ht1Source.put(p);
256    }
257    p = new Put(Bytes.toBytes("row" + 9998));
258    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
259    ht1Source.put(p);
260
261    // another 200 + 1 row to t1_syncup
262    // we should see 200 + 2 rows now
263    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
264      p = new Put(Bytes.toBytes("row" + i));
265      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
266      ht2Source.put(p);
267    }
268    p = new Put(Bytes.toBytes("row" + 9998));
269    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
270    ht2Source.put(p);
271
272    int rowCount_ht1Source = countRows(ht1Source);
273    assertEquals(102, rowCount_ht1Source, "t1_syncup has 102 rows on source");
274    int rowCount_ht2Source = countRows(ht2Source);
275    assertEquals(202, rowCount_ht2Source, "t2_syncup has 202 rows on source");
276
277    shutDownSourceHBaseCluster();
278    restartTargetHBaseCluster(1);
279
280    Thread.sleep(SLEEP_TIME);
281
282    // before sync up
283    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
284    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
285    assertEquals(50, rowCountHt1TargetAtPeer1,
286      "@Peer1 t1_syncup should be NOT sync up and have 50 rows");
287    assertEquals(100, rowCountHt2TargetAtPeer1,
288      "@Peer1 t2_syncup should be NOT sync up and have 100 rows");
289
290    syncUp(UTIL1);
291
292    // after sync up
293    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
294    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
295    assertEquals(100, rowCountHt1TargetAtPeer1,
296      "@Peer1 t1_syncup should be sync up and have 100 rows");
297    assertEquals(200, rowCountHt2TargetAtPeer1,
298      "@Peer1 t2_syncup should be sync up and have 200 rows");
299  }
300
301  /**
302   * test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details.
303   */
304  @Test
305  public void testStartANewSyncUpToolAfterFailed() throws Exception {
306    // Start syncUpTool for the first time with non-force mode,
307    // let's assume that this will fail in sync data,
308    // this does not affect our test results
309    syncUp(UTIL1);
310    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
311    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
312    Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
313    FileSystem fs = UTIL1.getTestFileSystem();
314    assertTrue(fs.exists(replicationInfoPath));
315    FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);
316
317    // Start syncUpTool for the second time with non-force mode,
318    // startup will fail because replication info file already exists
319    try {
320      syncUp(UTIL1);
321    } catch (Exception e) {
322      assertTrue(e instanceof FileAlreadyExistsException,
323        "e should be a FileAlreadyExistsException");
324    }
325    FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
326    assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());
327
328    // Start syncUpTool for the third time with force mode,
329    // startup will success and create a new replication info file
330    syncUp(UTIL1, new String[] { "-f" });
331    FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
332    assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
333  }
334}