001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
021import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
022import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
023import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
024import static org.junit.Assert.assertEquals;
025
026import java.util.ArrayList;
027import java.util.List;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.client.Delete;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.testclassification.LargeTests;
032import org.apache.hadoop.hbase.testclassification.ReplicationTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040@Category({ ReplicationTests.class, LargeTests.class })
041public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
042
043  @ClassRule
044  public static final HBaseClassTestRule CLASS_RULE =
045    HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
046
047  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
048
049  /**
050   * Add a row to a table in each cluster, check it's replicated, delete it, check's gone Also check
051   * the puts and deletes are not replicated back to the originating cluster.
052   */
053  @Test
054  public void testSyncUpTool() throws Exception {
055
056    /**
057     * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
058     * 'cf1' : replicated 'norep': not replicated
059     */
060    setupReplication();
061
062    /**
063     * at Master: t1_syncup: put 100 rows into cf1, and 1 rows into norep t2_syncup: put 200 rows
064     * into cf1, and 1 rows into norep verify correctly replicated to slave
065     */
066    putAndReplicateRows();
067
068    /**
069     * Verify delete works step 1: stop hbase on Slave step 2: at Master: t1_syncup: delete 50 rows
070     * from cf1 t2_syncup: delete 100 rows from cf1 no change on 'norep' step 3: stop hbase on
071     * master, restart hbase on Slave step 4: verify Slave still have the rows before delete
072     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 5: run syncup tool on Master
073     * step 6: verify that delete show up on Slave t1_syncup: 50 rows from cf1 t2_syncup: 100 rows
074     * from cf1 verify correctly replicated to Slave
075     */
076    mimicSyncUpAfterDelete();
077
078    /**
079     * Verify put works step 1: stop hbase on Slave step 2: at Master: t1_syncup: put 100 rows from
080     * cf1 t2_syncup: put 200 rows from cf1 and put another row on 'norep' ATTN: put to 'cf1' will
081     * overwrite existing rows, so end count will be 100 and 200 respectively put to 'norep' will
082     * add a new row. step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
083     * still has the rows before put t1_syncup: 50 rows from cf1 t2_syncup: 100 rows from cf1 step
084     * 5: run syncup tool on Master step 6: verify that put show up on Slave and 'norep' does not
085     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 verify correctly replicated to
086     * Slave
087     */
088    mimicSyncUpAfterPut();
089  }
090
091  private void putAndReplicateRows() throws Exception {
092    LOG.debug("putAndReplicateRows");
093    // add rows to Master cluster,
094    Put p;
095
096    // 100 + 1 row to t1_syncup
097    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
098      p = new Put(Bytes.toBytes("row" + i));
099      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
100      ht1Source.put(p);
101    }
102    p = new Put(Bytes.toBytes("row" + 9999));
103    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
104    ht1Source.put(p);
105
106    // 200 + 1 row to t2_syncup
107    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
108      p = new Put(Bytes.toBytes("row" + i));
109      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
110      ht2Source.put(p);
111    }
112    p = new Put(Bytes.toBytes("row" + 9999));
113    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
114    ht2Source.put(p);
115
116    // ensure replication completed
117    Thread.sleep(SLEEP_TIME);
118    int rowCountHt1Source = countRows(ht1Source);
119    for (int i = 0; i < NB_RETRIES; i++) {
120      int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
121      if (i == NB_RETRIES - 1) {
122        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
123          rowCountHt1TargetAtPeer1);
124      }
125      if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
126        break;
127      }
128      Thread.sleep(SLEEP_TIME);
129    }
130
131    int rowCountHt2Source = countRows(ht2Source);
132    for (int i = 0; i < NB_RETRIES; i++) {
133      int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
134      if (i == NB_RETRIES - 1) {
135        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
136          rowCountHt2TargetAtPeer1);
137      }
138      if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
139        break;
140      }
141      Thread.sleep(SLEEP_TIME);
142    }
143  }
144
145  private void mimicSyncUpAfterDelete() throws Exception {
146    LOG.debug("mimicSyncUpAfterDelete");
147    shutDownTargetHBaseCluster();
148
149    List<Delete> list = new ArrayList<>();
150    // delete half of the rows
151    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
152      String rowKey = "row" + i;
153      Delete del = new Delete(Bytes.toBytes(rowKey));
154      list.add(del);
155    }
156    ht1Source.delete(list);
157
158    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
159      String rowKey = "row" + i;
160      Delete del = new Delete(Bytes.toBytes(rowKey));
161      list.add(del);
162    }
163    ht2Source.delete(list);
164
165    int rowCount_ht1Source = countRows(ht1Source);
166    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
167      rowCount_ht1Source);
168
169    int rowCount_ht2Source = countRows(ht2Source);
170    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
171      rowCount_ht2Source);
172
173    shutDownSourceHBaseCluster();
174    restartTargetHBaseCluster(1);
175
176    Thread.sleep(SLEEP_TIME);
177
178    // before sync up
179    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
180    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
181    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
182    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
183
184    // After sync up
185    for (int i = 0; i < NB_RETRIES; i++) {
186      syncUp(UTIL1);
187      rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
188      rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
189      if (i == NB_RETRIES - 1) {
190        if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
191          // syncUP still failed. Let's look at the source in case anything wrong there
192          restartSourceHBaseCluster(1);
193          rowCount_ht1Source = countRows(ht1Source);
194          LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
195          rowCount_ht2Source = countRows(ht2Source);
196          LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
197        }
198        assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
199          rowCountHt1TargetAtPeer1);
200        assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
201          rowCountHt2TargetAtPeer1);
202      }
203      if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) {
204        LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
205        break;
206      } else {
207        LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
208          + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
209          + rowCountHt2TargetAtPeer1);
210      }
211      Thread.sleep(SLEEP_TIME);
212    }
213  }
214
215  private void mimicSyncUpAfterPut() throws Exception {
216    LOG.debug("mimicSyncUpAfterPut");
217    restartSourceHBaseCluster(1);
218    shutDownTargetHBaseCluster();
219
220    Put p;
221    // another 100 + 1 row to t1_syncup
222    // we should see 100 + 2 rows now
223    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
224      p = new Put(Bytes.toBytes("row" + i));
225      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
226      ht1Source.put(p);
227    }
228    p = new Put(Bytes.toBytes("row" + 9998));
229    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
230    ht1Source.put(p);
231
232    // another 200 + 1 row to t1_syncup
233    // we should see 200 + 2 rows now
234    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
235      p = new Put(Bytes.toBytes("row" + i));
236      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
237      ht2Source.put(p);
238    }
239    p = new Put(Bytes.toBytes("row" + 9998));
240    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
241    ht2Source.put(p);
242
243    int rowCount_ht1Source = countRows(ht1Source);
244    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
245    int rowCount_ht2Source = countRows(ht2Source);
246    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
247
248    shutDownSourceHBaseCluster();
249    restartTargetHBaseCluster(1);
250
251    Thread.sleep(SLEEP_TIME);
252
253    // before sync up
254    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
255    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
256    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
257      rowCountHt1TargetAtPeer1);
258    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
259      rowCountHt2TargetAtPeer1);
260
261    // after syun up
262    for (int i = 0; i < NB_RETRIES; i++) {
263      syncUp(UTIL1);
264      rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
265      rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
266      if (i == NB_RETRIES - 1) {
267        if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
268          // syncUP still failed. Let's look at the source in case anything wrong there
269          restartSourceHBaseCluster(1);
270          rowCount_ht1Source = countRows(ht1Source);
271          LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
272          rowCount_ht2Source = countRows(ht2Source);
273          LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
274        }
275        assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
276          rowCountHt1TargetAtPeer1);
277        assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
278          rowCountHt2TargetAtPeer1);
279      }
280      if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) {
281        LOG.info("SyncUpAfterPut succeeded at retry = " + i);
282        break;
283      } else {
284        LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
285          + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
286          + rowCountHt2TargetAtPeer1);
287      }
288      Thread.sleep(SLEEP_TIME);
289    }
290  }
291}