001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Set; 029import java.util.UUID; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 037import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.ReplicationTests; 040import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.HFileTestUtil; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.experimental.categories.Category; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@Category({ ReplicationTests.class, LargeTests.class }) 050public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class); 055 056 private static final Logger LOG = LoggerFactory 057 .getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class); 058 059 @BeforeClass 060 public static void setUpBeforeClass() throws Exception { 061 conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 062 conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); 063 conf1.set("hbase.replication.source.fs.conf.provider", 064 TestSourceFSConfigurationProvider.class.getCanonicalName()); 065 String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 066 if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) { 067 classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"; 068 conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes); 069 } 070 071 TestReplicationBase.setUpBeforeClass(); 072 } 073 074 @Override 075 public void testSyncUpTool() throws Exception { 076 /** 077 * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: 078 * 'cf1' : replicated 'norep': not replicated 079 */ 080 setupReplication(); 081 082 /** 083 * Prepare 16 random hfile ranges required for creating hfiles 084 */ 085 Iterator<String> randomHFileRangeListIterator = null; 086 Set<String> randomHFileRanges = new HashSet<>(16); 087 for (int i = 0; i < 16; i++) { 088 randomHFileRanges.add(UUID.randomUUID().toString()); 089 } 090 List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges); 091 Collections.sort(randomHFileRangeList); 092 randomHFileRangeListIterator = randomHFileRangeList.iterator(); 093 094 /** 095 * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows 096 * into cf1, and 3 rows into norep verify correctly replicated to slave 097 */ 098 loadAndReplicateHFiles(true, randomHFileRangeListIterator); 099 100 /** 101 * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load 102 * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and 103 * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave 104 * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 105 * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not 106 * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to 107 * Slave 108 */ 109 mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator); 110 111 } 112 113 private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) 114 throws Exception { 115 LOG.debug("mimicSyncUpAfterBulkLoad"); 116 utility2.shutdownMiniHBaseCluster(); 117 118 loadAndReplicateHFiles(false, randomHFileRangeListIterator); 119 120 int rowCount_ht1Source = utility1.countRows(ht1Source); 121 assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206, 122 rowCount_ht1Source); 123 124 int rowCount_ht2Source = utility1.countRows(ht2Source); 125 assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, 126 rowCount_ht2Source); 127 128 utility1.shutdownMiniHBaseCluster(); 129 utility2.restartHBaseCluster(1); 130 131 Thread.sleep(SLEEP_TIME); 132 133 // Before sync up 134 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 135 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 136 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); 137 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); 138 139 // Run sync up tool 140 syncUp(utility1); 141 142 // After syun up 143 for (int i = 0; i < NB_RETRIES; i++) { 144 syncUp(utility1); 145 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 146 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 147 if (i == NB_RETRIES - 1) { 148 if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) { 149 // syncUP still failed. Let's look at the source in case anything wrong there 150 utility1.restartHBaseCluster(1); 151 rowCount_ht1Source = utility1.countRows(ht1Source); 152 LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); 153 rowCount_ht2Source = utility1.countRows(ht2Source); 154 LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); 155 } 156 assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, 157 rowCount_ht1TargetAtPeer1); 158 assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, 159 rowCount_ht2TargetAtPeer1); 160 } 161 if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) { 162 LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); 163 break; 164 } else { 165 LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" 166 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" 167 + rowCount_ht2TargetAtPeer1); 168 } 169 Thread.sleep(SLEEP_TIME); 170 } 171 } 172 173 private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, 174 Iterator<String> randomHFileRangeListIterator) throws Exception { 175 LOG.debug("loadAndReplicateHFiles"); 176 177 // Load 100 + 3 hfiles to t1_syncup. 178 byte[][][] hfileRanges = 179 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 180 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 181 loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges, 182 100); 183 184 hfileRanges = 185 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 186 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 187 loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source, 188 hfileRanges, 3); 189 190 // Load 200 + 3 hfiles to t2_syncup. 191 hfileRanges = 192 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 193 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 194 loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges, 195 200); 196 197 hfileRanges = 198 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 199 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 200 loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source, 201 hfileRanges, 3); 202 203 if (verifyReplicationOnSlave) { 204 // ensure replication completed 205 wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3, 206 "t1_syncup has 103 rows on source, and 100 on slave1"); 207 208 wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3, 209 "t2_syncup has 203 rows on source, and 200 on slave1"); 210 } 211 } 212 213 private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, 214 Table source, byte[][][] hfileRanges, int numOfRows) throws Exception { 215 Path dir = utility1.getDataTestDirOnTestFS(testName); 216 FileSystem fs = utility1.getTestFileSystem(); 217 dir = dir.makeQualified(fs); 218 Path familyDir = new Path(dir, Bytes.toString(fam)); 219 220 int hfileIdx = 0; 221 for (byte[][] range : hfileRanges) { 222 byte[] from = range[0]; 223 byte[] to = range[1]; 224 HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_" 225 + hfileIdx++), fam, row, from, to, numOfRows); 226 } 227 228 final TableName tableName = source.getName(); 229 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration()); 230 String[] args = { dir.toString(), tableName.toString() }; 231 loader.run(args); 232 } 233 234 private void wait(Table target, int expectedCount, String msg) throws IOException, 235 InterruptedException { 236 for (int i = 0; i < NB_RETRIES; i++) { 237 int rowCount_ht2TargetAtPeer1 = utility2.countRows(target); 238 if (i == NB_RETRIES - 1) { 239 assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1); 240 } 241 if (expectedCount == rowCount_ht2TargetAtPeer1) { 242 break; 243 } 244 Thread.sleep(SLEEP_TIME); 245 } 246 } 247}