001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.ArrayList;
023import java.util.List;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HBaseTestingUtility;
026import org.apache.hadoop.hbase.HColumnDescriptor;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.HTableDescriptor;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.ConnectionFactory;
033import org.apache.hadoop.hbase.client.Delete;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.util.ToolRunner;
042import org.junit.After;
043import org.junit.Before;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050@Category({ ReplicationTests.class, LargeTests.class })
051public class TestReplicationSyncUpTool extends TestReplicationBase {
052
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055      HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
056
057  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
058
059  private static final TableName t1_su = TableName.valueOf("t1_syncup");
060  private static final TableName t2_su = TableName.valueOf("t2_syncup");
061
062  protected static final byte[] famName = Bytes.toBytes("cf1");
063  private static final byte[] qualName = Bytes.toBytes("q1");
064
065  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
066
067  private HTableDescriptor t1_syncupSource, t1_syncupTarget;
068  private HTableDescriptor t2_syncupSource, t2_syncupTarget;
069
070  protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
071
072  @Before
073  public void setUp() throws Exception {
074    HColumnDescriptor fam;
075
076    t1_syncupSource = new HTableDescriptor(t1_su);
077    fam = new HColumnDescriptor(famName);
078    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
079    t1_syncupSource.addFamily(fam);
080    fam = new HColumnDescriptor(noRepfamName);
081    t1_syncupSource.addFamily(fam);
082
083    t1_syncupTarget = new HTableDescriptor(t1_su);
084    fam = new HColumnDescriptor(famName);
085    t1_syncupTarget.addFamily(fam);
086    fam = new HColumnDescriptor(noRepfamName);
087    t1_syncupTarget.addFamily(fam);
088
089    t2_syncupSource = new HTableDescriptor(t2_su);
090    fam = new HColumnDescriptor(famName);
091    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
092    t2_syncupSource.addFamily(fam);
093    fam = new HColumnDescriptor(noRepfamName);
094    t2_syncupSource.addFamily(fam);
095
096    t2_syncupTarget = new HTableDescriptor(t2_su);
097    fam = new HColumnDescriptor(famName);
098    t2_syncupTarget.addFamily(fam);
099    fam = new HColumnDescriptor(noRepfamName);
100    t2_syncupTarget.addFamily(fam);
101  }
102
103  @After
104  public void tearDownBase() throws Exception {
105    // Do nothing, just replace the super tearDown. because the super tearDown will use the
106    // out-of-data HBase admin to remove replication peer, which will be result in failure.
107  }
108
109  /**
110   * Add a row to a table in each cluster, check it's replicated, delete it,
111   * check's gone Also check the puts and deletes are not replicated back to
112   * the originating cluster.
113   */
114  @Test
115  public void testSyncUpTool() throws Exception {
116
117    /**
118     * Set up Replication: on Master and one Slave
119     * Table: t1_syncup and t2_syncup
120     * columnfamily:
121     *    'cf1'  : replicated
122     *    'norep': not replicated
123     */
124    setupReplication();
125
126    /**
127     * at Master:
128     * t1_syncup: put 100 rows into cf1, and 1 rows into norep
129     * t2_syncup: put 200 rows into cf1, and 1 rows into norep
130     *
131     * verify correctly replicated to slave
132     */
133    putAndReplicateRows();
134
135    /**
136     * Verify delete works
137     *
138     * step 1: stop hbase on Slave
139     *
140     * step 2: at Master:
141     *  t1_syncup: delete 50 rows  from cf1
142     *  t2_syncup: delete 100 rows from cf1
143     *  no change on 'norep'
144     *
145     * step 3: stop hbase on master, restart hbase on Slave
146     *
147     * step 4: verify Slave still have the rows before delete
148     *      t1_syncup: 100 rows from cf1
149     *      t2_syncup: 200 rows from cf1
150     *
151     * step 5: run syncup tool on Master
152     *
153     * step 6: verify that delete show up on Slave
154     *      t1_syncup: 50 rows from cf1
155     *      t2_syncup: 100 rows from cf1
156     *
157     * verify correctly replicated to Slave
158     */
159    mimicSyncUpAfterDelete();
160
161    /**
162     * Verify put works
163     *
164     * step 1: stop hbase on Slave
165     *
166     * step 2: at Master:
167     *  t1_syncup: put 100 rows  from cf1
168     *  t2_syncup: put 200 rows  from cf1
169     *  and put another row on 'norep'
170     *  ATTN: put to 'cf1' will overwrite existing rows, so end count will
171     *        be 100 and 200 respectively
172     *      put to 'norep' will add a new row.
173     *
174     * step 3: stop hbase on master, restart hbase on Slave
175     *
176     * step 4: verify Slave still has the rows before put
177     *      t1_syncup: 50 rows from cf1
178     *      t2_syncup: 100 rows from cf1
179     *
180     * step 5: run syncup tool on Master
181     *
182     * step 6: verify that put show up on Slave
183     *         and 'norep' does not
184     *      t1_syncup: 100 rows from cf1
185     *      t2_syncup: 200 rows from cf1
186     *
187     * verify correctly replicated to Slave
188     */
189    mimicSyncUpAfterPut();
190  }
191
192  protected void setupReplication() throws Exception {
193    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
194    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
195
196    Admin ha = utility1.getAdmin();
197    ha.createTable(t1_syncupSource);
198    ha.createTable(t2_syncupSource);
199    ha.close();
200
201    ha = utility2.getAdmin();
202    ha.createTable(t1_syncupTarget);
203    ha.createTable(t2_syncupTarget);
204    ha.close();
205
206    Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration());
207    Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration());
208
209    // Get HTable from Master
210    ht1Source = connection1.getTable(t1_su);
211    ht2Source = connection1.getTable(t2_su);
212
213    // Get HTable from Peer1
214    ht1TargetAtPeer1 = connection2.getTable(t1_su);
215    ht2TargetAtPeer1 = connection2.getTable(t2_su);
216
217    /**
218     * set M-S : Master: utility1 Slave1: utility2
219     */
220    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
221    rpc.setClusterKey(utility2.getClusterKey());
222    admin1.addPeer("1", rpc, null);
223
224    admin1.close();
225    admin2.close();
226  }
227
228  private void putAndReplicateRows() throws Exception {
229    LOG.debug("putAndReplicateRows");
230    // add rows to Master cluster,
231    Put p;
232
233    // 100 + 1 row to t1_syncup
234    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
235      p = new Put(Bytes.toBytes("row" + i));
236      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
237      ht1Source.put(p);
238    }
239    p = new Put(Bytes.toBytes("row" + 9999));
240    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
241    ht1Source.put(p);
242
243    // 200 + 1 row to t2_syncup
244    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
245      p = new Put(Bytes.toBytes("row" + i));
246      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
247      ht2Source.put(p);
248    }
249    p = new Put(Bytes.toBytes("row" + 9999));
250    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
251    ht2Source.put(p);
252
253    // ensure replication completed
254    Thread.sleep(SLEEP_TIME);
255    int rowCount_ht1Source = utility1.countRows(ht1Source);
256    for (int i = 0; i < NB_RETRIES; i++) {
257      int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
258      if (i==NB_RETRIES-1) {
259        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
260            rowCount_ht1TargetAtPeer1);
261      }
262      if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
263        break;
264      }
265      Thread.sleep(SLEEP_TIME);
266    }
267
268    int rowCount_ht2Source = utility1.countRows(ht2Source);
269    for (int i = 0; i < NB_RETRIES; i++) {
270      int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
271      if (i==NB_RETRIES-1) {
272        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
273            rowCount_ht2TargetAtPeer1);
274      }
275      if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
276        break;
277      }
278      Thread.sleep(SLEEP_TIME);
279    }
280  }
281
282  private void mimicSyncUpAfterDelete() throws Exception {
283    LOG.debug("mimicSyncUpAfterDelete");
284    utility2.shutdownMiniHBaseCluster();
285
286    List<Delete> list = new ArrayList<>();
287    // delete half of the rows
288    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
289      String rowKey = "row" + i;
290      Delete del = new Delete(Bytes.toBytes(rowKey));
291      list.add(del);
292    }
293    ht1Source.delete(list);
294
295    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
296      String rowKey = "row" + i;
297      Delete del = new Delete(Bytes.toBytes(rowKey));
298      list.add(del);
299    }
300    ht2Source.delete(list);
301
302    int rowCount_ht1Source = utility1.countRows(ht1Source);
303    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
304      rowCount_ht1Source);
305
306    int rowCount_ht2Source = utility1.countRows(ht2Source);
307    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
308      101, rowCount_ht2Source);
309
310    utility1.shutdownMiniHBaseCluster();
311    utility2.restartHBaseCluster(1);
312
313    Thread.sleep(SLEEP_TIME);
314
315    // before sync up
316    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
317    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
318    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
319    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
320
321    // After sync up
322    for (int i = 0; i < NB_RETRIES; i++) {
323      syncUp(utility1);
324      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
325      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
326      if (i == NB_RETRIES - 1) {
327        if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
328          // syncUP still failed. Let's look at the source in case anything wrong there
329          utility1.restartHBaseCluster(1);
330          rowCount_ht1Source = utility1.countRows(ht1Source);
331          LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
332          rowCount_ht2Source = utility1.countRows(ht2Source);
333          LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
334        }
335        assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
336          rowCount_ht1TargetAtPeer1);
337        assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
338          rowCount_ht2TargetAtPeer1);
339      }
340      if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
341        LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
342        break;
343      } else {
344        LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
345            + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
346            + rowCount_ht2TargetAtPeer1);
347      }
348      Thread.sleep(SLEEP_TIME);
349    }
350  }
351
352  private void mimicSyncUpAfterPut() throws Exception {
353    LOG.debug("mimicSyncUpAfterPut");
354    utility1.restartHBaseCluster(1);
355    utility2.shutdownMiniHBaseCluster();
356
357    Put p;
358    // another 100 + 1 row to t1_syncup
359    // we should see 100 + 2 rows now
360    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
361      p = new Put(Bytes.toBytes("row" + i));
362      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
363      ht1Source.put(p);
364    }
365    p = new Put(Bytes.toBytes("row" + 9998));
366    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
367    ht1Source.put(p);
368
369    // another 200 + 1 row to t1_syncup
370    // we should see 200 + 2 rows now
371    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
372      p = new Put(Bytes.toBytes("row" + i));
373      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
374      ht2Source.put(p);
375    }
376    p = new Put(Bytes.toBytes("row" + 9998));
377    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
378    ht2Source.put(p);
379
380    int rowCount_ht1Source = utility1.countRows(ht1Source);
381    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
382    int rowCount_ht2Source = utility1.countRows(ht2Source);
383    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
384
385    utility1.shutdownMiniHBaseCluster();
386    utility2.restartHBaseCluster(1);
387
388    Thread.sleep(SLEEP_TIME);
389
390    // before sync up
391    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
392    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
393    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
394      rowCount_ht1TargetAtPeer1);
395    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
396      rowCount_ht2TargetAtPeer1);
397
398    // after syun up
399    for (int i = 0; i < NB_RETRIES; i++) {
400      syncUp(utility1);
401      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
402      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
403      if (i == NB_RETRIES - 1) {
404        if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
405          // syncUP still failed. Let's look at the source in case anything wrong there
406          utility1.restartHBaseCluster(1);
407          rowCount_ht1Source = utility1.countRows(ht1Source);
408          LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
409          rowCount_ht2Source = utility1.countRows(ht2Source);
410          LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
411        }
412        assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
413          rowCount_ht1TargetAtPeer1);
414        assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
415          rowCount_ht2TargetAtPeer1);
416      }
417      if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
418        LOG.info("SyncUpAfterPut succeeded at retry = " + i);
419        break;
420      } else {
421        LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
422            + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
423            + rowCount_ht2TargetAtPeer1);
424      }
425      Thread.sleep(SLEEP_TIME);
426    }
427  }
428
429  protected void syncUp(HBaseTestingUtility ut) throws Exception {
430    ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]);
431  }
432}