/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

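/**
 * Extends the sync-up tool test to cover bulk-loaded data: HFiles are bulk loaded into the
 * source tables while the slave is up and, later, while it is down, and the ReplicationSyncUp
 * tool is then expected to ship the pending bulk load entries to the slave cluster.
 */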
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG = LoggerFactory
      .getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
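    // Enable bulk load replication on the source cluster, give it a replication cluster id, and
    // register the test source FS configuration provider so the peer can locate the source
    // file system when copying bulk-loaded HFiles.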
    conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf1.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
      classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
      conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
    }

    TestReplicationBase.setUpBeforeClass();
  }

  @Override
  public void testSyncUpTool() throws Exception {
    /**
     * Set up replication between the Master and one Slave.
     * Tables: t1_syncup and t2_syncup.
     * Column families: 'cf1' is replicated, 'norep' is not.
     */
    setupReplication();

    /**
     * Prepare 16 random hfile range boundaries required for creating hfiles.
     */
    Iterator<String> randomHFileRangeListIterator = null;
    Set<String> randomHFileRanges = new HashSet<>(16);
    for (int i = 0; i < 16; i++) {
      randomHFileRanges.add(UUID.randomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    randomHFileRangeListIterator = randomHFileRangeList.iterator();
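    // The sorted boundaries are consumed in pairs below, so each bulk load uses a distinct,
    // non-overlapping key range.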

    /**
     * At Master:
     *   t1_syncup: load 100 rows into 'cf1' and 3 rows into 'norep'
     *   t2_syncup: load 200 rows into 'cf1' and 3 rows into 'norep'
     * Verify the 'cf1' rows are correctly replicated to the Slave.
     */
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    /**
     * Verify that bulk-loaded HFiles are shipped by the sync-up tool:
     * step 1: stop HBase on the Slave
     * step 2: at Master, load another 100 rows into 'cf1' and 3 rows into 'norep' of t1_syncup,
     *         and another 200 rows into 'cf1' and 3 rows into 'norep' of t2_syncup
     * step 3: stop HBase on the Master, restart HBase on the Slave
     * step 4: verify the Slave still has only the rows loaded before the outage
     *         (t1_syncup: 100 rows in 'cf1', t2_syncup: 200 rows in 'cf1')
     * step 5: run the sync-up tool on the Master
     * step 6: verify that the bulk-loaded rows show up on the Slave while 'norep' does not
     *         (t1_syncup: 200 rows in 'cf1', t2_syncup: 400 rows in 'cf1')
     */
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
      throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    utility2.shutdownMiniHBaseCluster();
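    // With the Slave down, the following bulk loads reach only the source cluster and queue up
    // for replication.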

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCount_ht1Source = utility1.countRows(ht1Source);
    assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 rows", 206,
      rowCount_ht1Source);

    int rowCount_ht2Source = utility1.countRows(ht2Source);
    assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 rows", 406,
      rowCount_ht2Source);

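    // Shut down the Master and bring the Slave back; the pending bulk load entries can now only
    // reach the peer through the sync-up tool.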
    utility1.shutdownMiniHBaseCluster();
    utility2.restartHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);

    // Run sync up tool
    syncUp(utility1);

    // After sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(utility1);
      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
          // syncUp still failed. Let's look at the source in case anything is wrong there
          utility1.restartHBaseCluster(1);
          rowCount_ht1Source = utility1.countRows(ht1Source);
          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
          rowCount_ht2Source = utility1.countRows(ht2Source);
          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
        }
        assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
          rowCount_ht1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
          rowCount_ht2TargetAtPeer1);
      }
      if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i
            + ", with rowCount_ht1TargetAtPeer1 =" + rowCount_ht1TargetAtPeer1
            + " and rowCount_ht2TargetAtPeer1 =" + rowCount_ht2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
      Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Bulk load 100 rows into 'cf1' and 3 rows into 'norep' of t1_syncup.
    byte[][][] hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
      100);

    hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
      hfileRanges, 3);

    // Bulk load 200 rows into 'cf1' and 3 rows into 'norep' of t2_syncup.
    hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
      200);

    hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // Ensure replication completed; the 3 'norep' rows are not expected on the slave.
      wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = utility1.getDataTestDirOnTestFS(testName);
    FileSystem fs = utility1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

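    // Create one HFile per [from, to) range under the family directory, each holding numOfRows
    // rows.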
    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
          + hfileIdx++), fam, row, from, to, numOfRows);
    }

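    // Bulk load the generated HFiles into the source table.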
    final TableName tableName = source.getName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);
  }

  private void wait(Table target, int expectedCount, String msg) throws IOException,
      InterruptedException {
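    // Poll the target table until it reaches the expected row count, asserting on the last retry.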
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = utility2.countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}