001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
021import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
022import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
023import static org.junit.Assert.assertEquals;
024
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.client.Delete;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.testclassification.LargeTests;
031import org.apache.hadoop.hbase.testclassification.ReplicationTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039@Category({ ReplicationTests.class, LargeTests.class })
040public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
041
042  @ClassRule
043  public static final HBaseClassTestRule CLASS_RULE =
044    HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
045
046  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
047
048  /**
049   * Add a row to a table in each cluster, check it's replicated, delete it, check's gone Also check
050   * the puts and deletes are not replicated back to the originating cluster.
051   */
052  @Test
053  public void testSyncUpTool() throws Exception {
054
055    /**
056     * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
057     * 'cf1' : replicated 'norep': not replicated
058     */
059    setupReplication();
060
061    /**
062     * at Master: t1_syncup: put 100 rows into cf1, and 1 rows into norep t2_syncup: put 200 rows
063     * into cf1, and 1 rows into norep verify correctly replicated to slave
064     */
065    putAndReplicateRows();
066
067    /**
068     * Verify delete works step 1: stop hbase on Slave step 2: at Master: t1_syncup: delete 50 rows
069     * from cf1 t2_syncup: delete 100 rows from cf1 no change on 'norep' step 3: stop hbase on
070     * master, restart hbase on Slave step 4: verify Slave still have the rows before delete
071     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 5: run syncup tool on Master
072     * step 6: verify that delete show up on Slave t1_syncup: 50 rows from cf1 t2_syncup: 100 rows
073     * from cf1 verify correctly replicated to Slave
074     */
075    mimicSyncUpAfterDelete();
076
077    /**
078     * Verify put works step 1: stop hbase on Slave step 2: at Master: t1_syncup: put 100 rows from
079     * cf1 t2_syncup: put 200 rows from cf1 and put another row on 'norep' ATTN: put to 'cf1' will
080     * overwrite existing rows, so end count will be 100 and 200 respectively put to 'norep' will
081     * add a new row. step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
082     * still has the rows before put t1_syncup: 50 rows from cf1 t2_syncup: 100 rows from cf1 step
083     * 5: run syncup tool on Master step 6: verify that put show up on Slave and 'norep' does not
084     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 verify correctly replicated to
085     * Slave
086     */
087    mimicSyncUpAfterPut();
088  }
089
090  private void putAndReplicateRows() throws Exception {
091    LOG.debug("putAndReplicateRows");
092    // add rows to Master cluster,
093    Put p;
094
095    // 100 + 1 row to t1_syncup
096    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
097      p = new Put(Bytes.toBytes("row" + i));
098      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
099      ht1Source.put(p);
100    }
101    p = new Put(Bytes.toBytes("row" + 9999));
102    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
103    ht1Source.put(p);
104
105    // 200 + 1 row to t2_syncup
106    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
107      p = new Put(Bytes.toBytes("row" + i));
108      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
109      ht2Source.put(p);
110    }
111    p = new Put(Bytes.toBytes("row" + 9999));
112    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
113    ht2Source.put(p);
114
115    // ensure replication completed
116    Thread.sleep(SLEEP_TIME);
117    int rowCountHt1Source = UTIL1.countRows(ht1Source);
118    for (int i = 0; i < NB_RETRIES; i++) {
119      int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
120      if (i == NB_RETRIES - 1) {
121        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
122          rowCountHt1TargetAtPeer1);
123      }
124      if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
125        break;
126      }
127      Thread.sleep(SLEEP_TIME);
128    }
129
130    int rowCountHt2Source = UTIL1.countRows(ht2Source);
131    for (int i = 0; i < NB_RETRIES; i++) {
132      int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
133      if (i == NB_RETRIES - 1) {
134        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
135          rowCountHt2TargetAtPeer1);
136      }
137      if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
138        break;
139      }
140      Thread.sleep(SLEEP_TIME);
141    }
142  }
143
144  private void mimicSyncUpAfterDelete() throws Exception {
145    LOG.debug("mimicSyncUpAfterDelete");
146    shutDownTargetHBaseCluster();
147
148    List<Delete> list = new ArrayList<>();
149    // delete half of the rows
150    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
151      String rowKey = "row" + i;
152      Delete del = new Delete(Bytes.toBytes(rowKey));
153      list.add(del);
154    }
155    ht1Source.delete(list);
156
157    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
158      String rowKey = "row" + i;
159      Delete del = new Delete(Bytes.toBytes(rowKey));
160      list.add(del);
161    }
162    ht2Source.delete(list);
163
164    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
165    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
166      rowCount_ht1Source);
167
168    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
169    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
170      rowCount_ht2Source);
171
172    shutDownSourceHBaseCluster();
173    restartTargetHBaseCluster(1);
174
175    Thread.sleep(SLEEP_TIME);
176
177    // before sync up
178    int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
179    int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
180    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
181    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
182
183    // After sync up
184    for (int i = 0; i < NB_RETRIES; i++) {
185      syncUp(UTIL1);
186      rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
187      rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
188      if (i == NB_RETRIES - 1) {
189        if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
190          // syncUP still failed. Let's look at the source in case anything wrong there
191          restartSourceHBaseCluster(1);
192          rowCount_ht1Source = UTIL1.countRows(ht1Source);
193          LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
194          rowCount_ht2Source = UTIL1.countRows(ht2Source);
195          LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
196        }
197        assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
198          rowCountHt1TargetAtPeer1);
199        assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
200          rowCountHt2TargetAtPeer1);
201      }
202      if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) {
203        LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
204        break;
205      } else {
206        LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" +
207          rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
208      }
209      Thread.sleep(SLEEP_TIME);
210    }
211  }
212
213  private void mimicSyncUpAfterPut() throws Exception {
214    LOG.debug("mimicSyncUpAfterPut");
215    restartSourceHBaseCluster(1);
216    shutDownTargetHBaseCluster();
217
218    Put p;
219    // another 100 + 1 row to t1_syncup
220    // we should see 100 + 2 rows now
221    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
222      p = new Put(Bytes.toBytes("row" + i));
223      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
224      ht1Source.put(p);
225    }
226    p = new Put(Bytes.toBytes("row" + 9998));
227    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
228    ht1Source.put(p);
229
230    // another 200 + 1 row to t1_syncup
231    // we should see 200 + 2 rows now
232    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
233      p = new Put(Bytes.toBytes("row" + i));
234      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
235      ht2Source.put(p);
236    }
237    p = new Put(Bytes.toBytes("row" + 9998));
238    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
239    ht2Source.put(p);
240
241    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
242    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
243    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
244    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
245
246    shutDownSourceHBaseCluster();
247    restartTargetHBaseCluster(1);
248
249    Thread.sleep(SLEEP_TIME);
250
251    // before sync up
252    int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
253    int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
254    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
255      rowCountHt1TargetAtPeer1);
256    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
257      rowCountHt2TargetAtPeer1);
258
259    // after syun up
260    for (int i = 0; i < NB_RETRIES; i++) {
261      syncUp(UTIL1);
262      rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
263      rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
264      if (i == NB_RETRIES - 1) {
265        if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
266          // syncUP still failed. Let's look at the source in case anything wrong there
267          restartSourceHBaseCluster(1);
268          rowCount_ht1Source = UTIL1.countRows(ht1Source);
269          LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
270          rowCount_ht2Source = UTIL1.countRows(ht2Source);
271          LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
272        }
273        assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
274          rowCountHt1TargetAtPeer1);
275        assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
276          rowCountHt2TargetAtPeer1);
277      }
278      if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) {
279        LOG.info("SyncUpAfterPut succeeded at retry = " + i);
280        break;
281      } else {
282        LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" +
283          rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
284      }
285      Thread.sleep(SLEEP_TIME);
286    }
287  }
288}