001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; 021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024import java.io.File; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Date; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Map; 033import java.util.Optional; 034import java.util.Random; 035import java.util.UUID; 036import java.util.concurrent.CountDownLatch; 037import java.util.concurrent.ExecutorService; 038import java.util.concurrent.Executors; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicInteger; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.CellBuilder; 046import org.apache.hadoop.hbase.CellBuilderFactory; 047import org.apache.hadoop.hbase.CellBuilderType; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseConfiguration; 050import org.apache.hadoop.hbase.HBaseTestingUtility; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.KeyValue; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.Connection; 057import org.apache.hadoop.hbase.client.ConnectionFactory; 058import org.apache.hadoop.hbase.client.Get; 059import org.apache.hadoop.hbase.client.Result; 060import org.apache.hadoop.hbase.client.Table; 061import org.apache.hadoop.hbase.client.TableDescriptor; 062import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 063import org.apache.hadoop.hbase.coprocessor.ObserverContext; 064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 065import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 066import org.apache.hadoop.hbase.coprocessor.RegionObserver; 067import org.apache.hadoop.hbase.io.hfile.CacheConfig; 068import org.apache.hadoop.hbase.io.hfile.HFile; 069import org.apache.hadoop.hbase.io.hfile.HFileContext; 070import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 071import org.apache.hadoop.hbase.mob.MobConstants; 072import org.apache.hadoop.hbase.mob.MobFileName; 073import org.apache.hadoop.hbase.mob.MobUtils; 074import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; 075import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 076import org.apache.hadoop.hbase.replication.TestReplicationBase; 077import org.apache.hadoop.hbase.testclassification.MediumTests; 078import org.apache.hadoop.hbase.testclassification.ReplicationTests; 079import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 080import org.apache.hadoop.hbase.util.Bytes; 081import org.apache.hadoop.hbase.util.FSUtils; 082import org.apache.hadoop.hbase.util.Pair; 083import org.apache.hadoop.hdfs.MiniDFSCluster; 084import org.junit.After; 085import org.junit.Before; 086import org.junit.BeforeClass; 087import org.junit.ClassRule; 088import org.junit.Rule; 089import org.junit.Test; 090import org.junit.experimental.categories.Category; 091import org.junit.rules.TemporaryFolder; 092import org.junit.rules.TestName; 093import org.slf4j.Logger; 094import org.slf4j.LoggerFactory; 095 096/** 097 * Integration test for bulk load replication. Defines three clusters, with the following 098 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 099 * 2 and 3). 100 * 101 * For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file 102 * gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication 103 * topology all these bulk loads should get replicated only once on each peer. To assert this, 104 * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the 105 * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying 106 * we are not entering the infinite loop condition addressed by HBASE-22380. 107 */ 108@Category({ ReplicationTests.class, MediumTests.class}) 109public class TestBulkLoadReplication extends TestReplicationBase { 110 111 @ClassRule 112 public static final HBaseClassTestRule CLASS_RULE = 113 HBaseClassTestRule.forClass(TestBulkLoadReplication.class); 114 115 protected static final Logger LOG = 116 LoggerFactory.getLogger(TestBulkLoadReplication.class); 117 118 private static final String PEER1_CLUSTER_ID = "peer1"; 119 private static final String PEER4_CLUSTER_ID = "peer4"; 120 private static final String PEER3_CLUSTER_ID = "peer3"; 121 122 private static final String PEER_ID1 = "1"; 123 private static final String PEER_ID3 = "3"; 124 private static final String PEER_ID4 = "4"; 125 126 private static AtomicInteger BULK_LOADS_COUNT; 127 private static CountDownLatch BULK_LOAD_LATCH; 128 129 private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir"); 130 131 private static HBaseTestingUtility utility3; 132 private static HBaseTestingUtility utility4; 133 private static Configuration conf3; 134 private static Configuration conf4; 135 private static Table htable3; 136 private static Table htable4; 137 138 @Rule 139 public TestName name = new TestName(); 140 141 @ClassRule 142 public static TemporaryFolder testFolder = new TemporaryFolder(); 143 144 @BeforeClass 145 public static void setUpBeforeClass() throws Exception { 146 setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID); 147 conf3 = HBaseConfiguration.create(conf1); 148 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); 149 utility3 = new HBaseTestingUtility(conf3); 150 conf4 = HBaseConfiguration.create(conf1); 151 conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4"); 152 utility3 = new HBaseTestingUtility(conf3); 153 utility4 = new HBaseTestingUtility(conf4); 154 TestReplicationBase.setUpBeforeClass(); 155 setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID); 156 //utility4 is started within TestReplicationBase.setUpBeforeClass(), but we had not set 157 //bulkload replication configs yet, so setting a 4th utility. 158 setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID); 159 startCluster(utility3, conf3); 160 startCluster(utility4, conf4); 161 } 162 163 private static void startCluster(HBaseTestingUtility util, Configuration configuration) 164 throws Exception { 165 LOG.info("Setup Zk to same one from utility1 and utility4"); 166 util.setZkCluster(utility1.getZkCluster()); 167 util.startMiniCluster(2); 168 169 TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName) 170 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 171 .setMobEnabled(true) 172 .setMobThreshold(4000) 173 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 174 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 175 176 Connection connection = ConnectionFactory.createConnection(configuration); 177 try (Admin admin = connection.getAdmin()) { 178 admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 179 } 180 util.waitUntilAllRegionsAssigned(tableName); 181 } 182 183 @Before 184 @Override 185 public void setUpBase() throws Exception { 186 super.setUpBase(); 187 ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1); 188 ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4); 189 ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3); 190 //adds cluster4 as a remote peer on cluster1 191 utility1.getAdmin().addReplicationPeer(PEER_ID4, peer4Config); 192 //adds cluster1 as a remote peer on cluster4 193 utility4.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); 194 //adds cluster3 as a remote peer on cluster4 195 utility4.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); 196 //adds cluster4 as a remote peer on cluster3 197 utility3.getAdmin().addReplicationPeer(PEER_ID4, peer4Config); 198 setupCoprocessor(utility1); 199 setupCoprocessor(utility4); 200 setupCoprocessor(utility3); 201 BULK_LOADS_COUNT = new AtomicInteger(0); 202 } 203 204 private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) { 205 return ReplicationPeerConfig.newBuilder() 206 .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build(); 207 } 208 209 private void setupCoprocessor(HBaseTestingUtility cluster){ 210 cluster.getHBaseCluster().getRegions(tableName).forEach(r -> { 211 try { 212 TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost(). 213 findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 214 if(cp == null) { 215 r.getCoprocessorHost(). 216 load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0, 217 cluster.getConfiguration()); 218 cp = r.getCoprocessorHost(). 219 findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 220 cp.clusterName = cluster.getClusterKey(); 221 } 222 } catch (Exception e){ 223 LOG.error(e.getMessage(), e); 224 } 225 }); 226 } 227 228 @After 229 @Override 230 public void tearDownBase() throws Exception { 231 super.tearDownBase(); 232 utility4.getAdmin().removeReplicationPeer(PEER_ID1); 233 utility4.getAdmin().removeReplicationPeer(PEER_ID3); 234 utility3.getAdmin().removeReplicationPeer(PEER_ID4); 235 utility1.getAdmin().removeReplicationPeer(PEER_ID4); 236 } 237 238 private static void setupBulkLoadConfigsForCluster(Configuration config, 239 String clusterReplicationId) throws Exception { 240 config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 241 config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); 242 File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); 243 File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() 244 + "/hbase-site.xml"); 245 config.writeXml(new FileOutputStream(sourceConfigFile)); 246 config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); 247 } 248 249 @Test 250 public void testBulkLoadReplicationActiveActive() throws Exception { 251 Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName); 252 Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName); 253 Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName); 254 byte[] row = Bytes.toBytes("001"); 255 byte[] value = Bytes.toBytes("v1"); 256 assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable); 257 row = Bytes.toBytes("002"); 258 value = Bytes.toBytes("v2"); 259 assertBulkLoadConditions(row, value, utility4, peer1TestTable, peer4TestTable, peer3TestTable); 260 row = Bytes.toBytes("003"); 261 value = Bytes.toBytes("v3"); 262 assertBulkLoadConditions(row, value, utility3, peer1TestTable, peer4TestTable, peer3TestTable); 263 //Additional wait to make sure no extra bulk load happens 264 Thread.sleep(400); 265 //We have 3 bulk load events (1 initiated on each cluster). 266 //Each event gets 3 counts (the originator cluster, plus the two peers), 267 //so BULK_LOADS_COUNT expected value is 3 * 3 = 9. 268 assertEquals(9, BULK_LOADS_COUNT.get()); 269 } 270 271 @Test 272 public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception { 273 Path path = createMobFiles(utility3); 274 ColumnFamilyDescriptor descriptor = 275 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName); 276 ExecutorService pool = null; 277 try { 278 pool = Executors.newFixedThreadPool(1); 279 PartitionedMobCompactor compactor = 280 new PartitionedMobCompactor(utility3.getConfiguration(), utility3.getTestFileSystem(), 281 tableName, descriptor, pool); 282 BULK_LOAD_LATCH = new CountDownLatch(1); 283 BULK_LOADS_COUNT.set(0); 284 compactor.compact(Arrays.asList(utility3.getTestFileSystem().listStatus(path)), true); 285 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS)); 286 Thread.sleep(400); 287 assertEquals(1, BULK_LOADS_COUNT.get()); 288 } finally { 289 if(pool != null && !pool.isTerminated()) { 290 pool.shutdownNow(); 291 } 292 } 293 } 294 295 296 private void assertBulkLoadConditions(byte[] row, byte[] value, 297 HBaseTestingUtility utility, Table...tables) throws Exception { 298 BULK_LOAD_LATCH = new CountDownLatch(3); 299 bulkLoadOnCluster(row, value, utility); 300 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); 301 assertTableHasValue(tables[0], row, value); 302 assertTableHasValue(tables[1], row, value); 303 assertTableHasValue(tables[2], row, value); 304 } 305 306 private void bulkLoadOnCluster(byte[] row, byte[] value, 307 HBaseTestingUtility cluster) throws Exception { 308 String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration()); 309 Path bulkLoadFilePath = new Path(bulkLoadFile); 310 copyToHdfs(bulkLoadFile, cluster.getDFSCluster()); 311 LoadIncrementalHFiles bulkLoadHFilesTool = 312 new LoadIncrementalHFiles(cluster.getConfiguration()); 313 Map<byte[], List<Path>> family2Files = new HashMap<>(); 314 List<Path> files = new ArrayList<>(); 315 files.add(new Path(BULK_LOAD_BASE_DIR + "/f/" + bulkLoadFilePath.getName())); 316 family2Files.put(Bytes.toBytes("f"), files); 317 bulkLoadHFilesTool.run(family2Files, tableName); 318 } 319 320 private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception { 321 Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/f/"); 322 cluster.getFileSystem().mkdirs(bulkLoadDir); 323 cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); 324 } 325 326 private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { 327 Get get = new Get(row); 328 Result result = table.get(get); 329 assertTrue(result.advance()); 330 assertEquals(Bytes.toString(value), Bytes.toString(result.value())); 331 } 332 333 private String createHFileForFamilies(byte[] row, byte[] value, 334 Configuration clusterConfig) throws IOException { 335 CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); 336 cellBuilder.setRow(row) 337 .setFamily(TestReplicationBase.famName) 338 .setQualifier(Bytes.toBytes("1")) 339 .setValue(value) 340 .setType(Cell.Type.Put); 341 342 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); 343 // TODO We need a way to do this without creating files 344 File hFileLocation = testFolder.newFile(); 345 FSDataOutputStream out = 346 new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 347 try { 348 hFileFactory.withOutputStream(out); 349 hFileFactory.withFileContext(new HFileContext()); 350 HFile.Writer writer = hFileFactory.create(); 351 try { 352 writer.append(new KeyValue(cellBuilder.build())); 353 } finally { 354 writer.close(); 355 } 356 } finally { 357 out.close(); 358 } 359 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 360 } 361 362 private Path createMobFiles(HBaseTestingUtility util) throws IOException { 363 Path testDir = FSUtils.getRootDir(util.getConfiguration()); 364 Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); 365 Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f"); 366 HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); 367 MobFileName mobFileName = null; 368 byte[] mobFileStartRow = new byte[32]; 369 for (byte rowKey : Bytes.toBytes("01234")) { 370 mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()), 371 UUID.randomUUID().toString().replaceAll("-", "")); 372 StoreFileWriter mobFileWriter = 373 new StoreFileWriter.Builder(util.getConfiguration(), 374 new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta) 375 .withFilePath(new Path(basePath, mobFileName.getFileName())).build(); 376 long now = System.currentTimeMillis(); 377 try { 378 for (int i = 0; i < 10; i++) { 379 byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i)); 380 byte[] dummyData = new byte[5000]; 381 new Random().nextBytes(dummyData); 382 mobFileWriter.append( 383 new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData)); 384 } 385 } finally { 386 mobFileWriter.close(); 387 } 388 } 389 return basePath; 390 } 391 392 public static class BulkReplicationTestObserver implements RegionCoprocessor { 393 394 String clusterName; 395 AtomicInteger bulkLoadCounts = new AtomicInteger(); 396 397 @Override 398 public Optional<RegionObserver> getRegionObserver() { 399 return Optional.of(new RegionObserver() { 400 401 @Override 402 public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, 403 List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) 404 throws IOException { 405 BULK_LOAD_LATCH.countDown(); 406 BULK_LOADS_COUNT.incrementAndGet(); 407 LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName, 408 bulkLoadCounts.addAndGet(1)); 409 } 410 }); 411 } 412 } 413}