001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES; 021import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME; 022import static org.apache.hadoop.hbase.replication.TestReplicationBase.row; 023import static org.junit.Assert.assertEquals; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Set; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 040import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; 041import org.apache.hadoop.hbase.testclassification.LargeTests; 042import org.apache.hadoop.hbase.testclassification.ReplicationTests; 043import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.HFileTestUtil; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052@Category({ ReplicationTests.class, LargeTests.class }) 053public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class); 058 059 private static final Logger LOG = 060 LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class); 061 062 @Override 063 protected void customizeClusterConf(Configuration conf) { 064 conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 065 conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); 066 conf.set("hbase.replication.source.fs.conf.provider", 067 TestSourceFSConfigurationProvider.class.getCanonicalName()); 068 String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 069 if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) { 070 classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"; 071 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes); 072 } 073 } 074 075 @Test 076 public void testSyncUpTool() throws Exception { 077 /** 078 * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: 079 * 'cf1' : replicated 'norep': not replicated 080 */ 081 setupReplication(); 082 083 /** 084 * Prepare 16 random hfile ranges required for creating hfiles 085 */ 086 Iterator<String> randomHFileRangeListIterator = null; 087 Set<String> randomHFileRanges = new HashSet<>(16); 088 for (int i = 0; i < 16; i++) { 089 randomHFileRanges.add(UTIL1.getRandomUUID().toString()); 090 } 091 List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges); 092 Collections.sort(randomHFileRangeList); 093 randomHFileRangeListIterator = randomHFileRangeList.iterator(); 094 095 /** 096 * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows 097 * into cf1, and 3 rows into norep verify correctly replicated to slave 098 */ 099 loadAndReplicateHFiles(true, randomHFileRangeListIterator); 100 101 /** 102 * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load 103 * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and 104 * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave 105 * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 106 * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not 107 * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to 108 * Slave 109 */ 110 mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator); 111 112 } 113 114 private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) 115 throws Exception { 116 LOG.debug("mimicSyncUpAfterBulkLoad"); 117 shutDownTargetHBaseCluster(); 118 119 loadAndReplicateHFiles(false, randomHFileRangeListIterator); 120 121 int rowCount_ht1Source = UTIL1.countRows(ht1Source); 122 assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206, 123 rowCount_ht1Source); 124 125 int rowCount_ht2Source = UTIL1.countRows(ht2Source); 126 assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, 127 rowCount_ht2Source); 128 129 shutDownSourceHBaseCluster(); 130 restartTargetHBaseCluster(1); 131 132 Thread.sleep(SLEEP_TIME); 133 134 // Before sync up 135 int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1); 136 int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1); 137 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1); 138 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1); 139 140 // Run sync up tool 141 syncUp(UTIL1); 142 143 // After syun up 144 for (int i = 0; i < NB_RETRIES; i++) { 145 syncUp(UTIL1); 146 rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1); 147 rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1); 148 if (i == NB_RETRIES - 1) { 149 if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) { 150 // syncUP still failed. Let's look at the source in case anything wrong there 151 restartSourceHBaseCluster(1); 152 rowCount_ht1Source = UTIL1.countRows(ht1Source); 153 LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); 154 rowCount_ht2Source = UTIL1.countRows(ht2Source); 155 LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); 156 } 157 assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, 158 rowCountHt1TargetAtPeer1); 159 assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, 160 rowCountHt2TargetAtPeer1); 161 } 162 if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) { 163 LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); 164 break; 165 } else { 166 LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + 167 ", with rowCount_ht1TargetPeer1 =" + rowCountHt1TargetAtPeer1 + 168 " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1); 169 } 170 Thread.sleep(SLEEP_TIME); 171 } 172 } 173 174 private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, 175 Iterator<String> randomHFileRangeListIterator) throws Exception { 176 LOG.debug("loadAndReplicateHFiles"); 177 178 // Load 100 + 3 hfiles to t1_syncup. 179 byte[][][] hfileRanges = 180 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 181 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 182 loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100); 183 184 hfileRanges = 185 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 186 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 187 loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source, 188 hfileRanges, 3); 189 190 // Load 200 + 3 hfiles to t2_syncup. 191 hfileRanges = 192 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 193 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 194 loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200); 195 196 hfileRanges = 197 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), 198 Bytes.toBytes(randomHFileRangeListIterator.next()) } }; 199 loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source, 200 hfileRanges, 3); 201 202 if (verifyReplicationOnSlave) { 203 // ensure replication completed 204 wait(ht1TargetAtPeer1, UTIL1.countRows(ht1Source) - 3, 205 "t1_syncup has 103 rows on source, and 100 on slave1"); 206 207 wait(ht2TargetAtPeer1, UTIL1.countRows(ht2Source) - 3, 208 "t2_syncup has 203 rows on source, and 200 on slave1"); 209 } 210 } 211 212 private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, 213 Table source, byte[][][] hfileRanges, int numOfRows) throws Exception { 214 Path dir = UTIL1.getDataTestDirOnTestFS(testName); 215 FileSystem fs = UTIL1.getTestFileSystem(); 216 dir = dir.makeQualified(fs); 217 Path familyDir = new Path(dir, Bytes.toString(fam)); 218 219 int hfileIdx = 0; 220 for (byte[][] range : hfileRanges) { 221 byte[] from = range[0]; 222 byte[] to = range[1]; 223 HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs, 224 new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows); 225 } 226 227 final TableName tableName = source.getName(); 228 BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration()); 229 loader.bulkLoad(tableName, dir); 230 } 231 232 private void wait(Table target, int expectedCount, String msg) 233 throws IOException, InterruptedException { 234 for (int i = 0; i < NB_RETRIES; i++) { 235 int rowCountHt2TargetAtPeer1 = UTIL2.countRows(target); 236 if (i == NB_RETRIES - 1) { 237 assertEquals(msg, expectedCount, rowCountHt2TargetAtPeer1); 238 } 239 if (expectedCount == rowCountHt2TargetAtPeer1) { 240 break; 241 } 242 Thread.sleep(SLEEP_TIME); 243 } 244 } 245}