/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

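  /**
   * Enable replication of bulk loaded data and register the SecureBulkLoadEndpoint coprocessor,
   * so that hfiles bulk loaded on the source cluster are shipped to the peer cluster.
   */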
  @Override
  protected void customizeClusterConf(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
      classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
      conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
    }
  }

  @Test
  public void testSyncUpTool() throws Exception {
    /**
     * Set up Replication: on Master and one Slave.
     * Table: t1_syncup and t2_syncup
     * columnfamily:
     *   'cf1'  : replicated
     *   'norep': not replicated
     */
    setupReplication();

    /**
     * Prepare 16 random hfile ranges required for creating hfiles
     */
    Set<String> randomHFileRanges = new HashSet<>(16);
    for (int i = 0; i < 16; i++) {
      randomHFileRanges.add(UTIL1.getRandomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

    /**
     * At Master:
     *   t1_syncup: Load 100 rows into cf1, and 3 rows into norep
     *   t2_syncup: Load 200 rows into cf1, and 3 rows into norep
     * Verify correctly replicated to slave.
     */
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    /**
     * Verify hfile load works:
     * step 1: stop hbase on Slave
     * step 2: at Master:
     *   t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
     *   t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
     * step 3: stop hbase on master, restart hbase on Slave
     * step 4: verify Slave still has the rows before load
     *   t1_syncup: 100 rows from cf1
     *   t2_syncup: 200 rows from cf1
     * step 5: run syncup tool on Master
     * step 6: verify that hfiles show up on Slave and 'norep' does not
     *   t1_syncup: 200 rows from cf1
     *   t2_syncup: 400 rows from cf1
     * Verify correctly replicated to Slave.
     */
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

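  /**
   * With the target (slave) cluster down, bulk load more rows on the source, then shut down the
   * source, restart the target and run the sync up tool until the bulk-loaded cf1 rows appear on
   * the target (or NB_RETRIES is exhausted).
   */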
  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
      throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    shutDownTargetHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCountHt1Source = UTIL1.countRows(ht1Source);
    assertEquals("t1_syncup should have 206 rows on source after bulk loading another 103 rows",
      206, rowCountHt1Source);

    int rowCountHt2Source = UTIL1.countRows(ht2Source);
    assertEquals("t2_syncup should have 406 rows on source after bulk loading another 203 rows",
      406, rowCountHt2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

    // Run sync up tool
    syncUp(UTIL1);

    // After sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(UTIL1);
      rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
      rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
          // syncUp still failed. Look at the source cluster in case something is wrong there.
          restartSourceHBaseCluster(1);
          rowCountHt1Source = UTIL1.countRows(ht1Source);
          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCountHt1Source);
          rowCountHt2Source = UTIL1.countRows(ht2Source);
          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCountHt2Source);
        }
        assertEquals("@Peer1 t1_syncup should have been synced up and have 200 rows", 200,
          rowCountHt1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should have been synced up and have 400 rows", 400,
          rowCountHt2TargetAtPeer1);
      }
      if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) {
        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i +
          ", with rowCountHt1TargetAtPeer1 = " + rowCountHt1TargetAtPeer1 +
          " and rowCountHt2TargetAtPeer1 = " + rowCountHt2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

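  /**
   * Bulk load 100 (t1_syncup) and 200 (t2_syncup) rows into cf1, plus 3 rows into norep of each
   * table, on the source cluster; if requested, wait until the cf1 rows have been replicated to
   * the slave.
   */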
  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
      Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Bulk load 100 rows into cf1 and 3 rows into norep of t1_syncup.
    byte[][][] hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
      hfileRanges, 3);

    // Bulk load 200 rows into cf1 and 3 rows into norep of t2_syncup.
    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // ensure replication completed
      wait(ht1TargetAtPeer1, UTIL1.countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, UTIL1.countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

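  /**
   * Create hfiles covering the given key ranges on the test filesystem and bulk load them into
   * the given source table.
   */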
  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL1.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

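  /**
   * Poll the target table until it holds the expected number of rows, asserting with the given
   * message on the last retry.
   */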
  private void wait(Table target, int expectedCount, String msg)
      throws IOException, InterruptedException {
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = UTIL2.countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}