/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

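  /**
   * Enable replication of bulk loaded data on both clusters. Bulk load replication additionally
   * requires hbase.replication.cluster.id to be set, and the custom source fs conf provider is
   * what allows hfiles staged on a different HDFS to be resolved (see
   * loadFromOtherHDFSAndValidateHFileReplication below).
   */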
  @Override
  protected void customizeClusterConf(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    conf.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
  }

  @Test
  public void testSyncUpTool() throws Exception {
    /**
     * Set up replication between the master cluster and one slave cluster.
     * Tables: t1_syncup and t2_syncup.
     * Column families: 'cf1' (replicated) and 'norep' (not replicated).
     */
    setupReplication();

    /**
     * Prepare 24 random hfile ranges required for creating hfiles.
     */
    Set<String> randomHFileRanges = new HashSet<>(24);
    for (int i = 0; i < 24; i++) {
      randomHFileRanges.add(UTIL1.getRandomUUID().toString());
    }
    // Sort the boundaries so that each consecutive pair taken from the iterator forms an
    // ascending, non-overlapping [from, to] key range for an hfile.
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

    /**
     * At the master cluster:
     *   t1_syncup: load 50 rows into cf1, 50 rows from the other HDFS into cf1, and 3 rows into
     *   norep.
     *   t2_syncup: load 100 rows into cf1, 100 rows from the other HDFS into cf1, and 3 rows into
     *   norep.
     * Then verify the cf1 rows are correctly replicated to the slave.
     */
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    /**
     * Verify that bulk loaded hfiles are handled by the sync-up tool:
     *   step 1: stop HBase on the slave.
     *   step 2: at the master, load another 100 rows into cf1 and 3 rows into norep of t1_syncup,
     *   and another 200 rows into cf1 and 3 rows into norep of t2_syncup.
     *   step 3: stop HBase on the master, restart HBase on the slave.
     *   step 4: verify the slave still has the rows from before the load (t1_syncup: 100 rows in
     *   cf1, t2_syncup: 200 rows in cf1).
     *   step 5: run the sync-up tool against the master.
     *   step 6: verify that the hfile rows show up on the slave and the 'norep' rows do not
     *   (t1_syncup: 200 rows in cf1, t2_syncup: 400 rows in cf1).
     */
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
    throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
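    // Stop the target cluster first so that the bulk loads below pile up in the source's
    // replication queue and can only reach the target through the sync-up tool.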
    shutDownTargetHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCount_ht1Source = countRows(ht1Source);
    assertEquals("t1_syncup has 206 rows on source, after bulk loading another 103 rows", 206,
      rowCount_ht1Source);

    int rowCount_ht2Source = countRows(ht2Source);
    assertEquals("t2_syncup has 406 rows on source, after bulk loading another 203 rows", 406,
      rowCount_ht2Source);

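    // With the source down and the target back up, normal replication cannot run, so the queued
    // edits can only be shipped by the sync-up tool.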
    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

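    // A single sync-up pass may not ship everything, so the tool is retried up to NB_RETRIES
    // times below, asserting only on the final attempt.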
    // Run sync up tool
    syncUp(UTIL1);

    // After sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(UTIL1);
      rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
      rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
          // Sync up still failed, so restart the source and log its row counts in case
          // something is wrong there.
          restartSourceHBaseCluster(1);
          rowCount_ht1Source = countRows(ht1Source);
          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
          rowCount_ht2Source = countRows(ht2Source);
          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
        }
        assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
          rowCountHt1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
          rowCountHt2TargetAtPeer1);
      }
      if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) {
        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i
          + ", with rowCountHt1TargetAtPeer1 =" + rowCountHt1TargetAtPeer1
          + " and rowCountHt2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
    Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Load 50 + 50 + 3 rows into t1_syncup.
    byte[][][] hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
      hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
      hfileRanges, 3);

    // Load 100 + 100 + 3 rows into t2_syncup.
    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
      hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // Ensure replication completed; the 3 'norep' rows are never replicated, hence the -3.
      wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL1.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

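    // Create one hfile with numOfRows rows for each [from, to] range under the family directory,
    // then bulk load the whole directory into the table.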
    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL2.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

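    // The hfiles are staged on UTIL2's HDFS but bulk loaded into UTIL1's cluster (the loader
    // below is created with UTIL1's configuration), which is the cross-HDFS case the source fs
    // conf provider in customizeClusterConf exists to handle.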
    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

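  /**
   * Polls the target table until it holds the expected row count, sleeping SLEEP_TIME between
   * attempts and asserting with the given message on the final retry.
   */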
  private void wait(Table target, int expectedCount, String msg)
    throws IOException, InterruptedException {
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}