/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;

/**
 * Tests region replication by setting up region replicas and verifying async wal replication
 * replays the edits to the secondary region in various scenarios.
 */
@Tag(FlakeyTests.TAG)
@Tag(LargeTests.TAG)
public class TestRegionReplicaReplication {

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaReplication.class);

  /** Number of region servers started in the mini cluster. */
  private static final int NB_SERVERS = 2;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();

  /** Name of the currently running test method; used as the table name by some tests. */
  private String testName;

  /**
   * Tunes the shared configuration and starts a mini cluster with {@link #NB_SERVERS} region
   * servers and region replica replication enabled.
   */
  @BeforeAll
  public static void beforeClass() throws Exception {
    Configuration conf = HTU.getConfiguration();
    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
    conf.setInt("replication.source.size.capacity", 10240);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // fewer retries are needed
    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);

    HTU.startMiniCluster(NB_SERVERS);
  }

  /** Shuts down the mini cluster started in {@link #beforeClass()}. */
  @AfterAll
  public static void afterClass() throws Exception {
    HTU.shutdownMiniCluster();
  }

  /**
   * Creates a single-region table with the given number of replicas, writes rows to it (and to a
   * second table without replicas), and verifies that the rows show up in every secondary
   * replica.
   * @param regionReplication number of region replicas for the table under test
   * @param skipWAL           whether the table is created with {@link Durability#SKIP_WAL}
   */
  private void testRegionReplicaReplication(int regionReplication, boolean skipWAL)
    throws Exception {
    // test region replica replication. Create a table with single region, write some data
    // ensure that data is replicated to the secondary region
    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
      + regionReplication + (skipWAL ? "_skipWAL" : ""));
    TableDescriptorBuilder builder = HTU
      .createModifyableTableDescriptor(tableName,
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
      .setRegionReplication(regionReplication);
    if (skipWAL) {
      builder.setDurability(Durability.SKIP_WAL);
    }
    TableDescriptor htd = builder.build();
    createOrEnableTableWithRetries(htd, true);
    TableName tableNameNoReplicas =
      TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
    HTU.deleteTableIfAny(tableNameNoReplicas);
    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName);
      Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
      // load some data to the non-replicated table
      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);

      // load the data to the table
      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);

      verifyReplication(tableName, regionReplication, 0, 1000);
    } finally {
      HTU.deleteTableIfAny(tableNameNoReplicas);
    }
  }

  /**
   * Verifies that rows {@code [startRow, endRow)} are present in every secondary replica.
   * Delegates to {@link #verifyReplication(TableName, int, int, int, boolean)} with
   * {@code present == true}.
   */
  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow) throws Exception {
    verifyReplication(tableName, regionReplication, startRow, endRow, true);
  }

  /**
   * Locates the online region of each replica of {@code tableName} and waits (up to 90 seconds
   * per replica, polling every second) until the row check passes on every secondary replica.
   * @param tableName         table whose replicas are checked
   * @param regionReplication number of replicas expected to be online
   * @param startRow          first row passed to the row verification
   * @param endRow            end row passed to the row verification
   * @param present           whether the rows are expected to be present or absent
   */
  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow, final boolean present) throws Exception {
    // find the regions, indexed by replica id
    final Region[] regions = new Region[regionReplication];

    for (int i = 0; i < NB_SERVERS; i++) {
      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
      List<HRegion> onlineRegions = rs.getRegions(tableName);
      for (HRegion region : onlineRegions) {
        regions[region.getRegionInfo().getReplicaId()] = region;
      }
    }

    for (int replicaId = 0; replicaId < regions.length; replicaId++) {
      assertNotNull(regions[replicaId],
        "No online region found for replica " + replicaId + " of table " + tableName);
    }

    // replica 0 is skipped: only the secondaries are checked
    for (int i = 1; i < regionReplication; i++) {
      final Region region = regions[i];
      // wait until all the data is replicated to all secondary regions
      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("verifying replication for region replica:{}", region.getRegionInfo());
          try {
            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
          } catch (Throwable ex) {
            // Throwable on purpose: a failed verification must be treated as "not yet"
            LOG.warn("Verification from secondary region is not complete yet", ex);
            // still wait
            return false;
          }
          return true;
        }
      });
    }
  }

  @Test
  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
    testRegionReplicaReplication(2, false);
    testRegionReplicaReplication(2, true);
  }

  @Test
  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
    testRegionReplicaReplication(3, false);
    testRegionReplicaReplication(3, true);
  }

  @Test
  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
    testRegionReplicaReplication(10, false);
    testRegionReplicaReplication(10, true);
  }

  /**
   * With memstore replication disabled on the table, written rows are expected to be absent from
   * the secondary replicas until the table is flushed, and present afterwards.
   */
  @Test
  public void testRegionReplicaWithoutMemstoreReplication(TestInfo testInfo) throws Exception {
    testName = testInfo.getTestMethod().get().getName();
    int regionReplication = 3;
    TableDescriptor htd = HTU.createModifyableTableDescriptor(testName)
      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
    createOrEnableTableWithRetries(htd, true);
    final TableName tableName = htd.getTableName();
    // try-with-resources closes the table and then the connection, even if either close throws
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName)) {
      // write data to the primary. The replicas should not receive the data
      final int STEP = 100;
      for (int i = 0; i < 3; ++i) {
        final int startRow = i * STEP;
        final int endRow = (i + 1) * STEP;
        LOG.info("Writing data from " + startRow + " to " + endRow);
        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
        verifyReplication(tableName, regionReplication, startRow, endRow, false);

        // Flush the table, now the data should show up in the replicas
        LOG.info("flushing table");
        HTU.flush(tableName);
        verifyReplication(tableName, regionReplication, 0, endRow, true);
      }
    }
  }

  /**
   * Tests a table with region replication 3. Writes some data, and causes flushes and
   * compactions. Verifies that the data is readable from the replicas. Note that this does not
   * test whether the replicas actually pick up flushed files and apply compaction to their
   * stores.
   */
  @Test
  public void testRegionReplicaReplicationForFlushAndCompaction(TestInfo testInfo)
    throws Exception {
    testName = testInfo.getTestMethod().get().getName();
    int regionReplication = 3;
    TableDescriptor htd =
      HTU.createModifyableTableDescriptor(testName).setRegionReplication(regionReplication).build();
    createOrEnableTableWithRetries(htd, true);
    final TableName tableName = htd.getTableName();

    // try-with-resources closes the table and then the connection, even if either close throws
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName)) {
      // load the data to the table
      for (int i = 0; i < 6000; i += 1000) {
        LOG.info("Writing data from " + i + " to " + (i + 1000));
        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i + 1000);
        LOG.info("flushing table");
        HTU.flush(tableName);
        LOG.info("compacting table");
        HTU.compact(tableName, false);
      }

      // NOTE(review): 6000 rows are written above but only rows [0, 1000) are verified here.
      // Confirm whether the narrower range is intentional before widening it.
      verifyReplication(tableName, regionReplication, 0, 1000);
    }
  }

  /**
   * Runs a create-table or enable-table operation, retrying once per second (at most 50
   * attempts) while the failure is an {@link IOException} caused by a
   * {@link ReplicationException}. Any other {@link IOException} ends the attempts; it is logged
   * but not rethrown, so this helper stays best-effort.
   * @param htd                  descriptor of the table to create or enable
   * @param createTableOperation {@code true} to create the table, {@code false} to enable it
   */
  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
    final int maxTries = 50;
    final String operation = createTableOperation ? "create" : "enable";
    for (int tries = 0; tries < maxTries; tries++) {
      try {
        if (createTableOperation) {
          HTU.getAdmin().createTable(htd);
        } else {
          HTU.getAdmin().enableTable(htd.getTableName());
        }
        return;
      } catch (IOException e) {
        if (!(e.getCause() instanceof ReplicationException)) {
          // Not retriable. Kept non-fatal as before, but no longer silent.
          LOG.warn("Failed to {} table {}; not retrying", operation, htd.getTableName(), e);
          return;
        }
        LOG.info("Failed to {} table {} due to a ReplicationException (attempt {} of {}), retrying",
          operation, htd.getTableName(), tries + 1, maxTries, e);
        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
      }
    }
    LOG.warn("Giving up on {} of table {} after {} attempts", operation, htd.getTableName(),
      maxTries);
  }
}