001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
021import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
022import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
023import static org.junit.Assert.assertEquals;
024
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.client.Delete;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.testclassification.LargeTests;
031import org.apache.hadoop.hbase.testclassification.ReplicationTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039@Category({ ReplicationTests.class, LargeTests.class })
040public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
041
042  @ClassRule
043  public static final HBaseClassTestRule CLASS_RULE =
044    HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
045
046  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
047
048  /**
049   * Add a row to a table in each cluster, check it's replicated, delete it, check's gone Also check
050   * the puts and deletes are not replicated back to the originating cluster.
051   */
052  @Test
053  public void testSyncUpTool() throws Exception {
054
055    /**
056     * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
057     * 'cf1' : replicated 'norep': not replicated
058     */
059    setupReplication();
060
061    /**
062     * at Master: t1_syncup: put 100 rows into cf1, and 1 rows into norep t2_syncup: put 200 rows
063     * into cf1, and 1 rows into norep verify correctly replicated to slave
064     */
065    putAndReplicateRows();
066
067    /**
068     * Verify delete works step 1: stop hbase on Slave step 2: at Master: t1_syncup: delete 50 rows
069     * from cf1 t2_syncup: delete 100 rows from cf1 no change on 'norep' step 3: stop hbase on
070     * master, restart hbase on Slave step 4: verify Slave still have the rows before delete
071     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 5: run syncup tool on Master
072     * step 6: verify that delete show up on Slave t1_syncup: 50 rows from cf1 t2_syncup: 100 rows
073     * from cf1 verify correctly replicated to Slave
074     */
075    mimicSyncUpAfterDelete();
076
077    /**
078     * Verify put works step 1: stop hbase on Slave step 2: at Master: t1_syncup: put 100 rows from
079     * cf1 t2_syncup: put 200 rows from cf1 and put another row on 'norep' ATTN: put to 'cf1' will
080     * overwrite existing rows, so end count will be 100 and 200 respectively put to 'norep' will
081     * add a new row. step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
082     * still has the rows before put t1_syncup: 50 rows from cf1 t2_syncup: 100 rows from cf1 step
083     * 5: run syncup tool on Master step 6: verify that put show up on Slave and 'norep' does not
084     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 verify correctly replicated to
085     * Slave
086     */
087    mimicSyncUpAfterPut();
088  }
089
090  private void putAndReplicateRows() throws Exception {
091    LOG.debug("putAndReplicateRows");
092    // add rows to Master cluster,
093    Put p;
094
095    // 100 + 1 row to t1_syncup
096    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
097      p = new Put(Bytes.toBytes("row" + i));
098      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
099      ht1Source.put(p);
100    }
101    p = new Put(Bytes.toBytes("row" + 9999));
102    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
103    ht1Source.put(p);
104
105    // 200 + 1 row to t2_syncup
106    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
107      p = new Put(Bytes.toBytes("row" + i));
108      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
109      ht2Source.put(p);
110    }
111    p = new Put(Bytes.toBytes("row" + 9999));
112    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
113    ht2Source.put(p);
114
115    // ensure replication completed
116    Thread.sleep(SLEEP_TIME);
117    int rowCountHt1Source = UTIL1.countRows(ht1Source);
118    for (int i = 0; i < NB_RETRIES; i++) {
119      int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
120      if (i == NB_RETRIES - 1) {
121        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
122          rowCountHt1TargetAtPeer1);
123      }
124      if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
125        break;
126      }
127      Thread.sleep(SLEEP_TIME);
128    }
129
130    int rowCountHt2Source = UTIL1.countRows(ht2Source);
131    for (int i = 0; i < NB_RETRIES; i++) {
132      int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
133      if (i == NB_RETRIES - 1) {
134        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
135          rowCountHt2TargetAtPeer1);
136      }
137      if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
138        break;
139      }
140      Thread.sleep(SLEEP_TIME);
141    }
142  }
143
144  private void mimicSyncUpAfterDelete() throws Exception {
145    LOG.debug("mimicSyncUpAfterDelete");
146    shutDownTargetHBaseCluster();
147
148    List<Delete> list = new ArrayList<>();
149    // delete half of the rows
150    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
151      String rowKey = "row" + i;
152      Delete del = new Delete(Bytes.toBytes(rowKey));
153      list.add(del);
154    }
155    ht1Source.delete(list);
156
157    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
158      String rowKey = "row" + i;
159      Delete del = new Delete(Bytes.toBytes(rowKey));
160      list.add(del);
161    }
162    ht2Source.delete(list);
163
164    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
165    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
166      rowCount_ht1Source);
167
168    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
169    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
170      rowCount_ht2Source);
171
172    shutDownSourceHBaseCluster();
173    restartTargetHBaseCluster(1);
174
175    Thread.sleep(SLEEP_TIME);
176
177    // before sync up
178    int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
179    int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
180    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
181    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
182
183    // After sync up
184    for (int i = 0; i < NB_RETRIES; i++) {
185      syncUp(UTIL1);
186      rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
187      rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
188      if (i == NB_RETRIES - 1) {
189        if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
190          // syncUP still failed. Let's look at the source in case anything wrong there
191          restartSourceHBaseCluster(1);
192          rowCount_ht1Source = UTIL1.countRows(ht1Source);
193          LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
194          rowCount_ht2Source = UTIL1.countRows(ht2Source);
195          LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
196        }
197        assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
198          rowCountHt1TargetAtPeer1);
199        assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
200          rowCountHt2TargetAtPeer1);
201      }
202      if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) {
203        LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
204        break;
205      } else {
206        LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
207          + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
208          + rowCountHt2TargetAtPeer1);
209      }
210      Thread.sleep(SLEEP_TIME);
211    }
212  }
213
214  private void mimicSyncUpAfterPut() throws Exception {
215    LOG.debug("mimicSyncUpAfterPut");
216    restartSourceHBaseCluster(1);
217    shutDownTargetHBaseCluster();
218
219    Put p;
220    // another 100 + 1 row to t1_syncup
221    // we should see 100 + 2 rows now
222    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
223      p = new Put(Bytes.toBytes("row" + i));
224      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
225      ht1Source.put(p);
226    }
227    p = new Put(Bytes.toBytes("row" + 9998));
228    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
229    ht1Source.put(p);
230
231    // another 200 + 1 row to t1_syncup
232    // we should see 200 + 2 rows now
233    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
234      p = new Put(Bytes.toBytes("row" + i));
235      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
236      ht2Source.put(p);
237    }
238    p = new Put(Bytes.toBytes("row" + 9998));
239    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
240    ht2Source.put(p);
241
242    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
243    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
244    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
245    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
246
247    shutDownSourceHBaseCluster();
248    restartTargetHBaseCluster(1);
249
250    Thread.sleep(SLEEP_TIME);
251
252    // before sync up
253    int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
254    int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
255    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
256      rowCountHt1TargetAtPeer1);
257    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
258      rowCountHt2TargetAtPeer1);
259
260    // after syun up
261    for (int i = 0; i < NB_RETRIES; i++) {
262      syncUp(UTIL1);
263      rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
264      rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
265      if (i == NB_RETRIES - 1) {
266        if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
267          // syncUP still failed. Let's look at the source in case anything wrong there
268          restartSourceHBaseCluster(1);
269          rowCount_ht1Source = UTIL1.countRows(ht1Source);
270          LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
271          rowCount_ht2Source = UTIL1.countRows(ht2Source);
272          LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
273        }
274        assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
275          rowCountHt1TargetAtPeer1);
276        assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
277          rowCountHt2TargetAtPeer1);
278      }
279      if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) {
280        LOG.info("SyncUpAfterPut succeeded at retry = " + i);
281        break;
282      } else {
283        LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
284          + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
285          + rowCountHt2TargetAtPeer1);
286      }
287      Thread.sleep(SLEEP_TIME);
288    }
289  }
290}