001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
021import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.NB_RETRIES;
022import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.SLEEP_TIME;
023import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.row;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Set;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.quotas.QuotaUtil;
041import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.testclassification.ReplicationTests;
044import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.HFileTestUtil;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@Tag(ReplicationTests.TAG)
053@Tag(LargeTests.TAG)
054public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
055
056  private static final Logger LOG =
057    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
058
059  @Override
060  protected void customizeClusterConf(Configuration conf) {
061    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
062    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
063    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
064    conf.set("hbase.replication.source.fs.conf.provider",
065      TestSourceFSConfigurationProvider.class.getCanonicalName());
066  }
067
068  @Test
069  public void testSyncUpTool() throws Exception {
070    // Set up Replication:
071    // on Master and one Slave Table: t1_syncup and t2_syncup
072    // columnfamily:
073    // 'cf1' : replicated
074    // 'norep': not replicated
075    setupReplication();
076
077    // Prepare 24 random hfile ranges required for creating hfiles
078    Iterator<String> randomHFileRangeListIterator = null;
079    Set<String> randomHFileRanges = new HashSet<>(24);
080    for (int i = 0; i < 24; i++) {
081      randomHFileRanges.add(HBaseTestingUtil.getRandomUUID().toString());
082    }
083    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
084    Collections.sort(randomHFileRangeList);
085    randomHFileRangeListIterator = randomHFileRangeList.iterator();
086
087    // at Master:
088    // t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows into norep
089    // t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 rows into
090    // norep
091    // verify correctly replicated to slave
092    loadAndReplicateHFiles(true, randomHFileRangeListIterator);
093
094    // Verify hfile load works
095    //
096    // step 1: stop hbase on Slave
097    //
098    // step 2: at Master:
099    // t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
100    // t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
101    //
102    // step 3: stop hbase on master, restart hbase on Slave
103    //
104    // step 4: verify Slave still has the rows before load
105    // t1_syncup: 100 rows from cf1
106    // t2_syncup: 200 rows from cf1
107    //
108    // step 5: run syncup tool on Master
109    //
110    // step 6: verify that hfiles show up on Slave and 'norep' does not
111    // t1_syncup: 200 rows from cf1
112    // t2_syncup: 400 rows from cf1
113    // verify correctly replicated to Slave
114    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
115
116  }
117
118  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
119    throws Exception {
120    LOG.debug("mimicSyncUpAfterBulkLoad");
121    shutDownTargetHBaseCluster();
122
123    loadAndReplicateHFiles(false, randomHFileRangeListIterator);
124
125    int rowCount_ht1Source = countRows(ht1Source);
126    assertEquals(206, rowCount_ht1Source,
127      "t1_syncup has 206 rows on source, after bulk load of another 103 hfiles");
128
129    int rowCount_ht2Source = countRows(ht2Source);
130    assertEquals(406, rowCount_ht2Source,
131      "t2_syncup has 406 rows on source, after bulk load of another 203 hfiles");
132
133    shutDownSourceHBaseCluster();
134    restartTargetHBaseCluster(1);
135
136    Thread.sleep(SLEEP_TIME);
137
138    // Before sync up
139    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
140    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
141    assertEquals(100, rowCountHt1TargetAtPeer1, "@Peer1 t1_syncup should still have 100 rows");
142    assertEquals(200, rowCountHt2TargetAtPeer1, "@Peer1 t2_syncup should still have 200 rows");
143
144    // Run sync up tool
145    syncUp(UTIL1);
146
147    // After syun up
148    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
149    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
150    assertEquals(200, rowCountHt1TargetAtPeer1,
151      "@Peer1 t1_syncup should be sync up and have 200 rows");
152    assertEquals(400, rowCountHt2TargetAtPeer1,
153      "@Peer1 t2_syncup should be sync up and have 400 rows");
154  }
155
156  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
157    Iterator<String> randomHFileRangeListIterator) throws Exception {
158    LOG.debug("loadAndReplicateHFiles");
159
160    // Load 50 + 50 + 3 hfiles to t1_syncup.
161    byte[][][] hfileRanges =
162      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
163        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
164    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);
165
166    hfileRanges =
167      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
168        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
169    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
170      hfileRanges, 50);
171
172    hfileRanges =
173      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
174        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
175    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
176      hfileRanges, 3);
177
178    // Load 100 + 100 + 3 hfiles to t2_syncup.
179    hfileRanges =
180      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
181        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
182    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);
183
184    hfileRanges =
185      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
186        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
187    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
188      hfileRanges, 100);
189
190    hfileRanges =
191      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
192        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
193    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
194      hfileRanges, 3);
195
196    if (verifyReplicationOnSlave) {
197      // ensure replication completed
198      wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
199        "t1_syncup has 103 rows on source, and 100 on slave1");
200
201      wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
202        "t2_syncup has 203 rows on source, and 200 on slave1");
203    }
204  }
205
206  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
207    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
208    Path dir = UTIL1.getDataTestDirOnTestFS(testName);
209    FileSystem fs = UTIL1.getTestFileSystem();
210    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
211    Path familyDir = new Path(dir, Bytes.toString(fam));
212
213    int hfileIdx = 0;
214    for (byte[][] range : hfileRanges) {
215      byte[] from = range[0];
216      byte[] to = range[1];
217      HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
218        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
219    }
220
221    final TableName tableName = source.getName();
222    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
223    loader.bulkLoad(tableName, dir);
224  }
225
226  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
227    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
228    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
229    FileSystem fs = UTIL2.getTestFileSystem();
230    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
231    Path familyDir = new Path(dir, Bytes.toString(fam));
232
233    int hfileIdx = 0;
234    for (byte[][] range : hfileRanges) {
235      byte[] from = range[0];
236      byte[] to = range[1];
237      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
238        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
239    }
240
241    final TableName tableName = source.getName();
242    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
243    loader.bulkLoad(tableName, dir);
244  }
245
246  private void wait(Table target, int expectedCount, String msg)
247    throws IOException, InterruptedException {
248    for (int i = 0; i < NB_RETRIES; i++) {
249      int rowCountHt2TargetAtPeer1 = countRows(target);
250      if (i == NB_RETRIES - 1) {
251        assertEquals(expectedCount, rowCountHt2TargetAtPeer1, msg);
252      }
253      if (expectedCount == rowCountHt2TargetAtPeer1) {
254        break;
255      }
256      Thread.sleep(SLEEP_TIME);
257    }
258  }
259}