001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.fail; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.when; 026 027import java.io.IOException; 028import java.util.List; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicLong; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.Cell.Type; 035import org.apache.hadoop.hbase.CellBuilderFactory; 036import org.apache.hadoop.hbase.CellBuilderType; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.HTableDescriptor; 042import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.Waiter; 045import org.apache.hadoop.hbase.client.ClusterConnection; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.client.RegionLocator; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 051import org.apache.hadoop.hbase.regionserver.HRegion; 052import org.apache.hadoop.hbase.regionserver.HRegionServer; 053import org.apache.hadoop.hbase.regionserver.Region; 054import org.apache.hadoop.hbase.replication.ReplicationException; 055import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 056import org.apache.hadoop.hbase.testclassification.FlakeyTests; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.FSTableDescriptors; 060import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 061import org.apache.hadoop.hbase.wal.WAL.Entry; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.apache.hadoop.hbase.wal.WALKeyImpl; 064import org.apache.hadoop.hbase.zookeeper.ZKConfig; 065import org.junit.AfterClass; 066import org.junit.BeforeClass; 067import org.junit.ClassRule; 068import org.junit.Rule; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.junit.rules.TestName; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 076 077/** 078 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying 079 * async wal replication replays the edits to the secondary region in various scenarios. 080 */ 081@Category({FlakeyTests.class, LargeTests.class}) 082public class TestRegionReplicaReplicationEndpoint { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class); 087 088 private static final Logger LOG = 089 LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class); 090 091 private static final int NB_SERVERS = 2; 092 093 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 094 095 @Rule 096 public TestName name = new TestName(); 097 098 @BeforeClass 099 public static void beforeClass() throws Exception { 100 Configuration conf = HTU.getConfiguration(); 101 conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); 102 conf.setInt("replication.source.size.capacity", 10240); 103 conf.setLong("replication.source.sleepforretries", 100); 104 conf.setInt("hbase.regionserver.maxlogs", 10); 105 conf.setLong("hbase.master.logcleaner.ttl", 10); 106 conf.setInt("zookeeper.recovery.retry", 1); 107 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 108 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 109 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 110 conf.setInt("replication.stats.thread.period.seconds", 5); 111 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 112 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed 113 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 114 115 HTU.startMiniCluster(NB_SERVERS); 116 } 117 118 @AfterClass 119 public static void afterClass() throws Exception { 120 HTU.shutdownMiniCluster(); 121 } 122 123 @Test 124 public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { 125 // create a table with region replicas. Check whether the replication peer is created 126 // and replication started. 127 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); 128 String peerId = "region_replica_replication"; 129 130 ReplicationPeerConfig peerConfig = null; 131 try { 132 peerConfig = admin.getPeerConfig(peerId); 133 } catch (ReplicationPeerNotFoundException e) { 134 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 135 } 136 137 if (peerConfig != null) { 138 admin.removePeer(peerId); 139 peerConfig = null; 140 } 141 142 HTableDescriptor htd = HTU.createTableDescriptor( 143 "testReplicationPeerIsCreated_no_region_replicas"); 144 HTU.getAdmin().createTable(htd); 145 try { 146 peerConfig = admin.getPeerConfig(peerId); 147 fail("Should throw ReplicationException, because replication peer id=" + peerId 148 + " not exist"); 149 } catch (ReplicationPeerNotFoundException e) { 150 } 151 assertNull(peerConfig); 152 153 htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); 154 htd.setRegionReplication(2); 155 HTU.getAdmin().createTable(htd); 156 157 // assert peer configuration is correct 158 peerConfig = admin.getPeerConfig(peerId); 159 assertNotNull(peerConfig); 160 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( 161 HTU.getConfiguration())); 162 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 163 peerConfig.getReplicationEndpointImpl()); 164 admin.close(); 165 } 166 167 @Test 168 public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { 169 // modify a table by adding region replicas. Check whether the replication peer is created 170 // and replication started. 171 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); 172 String peerId = "region_replica_replication"; 173 174 ReplicationPeerConfig peerConfig = null; 175 try { 176 peerConfig = admin.getPeerConfig(peerId); 177 } catch (ReplicationPeerNotFoundException e) { 178 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 179 } 180 181 if (peerConfig != null) { 182 admin.removePeer(peerId); 183 peerConfig = null; 184 } 185 186 HTableDescriptor htd 187 = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable"); 188 HTU.getAdmin().createTable(htd); 189 190 // assert that replication peer is not created yet 191 try { 192 peerConfig = admin.getPeerConfig(peerId); 193 fail("Should throw ReplicationException, because replication peer id=" + peerId 194 + " not exist"); 195 } catch (ReplicationPeerNotFoundException e) { 196 } 197 assertNull(peerConfig); 198 199 HTU.getAdmin().disableTable(htd.getTableName()); 200 htd.setRegionReplication(2); 201 HTU.getAdmin().modifyTable(htd.getTableName(), htd); 202 HTU.getAdmin().enableTable(htd.getTableName()); 203 204 // assert peer configuration is correct 205 peerConfig = admin.getPeerConfig(peerId); 206 assertNotNull(peerConfig); 207 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( 208 HTU.getConfiguration())); 209 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 210 peerConfig.getReplicationEndpointImpl()); 211 admin.close(); 212 } 213 214 public void testRegionReplicaReplication(int regionReplication) throws Exception { 215 // test region replica replication. Create a table with single region, write some data 216 // ensure that data is replicated to the secondary region 217 TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_" 218 + regionReplication); 219 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); 220 htd.setRegionReplication(regionReplication); 221 HTU.getAdmin().createTable(htd); 222 TableName tableNameNoReplicas = 223 TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS"); 224 HTU.deleteTableIfAny(tableNameNoReplicas); 225 HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1); 226 227 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 228 Table table = connection.getTable(tableName); 229 Table tableNoReplicas = connection.getTable(tableNameNoReplicas); 230 231 try { 232 // load some data to the non-replicated table 233 HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000); 234 235 // load the data to the table 236 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 237 238 verifyReplication(tableName, regionReplication, 0, 1000); 239 240 } finally { 241 table.close(); 242 tableNoReplicas.close(); 243 HTU.deleteTableIfAny(tableNameNoReplicas); 244 connection.close(); 245 } 246 } 247 248 private void verifyReplication(TableName tableName, int regionReplication, 249 final int startRow, final int endRow) throws Exception { 250 verifyReplication(tableName, regionReplication, startRow, endRow, true); 251 } 252 253 private void verifyReplication(TableName tableName, int regionReplication, 254 final int startRow, final int endRow, final boolean present) throws Exception { 255 // find the regions 256 final Region[] regions = new Region[regionReplication]; 257 258 for (int i=0; i < NB_SERVERS; i++) { 259 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 260 List<HRegion> onlineRegions = rs.getRegions(tableName); 261 for (HRegion region : onlineRegions) { 262 regions[region.getRegionInfo().getReplicaId()] = region; 263 } 264 } 265 266 for (Region region : regions) { 267 assertNotNull(region); 268 } 269 270 for (int i = 1; i < regionReplication; i++) { 271 final Region region = regions[i]; 272 // wait until all the data is replicated to all secondary regions 273 Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() { 274 @Override 275 public boolean evaluate() throws Exception { 276 LOG.info("verifying replication for region replica:" + region.getRegionInfo()); 277 try { 278 HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present); 279 } catch(Throwable ex) { 280 LOG.warn("Verification from secondary region is not complete yet", ex); 281 // still wait 282 return false; 283 } 284 return true; 285 } 286 }); 287 } 288 } 289 290 @Test 291 public void testRegionReplicaReplicationWith2Replicas() throws Exception { 292 testRegionReplicaReplication(2); 293 } 294 295 @Test 296 public void testRegionReplicaReplicationWith3Replicas() throws Exception { 297 testRegionReplicaReplication(3); 298 } 299 300 @Test 301 public void testRegionReplicaReplicationWith10Replicas() throws Exception { 302 testRegionReplicaReplication(10); 303 } 304 305 @Test 306 public void testRegionReplicaWithoutMemstoreReplication() throws Exception { 307 int regionReplication = 3; 308 final TableName tableName = TableName.valueOf(name.getMethodName()); 309 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 310 htd.setRegionReplication(regionReplication); 311 htd.setRegionMemstoreReplication(false); 312 HTU.getAdmin().createTable(htd); 313 314 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 315 Table table = connection.getTable(tableName); 316 try { 317 // write data to the primary. The replicas should not receive the data 318 final int STEP = 100; 319 for (int i = 0; i < 3; ++i) { 320 final int startRow = i * STEP; 321 final int endRow = (i + 1) * STEP; 322 LOG.info("Writing data from " + startRow + " to " + endRow); 323 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow); 324 verifyReplication(tableName, regionReplication, startRow, endRow, false); 325 326 // Flush the table, now the data should show up in the replicas 327 LOG.info("flushing table"); 328 HTU.flush(tableName); 329 verifyReplication(tableName, regionReplication, 0, endRow, true); 330 } 331 } finally { 332 table.close(); 333 connection.close(); 334 } 335 } 336 337 @Test 338 public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception { 339 // Tests a table with region replication 3. Writes some data, and causes flushes and 340 // compactions. Verifies that the data is readable from the replicas. Note that this 341 // does not test whether the replicas actually pick up flushed files and apply compaction 342 // to their stores 343 int regionReplication = 3; 344 final TableName tableName = TableName.valueOf(name.getMethodName()); 345 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 346 htd.setRegionReplication(regionReplication); 347 HTU.getAdmin().createTable(htd); 348 349 350 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 351 Table table = connection.getTable(tableName); 352 try { 353 // load the data to the table 354 355 for (int i = 0; i < 6000; i += 1000) { 356 LOG.info("Writing data from " + i + " to " + (i+1000)); 357 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000); 358 LOG.info("flushing table"); 359 HTU.flush(tableName); 360 LOG.info("compacting table"); 361 HTU.compact(tableName, false); 362 } 363 364 verifyReplication(tableName, regionReplication, 0, 1000); 365 } finally { 366 table.close(); 367 connection.close(); 368 } 369 } 370 371 @Test 372 public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception { 373 testRegionReplicaReplicationIgnores(false, false); 374 } 375 376 @Test 377 public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception { 378 testRegionReplicaReplicationIgnores(true, false); 379 } 380 381 @Test 382 public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception { 383 testRegionReplicaReplicationIgnores(false, true); 384 } 385 386 public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication) 387 throws Exception { 388 389 // tests having edits from a disabled or dropped table is handled correctly by skipping those 390 // entries and further edits after the edits from dropped/disabled table can be replicated 391 // without problems. 392 final TableName tableName = TableName.valueOf( 393 name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication); 394 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 395 int regionReplication = 3; 396 htd.setRegionReplication(regionReplication); 397 HTU.deleteTableIfAny(tableName); 398 399 HTU.getAdmin().createTable(htd); 400 TableName toBeDisabledTable = TableName.valueOf( 401 dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable")); 402 HTU.deleteTableIfAny(toBeDisabledTable); 403 htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); 404 htd.setRegionReplication(regionReplication); 405 HTU.getAdmin().createTable(htd); 406 407 // both tables are created, now pause replication 408 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); 409 admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); 410 411 // now that the replication is disabled, write to the table to be dropped, then drop the table. 412 413 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 414 Table table = connection.getTable(tableName); 415 Table tableToBeDisabled = connection.getTable(toBeDisabledTable); 416 417 HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); 418 419 AtomicLong skippedEdits = new AtomicLong(); 420 RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = 421 mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); 422 when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); 423 FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(), 424 FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath()); 425 RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = 426 new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, 427 (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, 428 fstd); 429 RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); 430 HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); 431 byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); 432 433 Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A")) 434 .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build(); 435 Entry entry = new Entry( 436 new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), 437 new WALEdit() 438 .add(cell)); 439 440 HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table 441 if (dropTable) { 442 HTU.getAdmin().deleteTable(toBeDisabledTable); 443 } else if (disableReplication) { 444 htd.setRegionReplication(regionReplication - 2); 445 HTU.getAdmin().modifyTable(toBeDisabledTable, htd); 446 HTU.getAdmin().enableTable(toBeDisabledTable); 447 } 448 sinkWriter.append(toBeDisabledTable, encodedRegionName, 449 HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); 450 451 assertEquals(2, skippedEdits.get()); 452 453 if (disableReplication) { 454 // enable replication again so that we can verify replication 455 HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table 456 htd.setRegionReplication(regionReplication); 457 HTU.getAdmin().modifyTable(toBeDisabledTable, htd); 458 HTU.getAdmin().enableTable(toBeDisabledTable); 459 } 460 461 try { 462 // load some data to the to-be-dropped table 463 464 // load the data to the table 465 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 466 467 // now enable the replication 468 admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); 469 470 verifyReplication(tableName, regionReplication, 0, 1000); 471 472 } finally { 473 admin.close(); 474 table.close(); 475 rl.close(); 476 tableToBeDisabled.close(); 477 HTU.deleteTableIfAny(toBeDisabledTable); 478 connection.close(); 479 } 480 } 481}