001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.fail; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.when; 026 027import java.io.IOException; 028import java.util.List; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicLong; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.HRegionLocation; 036import org.apache.hadoop.hbase.HTableDescriptor; 037import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.Waiter; 040import org.apache.hadoop.hbase.client.ClusterConnection; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.RegionLocator; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 046import org.apache.hadoop.hbase.regionserver.HRegion; 047import org.apache.hadoop.hbase.regionserver.HRegionServer; 048import org.apache.hadoop.hbase.regionserver.Region; 049import org.apache.hadoop.hbase.replication.ReplicationException; 050import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 051import org.apache.hadoop.hbase.testclassification.FlakeyTests; 052import org.apache.hadoop.hbase.testclassification.LargeTests; 053import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 054import org.apache.hadoop.hbase.wal.WAL.Entry; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.apache.hadoop.hbase.wal.WALKeyImpl; 057import org.apache.hadoop.hbase.zookeeper.ZKConfig; 058import org.junit.AfterClass; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Rule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.rules.TestName; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 069 070/** 071 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying 072 * async wal replication replays the edits to the secondary region in various scenarios. 073 */ 074@Category({FlakeyTests.class, LargeTests.class}) 075public class TestRegionReplicaReplicationEndpoint { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class); 080 081 private static final Logger LOG = 082 LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class); 083 084 private static final int NB_SERVERS = 2; 085 086 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 087 088 @Rule 089 public TestName name = new TestName(); 090 091 @BeforeClass 092 public static void beforeClass() throws Exception { 093 Configuration conf = HTU.getConfiguration(); 094 conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); 095 conf.setInt("replication.source.size.capacity", 10240); 096 conf.setLong("replication.source.sleepforretries", 100); 097 conf.setInt("hbase.regionserver.maxlogs", 10); 098 conf.setLong("hbase.master.logcleaner.ttl", 10); 099 conf.setInt("zookeeper.recovery.retry", 1); 100 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 101 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 102 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 103 conf.setInt("replication.stats.thread.period.seconds", 5); 104 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 105 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed 106 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 107 108 HTU.startMiniCluster(NB_SERVERS); 109 } 110 111 @AfterClass 112 public static void afterClass() throws Exception { 113 HTU.shutdownMiniCluster(); 114 } 115 116 @Test 117 public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { 118 // create a table with region replicas. Check whether the replication peer is created 119 // and replication started. 120 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); 121 String peerId = "region_replica_replication"; 122 123 ReplicationPeerConfig peerConfig = null; 124 try { 125 peerConfig = admin.getPeerConfig(peerId); 126 } catch (ReplicationPeerNotFoundException e) { 127 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 128 } 129 130 if (peerConfig != null) { 131 admin.removePeer(peerId); 132 peerConfig = null; 133 } 134 135 HTableDescriptor htd = HTU.createTableDescriptor( 136 "testReplicationPeerIsCreated_no_region_replicas"); 137 HTU.getAdmin().createTable(htd); 138 try { 139 peerConfig = admin.getPeerConfig(peerId); 140 fail("Should throw ReplicationException, because replication peer id=" + peerId 141 + " not exist"); 142 } catch (ReplicationPeerNotFoundException e) { 143 } 144 assertNull(peerConfig); 145 146 htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); 147 htd.setRegionReplication(2); 148 HTU.getAdmin().createTable(htd); 149 150 // assert peer configuration is correct 151 peerConfig = admin.getPeerConfig(peerId); 152 assertNotNull(peerConfig); 153 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( 154 HTU.getConfiguration())); 155 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 156 peerConfig.getReplicationEndpointImpl()); 157 admin.close(); 158 } 159 160 @Test 161 public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { 162 // modify a table by adding region replicas. Check whether the replication peer is created 163 // and replication started. 164 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); 165 String peerId = "region_replica_replication"; 166 167 ReplicationPeerConfig peerConfig = null; 168 try { 169 peerConfig = admin.getPeerConfig(peerId); 170 } catch (ReplicationPeerNotFoundException e) { 171 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 172 } 173 174 if (peerConfig != null) { 175 admin.removePeer(peerId); 176 peerConfig = null; 177 } 178 179 HTableDescriptor htd 180 = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable"); 181 HTU.getAdmin().createTable(htd); 182 183 // assert that replication peer is not created yet 184 try { 185 peerConfig = admin.getPeerConfig(peerId); 186 fail("Should throw ReplicationException, because replication peer id=" + peerId 187 + " not exist"); 188 } catch (ReplicationPeerNotFoundException e) { 189 } 190 assertNull(peerConfig); 191 192 HTU.getAdmin().disableTable(htd.getTableName()); 193 htd.setRegionReplication(2); 194 HTU.getAdmin().modifyTable(htd.getTableName(), htd); 195 HTU.getAdmin().enableTable(htd.getTableName()); 196 197 // assert peer configuration is correct 198 peerConfig = admin.getPeerConfig(peerId); 199 assertNotNull(peerConfig); 200 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( 201 HTU.getConfiguration())); 202 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 203 peerConfig.getReplicationEndpointImpl()); 204 admin.close(); 205 } 206 207 public void testRegionReplicaReplication(int regionReplication) throws Exception { 208 // test region replica replication. Create a table with single region, write some data 209 // ensure that data is replicated to the secondary region 210 TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_" 211 + regionReplication); 212 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); 213 htd.setRegionReplication(regionReplication); 214 HTU.getAdmin().createTable(htd); 215 TableName tableNameNoReplicas = 216 TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS"); 217 HTU.deleteTableIfAny(tableNameNoReplicas); 218 HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1); 219 220 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 221 Table table = connection.getTable(tableName); 222 Table tableNoReplicas = connection.getTable(tableNameNoReplicas); 223 224 try { 225 // load some data to the non-replicated table 226 HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000); 227 228 // load the data to the table 229 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 230 231 verifyReplication(tableName, regionReplication, 0, 1000); 232 233 } finally { 234 table.close(); 235 tableNoReplicas.close(); 236 HTU.deleteTableIfAny(tableNameNoReplicas); 237 connection.close(); 238 } 239 } 240 241 private void verifyReplication(TableName tableName, int regionReplication, 242 final int startRow, final int endRow) throws Exception { 243 verifyReplication(tableName, regionReplication, startRow, endRow, true); 244 } 245 246 private void verifyReplication(TableName tableName, int regionReplication, 247 final int startRow, final int endRow, final boolean present) throws Exception { 248 // find the regions 249 final Region[] regions = new Region[regionReplication]; 250 251 for (int i=0; i < NB_SERVERS; i++) { 252 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 253 List<HRegion> onlineRegions = rs.getRegions(tableName); 254 for (HRegion region : onlineRegions) { 255 regions[region.getRegionInfo().getReplicaId()] = region; 256 } 257 } 258 259 for (Region region : regions) { 260 assertNotNull(region); 261 } 262 263 for (int i = 1; i < regionReplication; i++) { 264 final Region region = regions[i]; 265 // wait until all the data is replicated to all secondary regions 266 Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() { 267 @Override 268 public boolean evaluate() throws Exception { 269 LOG.info("verifying replication for region replica:" + region.getRegionInfo()); 270 try { 271 HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present); 272 } catch(Throwable ex) { 273 LOG.warn("Verification from secondary region is not complete yet", ex); 274 // still wait 275 return false; 276 } 277 return true; 278 } 279 }); 280 } 281 } 282 283 @Test 284 public void testRegionReplicaReplicationWith2Replicas() throws Exception { 285 testRegionReplicaReplication(2); 286 } 287 288 @Test 289 public void testRegionReplicaReplicationWith3Replicas() throws Exception { 290 testRegionReplicaReplication(3); 291 } 292 293 @Test 294 public void testRegionReplicaReplicationWith10Replicas() throws Exception { 295 testRegionReplicaReplication(10); 296 } 297 298 @Test 299 public void testRegionReplicaWithoutMemstoreReplication() throws Exception { 300 int regionReplication = 3; 301 final TableName tableName = TableName.valueOf(name.getMethodName()); 302 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 303 htd.setRegionReplication(regionReplication); 304 htd.setRegionMemstoreReplication(false); 305 HTU.getAdmin().createTable(htd); 306 307 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 308 Table table = connection.getTable(tableName); 309 try { 310 // write data to the primary. The replicas should not receive the data 311 final int STEP = 100; 312 for (int i = 0; i < 3; ++i) { 313 final int startRow = i * STEP; 314 final int endRow = (i + 1) * STEP; 315 LOG.info("Writing data from " + startRow + " to " + endRow); 316 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow); 317 verifyReplication(tableName, regionReplication, startRow, endRow, false); 318 319 // Flush the table, now the data should show up in the replicas 320 LOG.info("flushing table"); 321 HTU.flush(tableName); 322 verifyReplication(tableName, regionReplication, 0, endRow, true); 323 } 324 } finally { 325 table.close(); 326 connection.close(); 327 } 328 } 329 330 @Test 331 public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception { 332 // Tests a table with region replication 3. Writes some data, and causes flushes and 333 // compactions. Verifies that the data is readable from the replicas. Note that this 334 // does not test whether the replicas actually pick up flushed files and apply compaction 335 // to their stores 336 int regionReplication = 3; 337 final TableName tableName = TableName.valueOf(name.getMethodName()); 338 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 339 htd.setRegionReplication(regionReplication); 340 HTU.getAdmin().createTable(htd); 341 342 343 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 344 Table table = connection.getTable(tableName); 345 346 try { 347 // load the data to the table 348 349 for (int i = 0; i < 6000; i += 1000) { 350 LOG.info("Writing data from " + i + " to " + (i+1000)); 351 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000); 352 LOG.info("flushing table"); 353 HTU.flush(tableName); 354 LOG.info("compacting table"); 355 HTU.compact(tableName, false); 356 } 357 358 verifyReplication(tableName, regionReplication, 0, 1000); 359 } finally { 360 table.close(); 361 connection.close(); 362 } 363 } 364 365 @Test 366 public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception { 367 testRegionReplicaReplicationIgnoresDisabledTables(false); 368 } 369 370 @Test 371 public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception { 372 testRegionReplicaReplicationIgnoresDisabledTables(true); 373 } 374 375 public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) 376 throws Exception { 377 // tests having edits from a disabled or dropped table is handled correctly by skipping those 378 // entries and further edits after the edits from dropped/disabled table can be replicated 379 // without problems. 380 final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable); 381 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 382 int regionReplication = 3; 383 htd.setRegionReplication(regionReplication); 384 HTU.deleteTableIfAny(tableName); 385 HTU.getAdmin().createTable(htd); 386 TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); 387 HTU.deleteTableIfAny(toBeDisabledTable); 388 htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); 389 htd.setRegionReplication(regionReplication); 390 HTU.getAdmin().createTable(htd); 391 392 // both tables are created, now pause replication 393 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); 394 admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); 395 396 // now that the replication is disabled, write to the table to be dropped, then drop the table. 397 398 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 399 Table table = connection.getTable(tableName); 400 Table tableToBeDisabled = connection.getTable(toBeDisabledTable); 401 402 HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); 403 404 AtomicLong skippedEdits = new AtomicLong(); 405 RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = 406 mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); 407 when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); 408 RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = 409 new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, 410 (ClusterConnection) connection, 411 Executors.newSingleThreadExecutor(), Integer.MAX_VALUE); 412 RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); 413 HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); 414 byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); 415 416 Entry entry = new Entry( 417 new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), 418 new WALEdit()); 419 420 HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table 421 if (dropTable) { 422 HTU.getAdmin().deleteTable(toBeDisabledTable); 423 } 424 425 sinkWriter.append(toBeDisabledTable, encodedRegionName, 426 HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); 427 428 assertEquals(2, skippedEdits.get()); 429 430 try { 431 // load some data to the to-be-dropped table 432 433 // load the data to the table 434 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 435 436 // now enable the replication 437 admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); 438 439 verifyReplication(tableName, regionReplication, 0, 1000); 440 441 } finally { 442 admin.close(); 443 table.close(); 444 rl.close(); 445 tableToBeDisabled.close(); 446 HTU.deleteTableIfAny(toBeDisabledTable); 447 connection.close(); 448 } 449 } 450}