001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.ArrayList;
023import java.util.List;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HBaseTestingUtility;
026import org.apache.hadoop.hbase.HColumnDescriptor;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.HTableDescriptor;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.ConnectionFactory;
033import org.apache.hadoop.hbase.client.Delete;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.util.ToolRunner;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049@Category({ReplicationTests.class, LargeTests.class})
050public class TestReplicationSyncUpTool extends TestReplicationBase {
051
  // Class-level JUnit rule built for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);

  // Names of the two tables used by the test; setupReplication() creates each of them on both
  // clusters.
  private static final TableName t1_su = TableName.valueOf("t1_syncup");
  private static final TableName t2_su = TableName.valueOf("t2_syncup");

  // Column family that setUp() marks with global replication scope on the source descriptors,
  // and the single qualifier every put in this test writes to.
  protected static final byte[] famName = Bytes.toBytes("cf1");
  private static final byte[] qualName = Bytes.toBytes("q1");

  // Column family whose replication scope setUp() never changes; the test expects rows written
  // to it to stay on the source only.
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");

  // Table descriptors built in setUp(): one source and one target descriptor per table.
  private HTableDescriptor t1_syncupSource, t1_syncupTarget;
  private HTableDescriptor t2_syncupSource, t2_syncupTarget;

  // Table handles opened in setupReplication(): the "Source" ones come from the utility1
  // connection, the "TargetAtPeer1" ones from the utility2 connection.
  protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
070
071  @Before
072  public void setUp() throws Exception {
073
074    HColumnDescriptor fam;
075
076    t1_syncupSource = new HTableDescriptor(t1_su);
077    fam = new HColumnDescriptor(famName);
078    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
079    t1_syncupSource.addFamily(fam);
080    fam = new HColumnDescriptor(noRepfamName);
081    t1_syncupSource.addFamily(fam);
082
083    t1_syncupTarget = new HTableDescriptor(t1_su);
084    fam = new HColumnDescriptor(famName);
085    t1_syncupTarget.addFamily(fam);
086    fam = new HColumnDescriptor(noRepfamName);
087    t1_syncupTarget.addFamily(fam);
088
089    t2_syncupSource = new HTableDescriptor(t2_su);
090    fam = new HColumnDescriptor(famName);
091    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
092    t2_syncupSource.addFamily(fam);
093    fam = new HColumnDescriptor(noRepfamName);
094    t2_syncupSource.addFamily(fam);
095
096    t2_syncupTarget = new HTableDescriptor(t2_su);
097    fam = new HColumnDescriptor(famName);
098    t2_syncupTarget.addFamily(fam);
099    fam = new HColumnDescriptor(noRepfamName);
100    t2_syncupTarget.addFamily(fam);
101
102  }
103
104  /**
105   * Add a row to a table in each cluster, check it's replicated, delete it,
106   * check's gone Also check the puts and deletes are not replicated back to
107   * the originating cluster.
108   */
109  @Test
110  public void testSyncUpTool() throws Exception {
111
112    /**
113     * Set up Replication: on Master and one Slave
114     * Table: t1_syncup and t2_syncup
115     * columnfamily:
116     *    'cf1'  : replicated
117     *    'norep': not replicated
118     */
119    setupReplication();
120
121    /**
122     * at Master:
123     * t1_syncup: put 100 rows into cf1, and 1 rows into norep
124     * t2_syncup: put 200 rows into cf1, and 1 rows into norep
125     *
126     * verify correctly replicated to slave
127     */
128    putAndReplicateRows();
129
130    /**
131     * Verify delete works
132     *
133     * step 1: stop hbase on Slave
134     *
135     * step 2: at Master:
136     *  t1_syncup: delete 50 rows  from cf1
137     *  t2_syncup: delete 100 rows from cf1
138     *  no change on 'norep'
139     *
140     * step 3: stop hbase on master, restart hbase on Slave
141     *
142     * step 4: verify Slave still have the rows before delete
143     *      t1_syncup: 100 rows from cf1
144     *      t2_syncup: 200 rows from cf1
145     *
146     * step 5: run syncup tool on Master
147     *
148     * step 6: verify that delete show up on Slave
149     *      t1_syncup: 50 rows from cf1
150     *      t2_syncup: 100 rows from cf1
151     *
152     * verify correctly replicated to Slave
153     */
154    mimicSyncUpAfterDelete();
155
156    /**
157     * Verify put works
158     *
159     * step 1: stop hbase on Slave
160     *
161     * step 2: at Master:
162     *  t1_syncup: put 100 rows  from cf1
163     *  t2_syncup: put 200 rows  from cf1
164     *  and put another row on 'norep'
165     *  ATTN: put to 'cf1' will overwrite existing rows, so end count will
166     *        be 100 and 200 respectively
167     *      put to 'norep' will add a new row.
168     *
169     * step 3: stop hbase on master, restart hbase on Slave
170     *
171     * step 4: verify Slave still has the rows before put
172     *      t1_syncup: 50 rows from cf1
173     *      t2_syncup: 100 rows from cf1
174     *
175     * step 5: run syncup tool on Master
176     *
177     * step 6: verify that put show up on Slave
178     *         and 'norep' does not
179     *      t1_syncup: 100 rows from cf1
180     *      t2_syncup: 200 rows from cf1
181     *
182     * verify correctly replicated to Slave
183     */
184    mimicSyncUpAfterPut();
185
186  }
187
188  protected void setupReplication() throws Exception {
189    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
190    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
191
192    Admin ha = utility1.getAdmin();
193    ha.createTable(t1_syncupSource);
194    ha.createTable(t2_syncupSource);
195    ha.close();
196
197    ha = utility2.getAdmin();
198    ha.createTable(t1_syncupTarget);
199    ha.createTable(t2_syncupTarget);
200    ha.close();
201
202    Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration());
203    Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration());
204
205    // Get HTable from Master
206    ht1Source = connection1.getTable(t1_su);
207    ht2Source = connection1.getTable(t2_su);
208
209    // Get HTable from Peer1
210    ht1TargetAtPeer1 = connection2.getTable(t1_su);
211    ht2TargetAtPeer1 = connection2.getTable(t2_su);
212
213    /**
214     * set M-S : Master: utility1 Slave1: utility2
215     */
216    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
217    rpc.setClusterKey(utility2.getClusterKey());
218    admin1.addPeer("1", rpc, null);
219
220    admin1.close();
221    admin2.close();
222  }
223
224  private void putAndReplicateRows() throws Exception {
225    LOG.debug("putAndReplicateRows");
226    // add rows to Master cluster,
227    Put p;
228
229    // 100 + 1 row to t1_syncup
230    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
231      p = new Put(Bytes.toBytes("row" + i));
232      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
233      ht1Source.put(p);
234    }
235    p = new Put(Bytes.toBytes("row" + 9999));
236    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
237    ht1Source.put(p);
238
239    // 200 + 1 row to t2_syncup
240    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
241      p = new Put(Bytes.toBytes("row" + i));
242      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
243      ht2Source.put(p);
244    }
245    p = new Put(Bytes.toBytes("row" + 9999));
246    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
247    ht2Source.put(p);
248
249    // ensure replication completed
250    Thread.sleep(SLEEP_TIME);
251    int rowCount_ht1Source = utility1.countRows(ht1Source);
252    for (int i = 0; i < NB_RETRIES; i++) {
253      int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
254      if (i==NB_RETRIES-1) {
255        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
256            rowCount_ht1TargetAtPeer1);
257      }
258      if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
259        break;
260      }
261      Thread.sleep(SLEEP_TIME);
262    }
263
264    int rowCount_ht2Source = utility1.countRows(ht2Source);
265    for (int i = 0; i < NB_RETRIES; i++) {
266      int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
267      if (i==NB_RETRIES-1) {
268        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
269            rowCount_ht2TargetAtPeer1);
270      }
271      if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
272        break;
273      }
274      Thread.sleep(SLEEP_TIME);
275    }
276  }
277
278  private void mimicSyncUpAfterDelete() throws Exception {
279    LOG.debug("mimicSyncUpAfterDelete");
280    utility2.shutdownMiniHBaseCluster();
281
282    List<Delete> list = new ArrayList<>();
283    // delete half of the rows
284    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
285      String rowKey = "row" + i;
286      Delete del = new Delete(Bytes.toBytes(rowKey));
287      list.add(del);
288    }
289    ht1Source.delete(list);
290
291    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
292      String rowKey = "row" + i;
293      Delete del = new Delete(Bytes.toBytes(rowKey));
294      list.add(del);
295    }
296    ht2Source.delete(list);
297
298    int rowCount_ht1Source = utility1.countRows(ht1Source);
299    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
300      rowCount_ht1Source);
301
302    int rowCount_ht2Source = utility1.countRows(ht2Source);
303    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
304      101, rowCount_ht2Source);
305
306    utility1.shutdownMiniHBaseCluster();
307    utility2.restartHBaseCluster(1);
308
309    Thread.sleep(SLEEP_TIME);
310
311    // before sync up
312    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
313    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
314    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
315    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
316
317    // After sync up
318    for (int i = 0; i < NB_RETRIES; i++) {
319      syncUp(utility1);
320      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
321      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
322      if (i == NB_RETRIES - 1) {
323        if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
324          // syncUP still failed. Let's look at the source in case anything wrong there
325          utility1.restartHBaseCluster(1);
326          rowCount_ht1Source = utility1.countRows(ht1Source);
327          LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
328          rowCount_ht2Source = utility1.countRows(ht2Source);
329          LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
330        }
331        assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
332          rowCount_ht1TargetAtPeer1);
333        assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
334          rowCount_ht2TargetAtPeer1);
335      }
336      if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
337        LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
338        break;
339      } else {
340        LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
341            + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
342            + rowCount_ht2TargetAtPeer1);
343      }
344      Thread.sleep(SLEEP_TIME);
345    }
346  }
347
348  private void mimicSyncUpAfterPut() throws Exception {
349    LOG.debug("mimicSyncUpAfterPut");
350    utility1.restartHBaseCluster(1);
351    utility2.shutdownMiniHBaseCluster();
352
353    Put p;
354    // another 100 + 1 row to t1_syncup
355    // we should see 100 + 2 rows now
356    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
357      p = new Put(Bytes.toBytes("row" + i));
358      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
359      ht1Source.put(p);
360    }
361    p = new Put(Bytes.toBytes("row" + 9998));
362    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
363    ht1Source.put(p);
364
365    // another 200 + 1 row to t1_syncup
366    // we should see 200 + 2 rows now
367    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
368      p = new Put(Bytes.toBytes("row" + i));
369      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
370      ht2Source.put(p);
371    }
372    p = new Put(Bytes.toBytes("row" + 9998));
373    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
374    ht2Source.put(p);
375
376    int rowCount_ht1Source = utility1.countRows(ht1Source);
377    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
378    int rowCount_ht2Source = utility1.countRows(ht2Source);
379    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
380
381    utility1.shutdownMiniHBaseCluster();
382    utility2.restartHBaseCluster(1);
383
384    Thread.sleep(SLEEP_TIME);
385
386    // before sync up
387    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
388    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
389    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
390      rowCount_ht1TargetAtPeer1);
391    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
392      rowCount_ht2TargetAtPeer1);
393
394    // after syun up
395    for (int i = 0; i < NB_RETRIES; i++) {
396      syncUp(utility1);
397      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
398      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
399      if (i == NB_RETRIES - 1) {
400        if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
401          // syncUP still failed. Let's look at the source in case anything wrong there
402          utility1.restartHBaseCluster(1);
403          rowCount_ht1Source = utility1.countRows(ht1Source);
404          LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
405          rowCount_ht2Source = utility1.countRows(ht2Source);
406          LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
407        }
408        assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
409          rowCount_ht1TargetAtPeer1);
410        assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
411          rowCount_ht2TargetAtPeer1);
412      }
413      if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
414        LOG.info("SyncUpAfterPut succeeded at retry = " + i);
415        break;
416      } else {
417        LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
418            + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
419            + rowCount_ht2TargetAtPeer1);
420      }
421      Thread.sleep(SLEEP_TIME);
422    }
423  }
424
425  protected void syncUp(HBaseTestingUtility ut) throws Exception {
426    ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]);
427  }
428}