/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
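/**
 * Basic tests for {@link ReplicationSink}, the component that applies replicated WAL entries on
 * the receiving cluster. Each test builds a list of protobuf {@code WALEntry} instances alongside
 * a parallel list of {@link Cell}s (the WAL key and its cells travel separately, tied together by
 * the entry's associated cell count) and feeds both to {@code ReplicationSink#replicateEntries}.
 */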
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSink.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
  private static final int BATCH_SIZE = 10;

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static ReplicationSink SINK;

  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");

  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");

  protected static Table table1;
  protected static Stoppable STOPPABLE = new Stoppable() {
    final AtomicBoolean stop = new AtomicBoolean(false);

    @Override
    public boolean isStopped() {
      return this.stop.get();
    }

    @Override
    public void stop(String why) {
      LOG.info("STOPPING BECAUSE: " + why);
      this.stop.set(true);
    }
  };

  protected static Table table2;
  protected static String baseNamespaceDir;
  protected static String hfileArchiveDir;
  protected static String replicationClusterId;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    TEST_UTIL.startMiniCluster(3);
    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
    replicationClusterId = "12345";
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    STOPPABLE.stop("Shutting down");
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
    table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
  }

  /**
   * Insert a whole batch of entries.
   */
  @Test
  public void testBatchSink() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a mix of puts and deletes.
   */
  @Test
  public void testMixedPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE / 2);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE / 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

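    // Delete the even rows (including those just inserted) and put the odd rows;
    // BATCH_SIZE / 2 rows should remain.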
    entries = new ArrayList<>(BATCH_SIZE);
    cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
        i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length);
  }

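  /**
   * Insert a large batch of puts, then a second large batch mixing puts and deletes, and verify
   * the expected row count after each pass.
   */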
  @Test
  public void testLargeEditsPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 5510; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

    ResultScanner resultScanner = table1.getScanner(new Scan());
    int totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5510, totalRows);

    entries = new ArrayList<>();
    cells = new ArrayList<>();
    for (int i = 0; i < 11000; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
        i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);
    resultScanner = table1.getScanner(new Scan());
    totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5500, totalRows);
  }

  /**
   * Insert into two different tables.
   */
  @Test
  public void testMixedPutTables() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
        i, KeyValue.Type.Put, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table2.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
    }
    scanRes = table1.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
    }
  }

  /**
   * Insert then do different types of deletes.
   */
  @Test
  public void testMixedDeletes() throws Exception {
    List<WALEntry> entries = new ArrayList<>(3);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    entries = new ArrayList<>(3);
    cells = new ArrayList<>();
    entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(0, scanRes.next(3).length);
  }

  /**
   * Puts are buffered, but this tests the case where a delete (not buffered) is applied
   * before the buffered Put it is meant to mask.
   */
  @Test
  public void testApplyDeleteBeforePut() throws Exception {
    List<WALEntry> entries = new ArrayList<>(5);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    for (int i = 3; i < 5; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Get get = new Get(Bytes.toBytes(1));
    Result res = table1.get(get);
    assertEquals(0, res.size());
  }

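  /**
   * The sink should rethrow TableNotFoundException for a missing target table, and
   * RetriesExhaustedWithDetailsException when the target table is disabled.
   */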
  @Test
  public void testRethrowRetriesExhaustedWithDetailsException() throws Exception {
    TableName notExistTable = TableName.valueOf("notExistTable");
    List<WALEntry> entries = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
    }
    try {
      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw TableNotFoundException.");
    } catch (TableNotFoundException e) {
      // expected
    }
    entries.clear();
    cells.clear();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
      try (Admin admin = conn.getAdmin()) {
        admin.disableTable(TABLE_NAME1);
        try {
          SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
            replicationClusterId, baseNamespaceDir, hfileArchiveDir);
          Assert.fail("Should re-throw RetriesExhaustedWithDetailsException.");
        } catch (RetriesExhaustedWithDetailsException e) {
          // expected
        } finally {
          admin.enableTable(TABLE_NAME1);
        }
      }
    }
  }

  /**
   * Test replicateEntries with a bulk load entry for 25 HFiles.
   */
  @Test
  public void testReplicateEntriesForHFiles() throws Exception {
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
    int numRows = 10;
    List<Path> p = new ArrayList<>(1);
    final String hfilePrefix = "hfile-";

    // 1. Generate 25 hfile ranges (50 distinct, sorted boundary keys)
    Random rng = new SecureRandom();
    Set<Integer> numbers = new HashSet<>();
    while (numbers.size() < 50) {
      numbers.add(rng.nextInt(1000));
    }
    List<Integer> numberList = new ArrayList<>(numbers);
    Collections.sort(numberList);
    Map<String, Long> storeFilesSize = new HashMap<>(1);

    // 2. Create 25 hfiles
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = dir.getFileSystem(conf);
    Iterator<Integer> numbersItr = numberList.iterator();
    for (int i = 0; i < 25; i++) {
      Path hfilePath = new Path(familyDir, hfilePrefix + i);
      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
      p.add(hfilePath);
      storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
    }

    // 3. Create a BulkLoadDescriptor and a WALEdit
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    storeFiles.put(FAM_NAME1, p);
    org.apache.hadoop.hbase.wal.WALEdit edit = null;
    WALProtos.BulkLoadDescriptor loadDescriptor = null;

    try (Connection c = ConnectionFactory.createConnection(conf);
        RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
      loadDescriptor =
        ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
          UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()),
          storeFiles, storeFilesSize, 1);
      edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo,
        loadDescriptor);
    }
    List<WALEntry> entries = new ArrayList<>(1);

    // 4. Create a WALEntryBuilder
    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);

    // 5. Copy each hfile to the path where it would live on the source cluster
    for (int i = 0; i < 25; i++) {
      String pathToHfileFromNS =
        new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
          .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
          .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
          .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
          .append(hfilePrefix + i).toString();
      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
      Path dstPath = new Path(dst);
      FileUtil.copy(fs, p.get(0), fs, dstPath, false, conf);
    }

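    // Note: every destination above is copied from p.get(0), so all 25 staged files share the
    // first hfile's contents; the bulk load below still yields the numRows rows asserted.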
    entries.add(builder.build());
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 6. Assert no existing data in table
      assertEquals(0, scanner.next(numRows).length);
    }
    // 7. Replicate the bulk loaded entry
    SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 8. Assert data is replicated
      assertEquals(numRows, scanner.next(numRows).length);
    }
    // Clean up the created hfiles or it will mess up subsequent tests
  }

  /**
   * Build a WALEntry for the given row and edit type, appending the backing KeyValue to
   * {@code cells}; the entries and their cells are handed to the sink separately.
   */
  private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
    byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
    byte[] rowBytes = Bytes.toBytes(row);
    // Just make sure we don't get the same ts for two consecutive rows with
    // same key
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      LOG.info("Was interrupted while sleep, meh", e);
    }
    final long now = System.currentTimeMillis();
    KeyValue kv = null;
    if (type.getCode() == KeyValue.Type.Put.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now,
        KeyValue.Type.Put, Bytes.toBytes(row));
    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam,
        now, KeyValue.Type.DeleteColumn);
    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
      kv = new KeyValue(rowBytes, fam, null,
        now, KeyValue.Type.DeleteFamily);
    }
    WALEntry.Builder builder = createWALEntryBuilder(table);
    cells.add(kv);

    return builder.build();
  }

  /**
   * Build a WALEntry.Builder keyed for {@code table}, with a single associated cell and the
   * default cluster id.
   */
  private WALEntry.Builder createWALEntryBuilder(TableName table) {
    WALEntry.Builder builder = WALEntry.newBuilder();
    builder.setAssociatedCellCount(1);
    WALKey.Builder keyBuilder = WALKey.newBuilder();
    UUID.Builder uuidBuilder = UUID.newBuilder();
    uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
    keyBuilder.setClusterId(uuidBuilder.build());
    keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName()));
    keyBuilder.setWriteTime(System.currentTimeMillis());
    keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY));
    keyBuilder.setLogSequenceNumber(-1);
    builder.setKey(keyBuilder.build());
    return builder;
  }
}