001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.util.ArrayList; 027import java.util.List; 028import java.util.NavigableMap; 029import java.util.TreeMap; 030import java.util.stream.Stream; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.KeyValue; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Put; 041import org.apache.hadoop.hbase.client.RegionInfo; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.ResultScanner; 044import org.apache.hadoop.hbase.client.Scan; 045import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 046import org.apache.hadoop.hbase.client.replication.TableCFs; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.ReplicationTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.apache.hadoop.hbase.wal.WALEdit; 055import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 056import org.apache.hadoop.hbase.wal.WALKeyImpl; 057import org.junit.jupiter.api.BeforeEach; 058import org.junit.jupiter.api.Tag; 059import org.junit.jupiter.api.TestTemplate; 060import org.junit.jupiter.params.provider.Arguments; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064@Tag(ReplicationTests.TAG) 065@Tag(LargeTests.TAG) 066@HBaseParameterizedTestTemplate(name = "{index}: serialPeer={0}") 067public class TestReplicationSmallTests extends TestReplicationBase { 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class); 070 private static final String PEER_ID = "2"; 071 072 private boolean serialPeer; 073 074 public TestReplicationSmallTests(boolean serialPeer) { 075 this.serialPeer = serialPeer; 076 } 077 078 @Override 079 protected boolean isSerialPeer() { 080 return serialPeer; 081 } 082 083 public static Stream<Arguments> parameters() { 084 return Stream.of(Arguments.of(true), Arguments.of(false)); 085 } 086 087 @BeforeEach 088 public void setUp() throws Exception { 089 cleanUp(); 090 } 091 092 /** 093 * Verify that version and column delete marker types are replicated correctly. 094 */ 095 @TestTemplate 096 public void testDeleteTypes() throws Exception { 097 LOG.info("testDeleteTypes"); 098 final byte[] v1 = Bytes.toBytes("v1"); 099 final byte[] v2 = Bytes.toBytes("v2"); 100 final byte[] v3 = Bytes.toBytes("v3"); 101 htable1 = UTIL1.getConnection().getTable(tableName); 102 103 long t = EnvironmentEdgeManager.currentTime(); 104 // create three versions for "row" 105 Put put = new Put(row); 106 put.addColumn(famName, row, t, v1); 107 htable1.put(put); 108 109 put = new Put(row); 110 put.addColumn(famName, row, t + 1, v2); 111 htable1.put(put); 112 113 put = new Put(row); 114 put.addColumn(famName, row, t + 2, v3); 115 htable1.put(put); 116 117 Get get = new Get(row); 118 get.readAllVersions(); 119 for (int i = 0; i < NB_RETRIES; i++) { 120 if (i == NB_RETRIES - 1) { 121 fail("Waited too much time for put replication"); 122 } 123 Result res = htable2.get(get); 124 if (res.size() < 3) { 125 LOG.info("Rows not available"); 126 Thread.sleep(SLEEP_TIME); 127 } else { 128 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 129 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 130 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1); 131 break; 132 } 133 } 134 // place a version delete marker (delete last version) 135 Delete d = new Delete(row); 136 d.addColumn(famName, row, t); 137 htable1.delete(d); 138 139 get = new Get(row); 140 get.readAllVersions(); 141 for (int i = 0; i < NB_RETRIES; i++) { 142 if (i == NB_RETRIES - 1) { 143 fail("Waited too much time for put replication"); 144 } 145 Result res = htable2.get(get); 146 if (res.size() > 2) { 147 LOG.info("Version not deleted"); 148 Thread.sleep(SLEEP_TIME); 149 } else { 150 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 151 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 152 break; 153 } 154 } 155 156 // place a column delete marker 157 d = new Delete(row); 158 d.addColumns(famName, row, t + 2); 159 htable1.delete(d); 160 161 // now *both* of the remaining version should be deleted 162 // at the replica 163 get = new Get(row); 164 for (int i = 0; i < NB_RETRIES; i++) { 165 if (i == NB_RETRIES - 1) { 166 fail("Waited too much time for del replication"); 167 } 168 Result res = htable2.get(get); 169 if (res.size() >= 1) { 170 LOG.info("Rows not deleted"); 171 Thread.sleep(SLEEP_TIME); 172 } else { 173 break; 174 } 175 } 176 } 177 178 /** 179 * Add a row, check it's replicated, delete it, check's gone 180 */ 181 @TestTemplate 182 public void testSimplePutDelete() throws Exception { 183 LOG.info("testSimplePutDelete"); 184 runSimplePutDeleteTest(); 185 } 186 187 /** 188 * Try a small batch upload using the write buffer, check it's replicated 189 */ 190 @TestTemplate 191 public void testSmallBatch() throws Exception { 192 LOG.info("testSmallBatch"); 193 runSmallBatchTest(); 194 } 195 196 /** 197 * Test disable/enable replication, trying to insert, make sure nothing's replicated, enable it, 198 * the insert should be replicated 199 */ 200 @TestTemplate 201 public void testDisableEnable() throws Exception { 202 // Test disabling replication 203 hbaseAdmin.disableReplicationPeer(PEER_ID); 204 205 byte[] rowkey = Bytes.toBytes("disable enable"); 206 Put put = new Put(rowkey); 207 put.addColumn(famName, row, row); 208 htable1.put(put); 209 210 Get get = new Get(rowkey); 211 for (int i = 0; i < NB_RETRIES; i++) { 212 Result res = htable2.get(get); 213 if (res.size() >= 1) { 214 fail("Replication wasn't disabled"); 215 } else { 216 LOG.info("Row not replicated, let's wait a bit more..."); 217 Thread.sleep(SLEEP_TIME); 218 } 219 } 220 221 // Test enable replication 222 hbaseAdmin.enableReplicationPeer(PEER_ID); 223 224 for (int i = 0; i < NB_RETRIES; i++) { 225 Result res = htable2.get(get); 226 if (res.isEmpty()) { 227 LOG.info("Row not available"); 228 Thread.sleep(SLEEP_TIME); 229 } else { 230 assertArrayEquals(row, res.value()); 231 return; 232 } 233 } 234 fail("Waited too much time for put replication"); 235 } 236 237 /** 238 * Removes and re-add a peer cluster 239 */ 240 @TestTemplate 241 public void testAddAndRemoveClusters() throws Exception { 242 LOG.info("testAddAndRemoveClusters"); 243 hbaseAdmin.removeReplicationPeer(PEER_ID); 244 Thread.sleep(SLEEP_TIME); 245 byte[] rowKey = Bytes.toBytes("Won't be replicated"); 246 Put put = new Put(rowKey); 247 put.addColumn(famName, row, row); 248 htable1.put(put); 249 250 Get get = new Get(rowKey); 251 for (int i = 0; i < NB_RETRIES; i++) { 252 if (i == NB_RETRIES - 1) { 253 break; 254 } 255 Result res = htable2.get(get); 256 if (res.size() >= 1) { 257 fail("Not supposed to be replicated"); 258 } else { 259 LOG.info("Row not replicated, let's wait a bit more..."); 260 Thread.sleep(SLEEP_TIME); 261 } 262 } 263 ReplicationPeerConfig rpc = 264 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()).build(); 265 hbaseAdmin.addReplicationPeer(PEER_ID, rpc); 266 Thread.sleep(SLEEP_TIME); 267 rowKey = Bytes.toBytes("do rep"); 268 put = new Put(rowKey); 269 put.addColumn(famName, row, row); 270 LOG.info("Adding new row"); 271 htable1.put(put); 272 273 get = new Get(rowKey); 274 for (int i = 0; i < NB_RETRIES; i++) { 275 if (i == NB_RETRIES - 1) { 276 fail("Waited too much time for put replication"); 277 } 278 Result res = htable2.get(get); 279 if (res.isEmpty()) { 280 LOG.info("Row not available"); 281 Thread.sleep(SLEEP_TIME * i); 282 } else { 283 assertArrayEquals(row, res.value()); 284 break; 285 } 286 } 287 } 288 289 /** 290 * Do a more intense version testSmallBatch, one that will trigger wal rolling and other 291 * non-trivial code paths 292 */ 293 @TestTemplate 294 public void testLoading() throws Exception { 295 LOG.info("Writing out rows to table1 in testLoading"); 296 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH); 297 for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) { 298 Put put = new Put(Bytes.toBytes(i)); 299 put.addColumn(famName, row, row); 300 puts.add(put); 301 } 302 // The puts will be iterated through and flushed only when the buffer 303 // size is reached. 304 htable1.put(puts); 305 306 Scan scan = new Scan(); 307 308 ResultScanner scanner = htable1.getScanner(scan); 309 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); 310 scanner.close(); 311 312 assertEquals(NB_ROWS_IN_BIG_BATCH, res.length); 313 314 LOG.info("Looking in table2 for replicated rows in testLoading"); 315 long start = EnvironmentEdgeManager.currentTime(); 316 // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail 317 // sometimes. 318 final long retries = NB_RETRIES * 10; 319 for (int i = 0; i < retries; i++) { 320 scan = new Scan(); 321 scanner = htable2.getScanner(scan); 322 res = scanner.next(NB_ROWS_IN_BIG_BATCH); 323 scanner.close(); 324 if (res.length != NB_ROWS_IN_BIG_BATCH) { 325 if (i == retries - 1) { 326 int lastRow = -1; 327 for (Result result : res) { 328 int currentRow = Bytes.toInt(result.getRow()); 329 for (int row = lastRow + 1; row < currentRow; row++) { 330 LOG.error("Row missing: " + row); 331 } 332 lastRow = currentRow; 333 } 334 LOG.error("Last row: " + lastRow); 335 fail("Waited too much time for normal batch replication, " + res.length + " instead of " 336 + NB_ROWS_IN_BIG_BATCH + "; waited=" + (EnvironmentEdgeManager.currentTime() - start) 337 + "ms"); 338 } else { 339 LOG.info("Only got " + res.length + " rows... retrying"); 340 Thread.sleep(SLEEP_TIME); 341 } 342 } else { 343 break; 344 } 345 } 346 } 347 348 /** 349 * Test for HBASE-8663 350 * <p> 351 * Create two new Tables with colfamilies enabled for replication then run 352 * {@link Admin#listReplicatedTableCFs()}. Finally verify the table:colfamilies. 353 */ 354 @TestTemplate 355 public void testVerifyListReplicatedTable() throws Exception { 356 LOG.info("testVerifyListReplicatedTable"); 357 358 final String tName = "VerifyListReplicated_"; 359 final String colFam = "cf1"; 360 final int numOfTables = 3; 361 362 Admin hadmin = UTIL1.getAdmin(); 363 364 // Create Tables 365 for (int i = 0; i < numOfTables; i++) { 366 hadmin.createTable(TableDescriptorBuilder 367 .newBuilder(TableName.valueOf(tName + i)).setColumnFamily(ColumnFamilyDescriptorBuilder 368 .newBuilder(Bytes.toBytes(colFam)).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 369 .build()); 370 } 371 372 // verify the result 373 List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs(); 374 int[] match = new int[numOfTables]; // array of 3 with init value of zero 375 376 for (int i = 0; i < replicationColFams.size(); i++) { 377 TableCFs replicationEntry = replicationColFams.get(i); 378 String tn = replicationEntry.getTable().getNameAsString(); 379 if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) { 380 int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit 381 match[m]++; // should only increase once 382 } 383 } 384 385 // check the matching result 386 for (int i = 0; i < match.length; i++) { 387 assertEquals(1, match[i], "listReplicated() does not match table " + i); 388 } 389 390 // drop tables 391 for (int i = 0; i < numOfTables; i++) { 392 TableName tableName = TableName.valueOf(tName + i); 393 hadmin.disableTable(tableName); 394 hadmin.deleteTable(tableName); 395 } 396 397 hadmin.close(); 398 } 399 400 /** 401 * Test for HBase-15259 WALEdits under replay will also be replicated 402 */ 403 @TestTemplate 404 public void testReplicationInReplay() throws Exception { 405 final TableName tableName = htable1.getName(); 406 407 HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0); 408 RegionInfo hri = region.getRegionInfo(); 409 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 410 for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) { 411 scopes.put(fam, 1); 412 } 413 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 414 int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName()); 415 WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo()); 416 final byte[] rowName = Bytes.toBytes("testReplicationInReplay"); 417 final byte[] qualifier = Bytes.toBytes("q"); 418 final byte[] value = Bytes.toBytes("v"); 419 WALEdit edit = new WALEdit(true); 420 long now = EnvironmentEdgeManager.currentTime(); 421 WALEditInternalHelper.addExtendedCell(edit, 422 new KeyValue(rowName, famName, qualifier, now, value)); 423 WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); 424 wal.appendData(hri, walKey, edit); 425 wal.sync(); 426 427 Get get = new Get(rowName); 428 for (int i = 0; i < NB_RETRIES; i++) { 429 if (i == NB_RETRIES - 1) { 430 break; 431 } 432 Result res = htable2.get(get); 433 if (res.size() >= 1) { 434 fail("Not supposed to be replicated for " + Bytes.toString(res.getRow())); 435 } else { 436 LOG.info("Row not replicated, let's wait a bit more..."); 437 Thread.sleep(SLEEP_TIME); 438 } 439 } 440 } 441 442 /** 443 * Test for HBASE-27448 Add an admin method to get replication enabled state 444 */ 445 @TestTemplate 446 public void testGetReplicationPeerState() throws Exception { 447 448 // Test disable replication peer 449 hbaseAdmin.disableReplicationPeer("2"); 450 assertFalse(hbaseAdmin.isReplicationPeerEnabled("2")); 451 452 // Test enable replication peer 453 hbaseAdmin.enableReplicationPeer("2"); 454 assertTrue(hbaseAdmin.isReplicationPeerEnabled("2")); 455 } 456}