/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Variant of {@link TestReplicationSyncUpTool} in which the source tables are populated through
 * bulk-loaded HFiles (via {@link LoadIncrementalHFiles}) instead of regular writes, with bulk
 * load replication enabled on the source cluster configuration.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

  /** Region coprocessor that is appended to the source cluster configuration if missing. */
  private static final String SECURE_BULK_LOAD_ENDPOINT =
    "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";

  /**
   * Enables bulk load replication on the source configuration ({@code conf1}), registers the
   * test source-FS configuration provider and the secure bulk load endpoint, then delegates to
   * {@link TestReplicationBase#setUpBeforeClass()}.
   * @throws Exception if the base class set-up fails
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf1.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    if (!classes.contains(SECURE_BULK_LOAD_ENDPOINT)) {
      // Only insert the separator when there is an existing value, so that an unset key does
      // not end up with a leading comma.
      classes = classes.isEmpty() ? SECURE_BULK_LOAD_ENDPOINT
        : classes + "," + SECURE_BULK_LOAD_ENDPOINT;
      conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
    }

    TestReplicationBase.setUpBeforeClass();
  }

  /**
   * Sets up replication, bulk loads data on the source while the peer is running and verifies
   * it is replicated, then bulk loads again while the peer is down and verifies that the sync-up
   * tool brings the peer up to date.
   * @throws Exception on any failure of the scenario
   */
  @Override
  public void testSyncUpTool() throws Exception {
    // Set up Replication: on Master and one Slave.
    // Table: t1_syncup and t2_syncup
    // columnfamily: 'cf1': replicated, 'norep': not replicated
    setupReplication();

    // Prepare 16 random hfile ranges required for creating hfiles. They are sorted so that each
    // consecutive pair taken from the iterator forms an ordered (from, to) range.
    Set<String> randomHFileRanges = new HashSet<>(16);
    for (int i = 0; i < 16; i++) {
      randomHFileRanges.add(utility1.getRandomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

    // At Master:
    //   t1_syncup: Load 100 rows into cf1, and 3 rows into norep
    //   t2_syncup: Load 200 rows into cf1, and 3 rows into norep
    // Verify correctly replicated to slave.
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    // Verify hfile load works:
    //   step 1: stop hbase on Slave
    //   step 2: at Master:
    //     t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
    //     t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
    //   step 3: stop hbase on master, restart hbase on Slave
    //   step 4: verify Slave still has the rows before load
    //     t1_syncup: 100 rows from cf1
    //     t2_syncup: 200 rows from cf1
    //   step 5: run syncup tool on Master
    //   step 6: verify that hfiles show up on Slave and 'norep' does not
    //     t1_syncup: 200 rows from cf1
    //     t2_syncup: 400 rows from cf1
    // Verify correctly replicated to Slave.
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

  /**
   * Bulk loads a second batch on the source while the peer cluster is shut down, swaps which
   * cluster is running, and then runs the sync-up tool until the peer holds the expected row
   * counts or {@code NB_RETRIES} attempts have been made.
   * @param randomHFileRangeListIterator source of the remaining hfile range boundaries
   * @throws Exception on cluster, load or assertion failure
   */
  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
      throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    utility2.shutdownMiniHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int t1SourceRows = utility1.countRows(ht1Source);
    assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 rows", 206,
      t1SourceRows);

    int t2SourceRows = utility1.countRows(ht2Source);
    assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 rows", 406,
      t2SourceRows);

    utility1.shutdownMiniHBaseCluster();
    utility2.restartHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int t1PeerRows = utility2.countRows(ht1TargetAtPeer1);
    int t2PeerRows = utility2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, t1PeerRows);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, t2PeerRows);

    // Run sync up tool
    syncUp(utility1);

    // After sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(utility1);
      t1PeerRows = utility2.countRows(ht1TargetAtPeer1);
      t2PeerRows = utility2.countRows(ht2TargetAtPeer1);
      boolean synced = t1PeerRows == 200 && t2PeerRows == 400;
      if (i == NB_RETRIES - 1) {
        if (!synced) {
          // syncUP still failed. Let's look at the source in case anything wrong there
          utility1.restartHBaseCluster(1);
          t1SourceRows = utility1.countRows(ht1Source);
          LOG.debug("t1_syncup should have 206 rows at source, and it is {}", t1SourceRows);
          t2SourceRows = utility1.countRows(ht2Source);
          LOG.debug("t2_syncup should have 406 rows at source, and it is {}", t2SourceRows);
        }
        assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, t1PeerRows);
        assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, t2PeerRows);
      }
      if (synced) {
        LOG.info("SyncUpAfterBulkLoad succeeded at retry = {}", i);
        break;
      } else {
        LOG.debug("SyncUpAfterBulkLoad failed at retry = {}, with rowCount_ht1TargetPeer1 ={}"
          + " and rowCount_ht2TargetAtPeer1 ={}", i, t1PeerRows, t2PeerRows);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  /**
   * Bulk loads one batch into both source tables: 100 rows into the replicated family and 3 rows
   * into the non-replicated family of t1_syncup, then 200 and 3 rows respectively into
   * t2_syncup. Consumes eight entries from the iterator.
   * @param verifyReplicationOnSlave whether to wait until the peer tables hold the source row
   *          count minus the 3 non-replicated rows
   * @param randomHFileRangeListIterator source of hfile range boundaries
   * @throws Exception on load or verification failure
   */
  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
      Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Load 100 + 3 rows to t1_syncup.
    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source,
      nextHFileRange(randomHFileRangeListIterator), 100);
    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
      nextHFileRange(randomHFileRangeListIterator), 3);

    // Load 200 + 3 rows to t2_syncup.
    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source,
      nextHFileRange(randomHFileRangeListIterator), 200);
    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
      nextHFileRange(randomHFileRangeListIterator), 3);

    if (verifyReplicationOnSlave) {
      // ensure replication completed
      wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

  /**
   * Takes the next two entries from the iterator and wraps them as a single (from, to) range.
   * @param rangeIterator source of range boundaries; must have at least two entries left
   * @return an array holding exactly one range
   */
  private static byte[][][] nextHFileRange(Iterator<String> rangeIterator) {
    byte[] from = Bytes.toBytes(rangeIterator.next());
    byte[] to = Bytes.toBytes(rangeIterator.next());
    return new byte[][][] { new byte[][] { from, to } };
  }

  /**
   * Creates one hfile per range under a family directory inside the test directory named
   * {@code testName}, then runs {@link LoadIncrementalHFiles} on that directory against the
   * given table.
   * @param testName name of the test data directory on the test file system
   * @param row passed through to {@code HFileTestUtil.createHFile}
   * @param fam column family; also used as the family directory name
   * @param source table to bulk load into
   * @param hfileRanges (from, to) boundaries, one hfile is created per entry
   * @param numOfRows number of rows passed to the hfile creation for each range
   * @throws Exception on hfile creation or load failure
   */
  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = utility1.getDataTestDirOnTestFS(testName);
    FileSystem fs = utility1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(utility1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);
  }

  /**
   * Polls the row count of {@code target} on the peer cluster until it equals
   * {@code expectedCount}, sleeping {@code SLEEP_TIME} between attempts. On the last of
   * {@code NB_RETRIES} attempts the count is asserted, so a mismatch fails the test.
   * @param target peer table to count
   * @param expectedCount row count to wait for
   * @param msg assertion message used if the count never matches
   * @throws IOException if counting rows fails
   * @throws InterruptedException if interrupted while sleeping
   */
  private void wait(Table target, int expectedCount, String msg)
      throws IOException, InterruptedException {
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = utility2.countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}