001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows; 021import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.NB_RETRIES; 022import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.SLEEP_TIME; 023import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.row; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.HashSet; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Set; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.quotas.QuotaUtil; 041import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; 042import 
org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that bulk-loaded hfiles written on the source cluster while the target cluster is down
 * reach the target once the sync-up tool is run ({@code syncUp}). Hfiles are bulk loaded both from
 * the source cluster's own test filesystem and from the other cluster's test filesystem; rows in
 * {@code NO_REP_FAMILY} are expected not to be replicated.
 */
@Tag(ReplicationTests.TAG)
@Tag(LargeTests.TAG)
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

  /**
   * Number of distinct range keys the test consumes: each {@link #loadAndReplicateHFiles} call
   * creates six hfiles bounded by two keys each, and the test calls it twice (6 * 2 * 2 = 24).
   */
  private static final int NUM_HFILE_RANGE_KEYS = 24;

  @Override
  protected void customizeClusterConf(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    conf.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
  }

  @Test
  public void testSyncUpTool() throws Exception {
    // Set up Replication:
    // on Master and one Slave Table: t1_syncup and t2_syncup
    // columnfamily:
    // 'cf1' : replicated
    // 'norep': not replicated
    setupReplication();

    // Prepare the distinct random keys used as hfile range boundaries. Looping until the set is
    // full guarantees exactly NUM_HFILE_RANGE_KEYS keys even if two UUIDs ever collided, so the
    // iterator below can never run dry mid-test.
    Set<String> uniqueKeys = new HashSet<>(NUM_HFILE_RANGE_KEYS);
    while (uniqueKeys.size() < NUM_HFILE_RANGE_KEYS) {
      uniqueKeys.add(HBaseTestingUtil.getRandomUUID().toString());
    }
    // Sorted so that each consecutive (from, to) pair consumed by nextHFileRange is ascending.
    List<String> sortedKeys = new ArrayList<>(uniqueKeys);
    Collections.sort(sortedKeys);
    Iterator<String> randomHFileRangeListIterator = sortedKeys.iterator();

    // at Master:
    // t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows into norep
    // t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 rows into
    // norep
    // verify correctly replicated to slave
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    // Verify hfile load works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
    // t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
    //
    // step 3: stop hbase on master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before load
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that hfiles show up on Slave and 'norep' does not
    // t1_syncup: 200 rows from cf1
    // t2_syncup: 400 rows from cf1
    // verify correctly replicated to Slave
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

  /**
   * Bulk loads a second round of hfiles while the target cluster is down, then restarts only the
   * target, runs the sync-up tool from the source side and checks the target caught up.
   *
   * @param randomHFileRangeListIterator sorted keys still available for hfile range boundaries
   */
  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
    throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    shutDownTargetHBaseCluster();

    // Target is down, so replication cannot be verified here.
    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCountHt1Source = countRows(ht1Source);
    assertEquals(206, rowCountHt1Source,
      "t1_syncup has 206 rows on source, after bulk load of another 103 hfiles");

    int rowCountHt2Source = countRows(ht2Source);
    assertEquals(406, rowCountHt2Source,
      "t2_syncup has 406 rows on source, after bulk load of another 203 hfiles");

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals(100, rowCountHt1TargetAtPeer1, "@Peer1 t1_syncup should still have 100 rows");
    assertEquals(200, rowCountHt2TargetAtPeer1, "@Peer1 t2_syncup should still have 200 rows");

    // Run sync up tool
    syncUp(UTIL1);

    // After sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals(200, rowCountHt1TargetAtPeer1,
      "@Peer1 t1_syncup should be sync up and have 200 rows");
    assertEquals(400, rowCountHt2TargetAtPeer1,
      "@Peer1 t2_syncup should be sync up and have 400 rows");
  }

  /**
   * Bulk loads one round of hfiles into both source tables and optionally waits for replication.
   * Each load below writes a single hfile containing the stated number of rows and consumes two
   * keys from {@code randomHFileRangeListIterator}.
   *
   * @param verifyReplicationOnSlave when true, wait until each target table holds the source row
   *          count minus the 3 rows written to {@code NO_REP_FAMILY}
   * @param randomHFileRangeListIterator sorted keys used as hfile range boundaries
   */
  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
    Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // t1_syncup: 50 rows into FAMILY, 50 rows into FAMILY from the other cluster's filesystem,
    // and 3 rows into NO_REP_FAMILY.
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
      nextHFileRange(randomHFileRangeListIterator), 50);
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
      nextHFileRange(randomHFileRangeListIterator), 50);
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
      nextHFileRange(randomHFileRangeListIterator), 3);

    // t2_syncup: 100 rows into FAMILY, 100 rows into FAMILY from the other cluster's filesystem,
    // and 3 rows into NO_REP_FAMILY.
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
      nextHFileRange(randomHFileRangeListIterator), 100);
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
      nextHFileRange(randomHFileRangeListIterator), 100);
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
      nextHFileRange(randomHFileRangeListIterator), 3);

    if (verifyReplicationOnSlave) {
      // ensure replication completed
      waitForRowCount(ht1TargetAtPeer1, countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      waitForRowCount(ht2TargetAtPeer1, countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

  /**
   * Consumes the next two keys from {@code keys} and wraps them as a single {@code [from, to]}
   * hfile range, in the shape expected by the bulk load helpers.
   */
  private static byte[][][] nextHFileRange(Iterator<String> keys) {
    byte[] from = Bytes.toBytes(keys.next());
    byte[] to = Bytes.toBytes(keys.next());
    return new byte[][][] { new byte[][] { from, to } };
  }

  /**
   * Writes hfiles on the source cluster's ({@code UTIL1}) test filesystem and bulk loads them into
   * {@code source}. Despite the name, no validation is performed here; callers verify row counts.
   */
  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    createHFilesAndBulkLoad(UTIL1, testName, row, fam, source, hfileRanges, numOfRows);
  }

  /**
   * Writes hfiles on the other cluster's ({@code UTIL2}) test filesystem and bulk loads them into
   * {@code source}. Despite the name, no validation is performed here; callers verify row counts.
   */
  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row,
    byte[] fam, Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    createHFilesAndBulkLoad(UTIL2, testName, row, fam, source, hfileRanges, numOfRows);
  }

  /**
   * Creates one hfile per range under {@code <testDir>/<family>/hfile_<i>} on {@code hfileUtil}'s
   * test filesystem, then bulk loads the test directory into {@code source} using the source
   * cluster's ({@code UTIL1}) configuration.
   *
   * @param hfileUtil cluster whose test filesystem hosts the generated hfiles
   * @param testName name of the test data directory
   * @param row passed through to {@link HFileTestUtil#createHFile}
   * @param fam column family the hfiles are written for
   * @param source table receiving the bulk load
   * @param hfileRanges one {@code [from, to]} key pair per hfile
   * @param numOfRows rows written into each hfile
   */
  private void createHFilesAndBulkLoad(HBaseTestingUtil hfileUtil, String testName, byte[] row,
    byte[] fam, Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    FileSystem fs = hfileUtil.getTestFileSystem();
    // Fully qualify so the bulk load can read from a filesystem other than the source cluster's.
    Path dir = hfileUtil.getDataTestDirOnTestFS(testName).makeQualified(fs.getUri(),
      fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(hfileUtil.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

  /**
   * Polls {@code target} until it holds {@code expectedCount} rows, sleeping {@code SLEEP_TIME}
   * between attempts, for {@code NB_RETRIES} counts in total. The final count is always asserted,
   * so the check cannot be skipped even if {@code NB_RETRIES} is not positive.
   *
   * @param target table to count
   * @param expectedCount row count to wait for
   * @param msg assertion message used if the count never matches
   */
  private void waitForRowCount(Table target, int expectedCount, String msg)
    throws IOException, InterruptedException {
    for (int attempt = 1; attempt < NB_RETRIES; attempt++) {
      if (countRows(target) == expectedCount) {
        return;
      }
      Thread.sleep(SLEEP_TIME);
    }
    assertEquals(expectedCount, countRows(target), msg);
  }
}