001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.util.ArrayList; 026import java.util.List; 027import java.util.NavigableMap; 028import java.util.TreeMap; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 036import org.apache.hadoop.hbase.client.Delete; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.ResultScanner; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.client.replication.TableCFs; 045import org.apache.hadoop.hbase.regionserver.HRegion; 046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.testclassification.ReplicationTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.wal.WAL; 052import org.apache.hadoop.hbase.wal.WALEdit; 053import org.apache.hadoop.hbase.wal.WALKeyImpl; 054import org.junit.Before; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.runner.RunWith; 059import org.junit.runners.Parameterized; 060import org.junit.runners.Parameterized.Parameter; 061import org.junit.runners.Parameterized.Parameters; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 066 067@RunWith(Parameterized.class) 068@Category({ ReplicationTests.class, LargeTests.class }) 069public class TestReplicationSmallTests extends TestReplicationBase { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestReplicationSmallTests.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class); 076 private static final String PEER_ID = "2"; 077 078 @Parameter 079 public boolean serialPeer; 080 081 @Override 082 protected boolean isSerialPeer() { 083 return serialPeer; 084 } 085 086 @Parameters(name = "{index}: serialPeer={0}") 087 public static List<Boolean> parameters() { 088 return ImmutableList.of(true, false); 089 } 090 091 @Before 092 public void setUp() throws Exception { 093 cleanUp(); 094 } 095 096 /** 097 * Verify that version and column delete marker types are replicated correctly. 098 */ 099 @Test 100 public void testDeleteTypes() throws Exception { 101 LOG.info("testDeleteTypes"); 102 final byte[] v1 = Bytes.toBytes("v1"); 103 final byte[] v2 = Bytes.toBytes("v2"); 104 final byte[] v3 = Bytes.toBytes("v3"); 105 htable1 = UTIL1.getConnection().getTable(tableName); 106 107 long t = EnvironmentEdgeManager.currentTime(); 108 // create three versions for "row" 109 Put put = new Put(row); 110 put.addColumn(famName, row, t, v1); 111 htable1.put(put); 112 113 put = new Put(row); 114 put.addColumn(famName, row, t + 1, v2); 115 htable1.put(put); 116 117 put = new Put(row); 118 put.addColumn(famName, row, t + 2, v3); 119 htable1.put(put); 120 121 Get get = new Get(row); 122 get.readAllVersions(); 123 for (int i = 0; i < NB_RETRIES; i++) { 124 if (i == NB_RETRIES - 1) { 125 fail("Waited too much time for put replication"); 126 } 127 Result res = htable2.get(get); 128 if (res.size() < 3) { 129 LOG.info("Rows not available"); 130 Thread.sleep(SLEEP_TIME); 131 } else { 132 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 133 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 134 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1); 135 break; 136 } 137 } 138 // place a version delete marker (delete last version) 139 Delete d = new Delete(row); 140 d.addColumn(famName, row, t); 141 htable1.delete(d); 142 143 get = new Get(row); 144 get.readAllVersions(); 145 for (int i = 0; i < NB_RETRIES; i++) { 146 if (i == NB_RETRIES - 1) { 147 fail("Waited too much time for put replication"); 148 } 149 Result res = htable2.get(get); 150 if (res.size() > 2) { 151 LOG.info("Version not deleted"); 152 Thread.sleep(SLEEP_TIME); 153 } else { 154 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 155 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 156 break; 157 } 158 } 159 160 // place a column delete marker 161 d = new Delete(row); 162 d.addColumns(famName, row, t + 2); 163 htable1.delete(d); 164 165 // now *both* of the remaining version should be deleted 166 // at the replica 167 get = new Get(row); 168 for (int i = 0; i < NB_RETRIES; i++) { 169 if (i == NB_RETRIES - 1) { 170 fail("Waited too much time for del replication"); 171 } 172 Result res = htable2.get(get); 173 if (res.size() >= 1) { 174 LOG.info("Rows not deleted"); 175 Thread.sleep(SLEEP_TIME); 176 } else { 177 break; 178 } 179 } 180 } 181 182 /** 183 * Add a row, check it's replicated, delete it, check's gone 184 */ 185 @Test 186 public void testSimplePutDelete() throws Exception { 187 LOG.info("testSimplePutDelete"); 188 runSimplePutDeleteTest(); 189 } 190 191 /** 192 * Try a small batch upload using the write buffer, check it's replicated 193 */ 194 @Test 195 public void testSmallBatch() throws Exception { 196 LOG.info("testSmallBatch"); 197 runSmallBatchTest(); 198 } 199 200 /** 201 * Test disable/enable replication, trying to insert, make sure nothing's replicated, enable it, 202 * the insert should be replicated 203 */ 204 @Test 205 public void testDisableEnable() throws Exception { 206 // Test disabling replication 207 hbaseAdmin.disableReplicationPeer(PEER_ID); 208 209 byte[] rowkey = Bytes.toBytes("disable enable"); 210 Put put = new Put(rowkey); 211 put.addColumn(famName, row, row); 212 htable1.put(put); 213 214 Get get = new Get(rowkey); 215 for (int i = 0; i < NB_RETRIES; i++) { 216 Result res = htable2.get(get); 217 if (res.size() >= 1) { 218 fail("Replication wasn't disabled"); 219 } else { 220 LOG.info("Row not replicated, let's wait a bit more..."); 221 Thread.sleep(SLEEP_TIME); 222 } 223 } 224 225 // Test enable replication 226 hbaseAdmin.enableReplicationPeer(PEER_ID); 227 228 for (int i = 0; i < NB_RETRIES; i++) { 229 Result res = htable2.get(get); 230 if (res.isEmpty()) { 231 LOG.info("Row not available"); 232 Thread.sleep(SLEEP_TIME); 233 } else { 234 assertArrayEquals(row, res.value()); 235 return; 236 } 237 } 238 fail("Waited too much time for put replication"); 239 } 240 241 /** 242 * Integration test for TestReplicationAdmin, removes and re-add a peer cluster 243 */ 244 @Test 245 public void testAddAndRemoveClusters() throws Exception { 246 LOG.info("testAddAndRemoveClusters"); 247 hbaseAdmin.removeReplicationPeer(PEER_ID); 248 Thread.sleep(SLEEP_TIME); 249 byte[] rowKey = Bytes.toBytes("Won't be replicated"); 250 Put put = new Put(rowKey); 251 put.addColumn(famName, row, row); 252 htable1.put(put); 253 254 Get get = new Get(rowKey); 255 for (int i = 0; i < NB_RETRIES; i++) { 256 if (i == NB_RETRIES - 1) { 257 break; 258 } 259 Result res = htable2.get(get); 260 if (res.size() >= 1) { 261 fail("Not supposed to be replicated"); 262 } else { 263 LOG.info("Row not replicated, let's wait a bit more..."); 264 Thread.sleep(SLEEP_TIME); 265 } 266 } 267 ReplicationPeerConfig rpc = 268 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build(); 269 hbaseAdmin.addReplicationPeer(PEER_ID, rpc); 270 Thread.sleep(SLEEP_TIME); 271 rowKey = Bytes.toBytes("do rep"); 272 put = new Put(rowKey); 273 put.addColumn(famName, row, row); 274 LOG.info("Adding new row"); 275 htable1.put(put); 276 277 get = new Get(rowKey); 278 for (int i = 0; i < NB_RETRIES; i++) { 279 if (i == NB_RETRIES - 1) { 280 fail("Waited too much time for put replication"); 281 } 282 Result res = htable2.get(get); 283 if (res.isEmpty()) { 284 LOG.info("Row not available"); 285 Thread.sleep(SLEEP_TIME * i); 286 } else { 287 assertArrayEquals(row, res.value()); 288 break; 289 } 290 } 291 } 292 293 /** 294 * Do a more intense version testSmallBatch, one that will trigger wal rolling and other 295 * non-trivial code paths 296 */ 297 @Test 298 public void testLoading() throws Exception { 299 LOG.info("Writing out rows to table1 in testLoading"); 300 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH); 301 for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) { 302 Put put = new Put(Bytes.toBytes(i)); 303 put.addColumn(famName, row, row); 304 puts.add(put); 305 } 306 // The puts will be iterated through and flushed only when the buffer 307 // size is reached. 308 htable1.put(puts); 309 310 Scan scan = new Scan(); 311 312 ResultScanner scanner = htable1.getScanner(scan); 313 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); 314 scanner.close(); 315 316 assertEquals(NB_ROWS_IN_BIG_BATCH, res.length); 317 318 LOG.info("Looking in table2 for replicated rows in testLoading"); 319 long start = System.currentTimeMillis(); 320 // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail 321 // sometimes. 322 final long retries = NB_RETRIES * 10; 323 for (int i = 0; i < retries; i++) { 324 scan = new Scan(); 325 scanner = htable2.getScanner(scan); 326 res = scanner.next(NB_ROWS_IN_BIG_BATCH); 327 scanner.close(); 328 if (res.length != NB_ROWS_IN_BIG_BATCH) { 329 if (i == retries - 1) { 330 int lastRow = -1; 331 for (Result result : res) { 332 int currentRow = Bytes.toInt(result.getRow()); 333 for (int row = lastRow + 1; row < currentRow; row++) { 334 LOG.error("Row missing: " + row); 335 } 336 lastRow = currentRow; 337 } 338 LOG.error("Last row: " + lastRow); 339 fail("Waited too much time for normal batch replication, " + res.length + " instead of " 340 + NB_ROWS_IN_BIG_BATCH + "; waited=" + (System.currentTimeMillis() - start) + "ms"); 341 } else { 342 LOG.info("Only got " + res.length + " rows... retrying"); 343 Thread.sleep(SLEEP_TIME); 344 } 345 } else { 346 break; 347 } 348 } 349 } 350 351 /** 352 * Test for HBASE-8663 353 * <p> 354 * Create two new Tables with colfamilies enabled for replication then run 355 * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note: 356 * TestReplicationAdmin is a better place for this testing but it would need mocks. 357 */ 358 @Test 359 public void testVerifyListReplicatedTable() throws Exception { 360 LOG.info("testVerifyListReplicatedTable"); 361 362 final String tName = "VerifyListReplicated_"; 363 final String colFam = "cf1"; 364 final int numOfTables = 3; 365 366 Admin hadmin = UTIL1.getAdmin(); 367 368 // Create Tables 369 for (int i = 0; i < numOfTables; i++) { 370 hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i)) 371 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam)) 372 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 373 .build()); 374 } 375 376 // verify the result 377 List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs(); 378 int[] match = new int[numOfTables]; // array of 3 with init value of zero 379 380 for (int i = 0; i < replicationColFams.size(); i++) { 381 TableCFs replicationEntry = replicationColFams.get(i); 382 String tn = replicationEntry.getTable().getNameAsString(); 383 if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) { 384 int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit 385 match[m]++; // should only increase once 386 } 387 } 388 389 // check the matching result 390 for (int i = 0; i < match.length; i++) { 391 assertTrue("listReplicated() does not match table " + i, (match[i] == 1)); 392 } 393 394 // drop tables 395 for (int i = 0; i < numOfTables; i++) { 396 TableName tableName = TableName.valueOf(tName + i); 397 hadmin.disableTable(tableName); 398 hadmin.deleteTable(tableName); 399 } 400 401 hadmin.close(); 402 } 403 404 /** 405 * Test for HBase-15259 WALEdits under replay will also be replicated 406 */ 407 @Test 408 public void testReplicationInReplay() throws Exception { 409 final TableName tableName = htable1.getName(); 410 411 HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0); 412 RegionInfo hri = region.getRegionInfo(); 413 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 414 for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) { 415 scopes.put(fam, 1); 416 } 417 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 418 int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName()); 419 WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo()); 420 final byte[] rowName = Bytes.toBytes("testReplicationInReplay"); 421 final byte[] qualifier = Bytes.toBytes("q"); 422 final byte[] value = Bytes.toBytes("v"); 423 WALEdit edit = new WALEdit(true); 424 long now = EnvironmentEdgeManager.currentTime(); 425 edit.add(new KeyValue(rowName, famName, qualifier, now, value)); 426 WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); 427 wal.appendData(hri, walKey, edit); 428 wal.sync(); 429 430 Get get = new Get(rowName); 431 for (int i = 0; i < NB_RETRIES; i++) { 432 if (i == NB_RETRIES - 1) { 433 break; 434 } 435 Result res = htable2.get(get); 436 if (res.size() >= 1) { 437 fail("Not supposed to be replicated for " + Bytes.toString(res.getRow())); 438 } else { 439 LOG.info("Row not replicated, let's wait a bit more..."); 440 Thread.sleep(SLEEP_TIME); 441 } 442 } 443 } 444}