001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertNotNull;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseTestingUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.Waiter;
030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.ConnectionFactory;
033import org.apache.hadoop.hbase.client.Durability;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.regionserver.HRegion;
038import org.apache.hadoop.hbase.regionserver.HRegionServer;
039import org.apache.hadoop.hbase.regionserver.Region;
040import org.apache.hadoop.hbase.replication.ReplicationException;
041import org.apache.hadoop.hbase.testclassification.FlakeyTests;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
044import org.junit.jupiter.api.AfterAll;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.junit.jupiter.api.TestInfo;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
053
054/**
055 * Tests region replication by setting up region replicas and verifying async wal replication
056 * replays the edits to the secondary region in various scenarios.
057 */
058@Tag(FlakeyTests.TAG)
059@Tag(LargeTests.TAG)
060public class TestRegionReplicaReplication {
061
062  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaReplication.class);
063
064  private static final int NB_SERVERS = 2;
065
066  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
067
068  private String testName;
069
070  @BeforeAll
071  public static void beforeClass() throws Exception {
072    Configuration conf = HTU.getConfiguration();
073    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
074    conf.setInt("replication.source.size.capacity", 10240);
075    conf.setLong("replication.source.sleepforretries", 100);
076    conf.setInt("hbase.regionserver.maxlogs", 10);
077    conf.setLong("hbase.master.logcleaner.ttl", 10);
078    conf.setInt("zookeeper.recovery.retry", 1);
079    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
080    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
081    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
082    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
083    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
084    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
085
086    HTU.startMiniCluster(NB_SERVERS);
087  }
088
089  @AfterAll
090  public static void afterClass() throws Exception {
091    HTU.shutdownMiniCluster();
092  }
093
094  private void testRegionReplicaReplication(int regionReplication, boolean skipWAL)
095    throws Exception {
096    // test region replica replication. Create a table with single region, write some data
097    // ensure that data is replicated to the secondary region
098    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
099      + regionReplication + (skipWAL ? "_skipWAL" : ""));
100    TableDescriptorBuilder builder =
101      HTU
102        .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
103          ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
104          ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
105        .setRegionReplication(regionReplication);
106    if (skipWAL) {
107      builder.setDurability(Durability.SKIP_WAL);
108    }
109    TableDescriptor htd = builder.build();
110    createOrEnableTableWithRetries(htd, true);
111    TableName tableNameNoReplicas =
112      TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
113    HTU.deleteTableIfAny(tableNameNoReplicas);
114    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
115
116    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
117      Table table = connection.getTable(tableName);
118      Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
119      // load some data to the non-replicated table
120      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
121
122      // load the data to the table
123      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
124
125      verifyReplication(tableName, regionReplication, 0, 1000);
126    } finally {
127      HTU.deleteTableIfAny(tableNameNoReplicas);
128    }
129  }
130
131  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
132    final int endRow) throws Exception {
133    verifyReplication(tableName, regionReplication, startRow, endRow, true);
134  }
135
136  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
137    final int endRow, final boolean present) throws Exception {
138    // find the regions
139    final Region[] regions = new Region[regionReplication];
140
141    for (int i = 0; i < NB_SERVERS; i++) {
142      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
143      List<HRegion> onlineRegions = rs.getRegions(tableName);
144      for (HRegion region : onlineRegions) {
145        regions[region.getRegionInfo().getReplicaId()] = region;
146      }
147    }
148
149    for (Region region : regions) {
150      assertNotNull(region);
151    }
152
153    for (int i = 1; i < regionReplication; i++) {
154      final Region region = regions[i];
155      // wait until all the data is replicated to all secondary regions
156      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
157        @Override
158        public boolean evaluate() throws Exception {
159          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
160          try {
161            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
162          } catch (Throwable ex) {
163            LOG.warn("Verification from secondary region is not complete yet", ex);
164            // still wait
165            return false;
166          }
167          return true;
168        }
169      });
170    }
171  }
172
173  @Test
174  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
175    testRegionReplicaReplication(2, false);
176    testRegionReplicaReplication(2, true);
177  }
178
179  @Test
180  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
181    testRegionReplicaReplication(3, false);
182    testRegionReplicaReplication(3, true);
183  }
184
185  @Test
186  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
187    testRegionReplicaReplication(10, false);
188    testRegionReplicaReplication(10, true);
189  }
190
191  @Test
192  public void testRegionReplicaWithoutMemstoreReplication(TestInfo testInfo) throws Exception {
193    testName = testInfo.getTestMethod().get().getName();
194    int regionReplication = 3;
195    TableDescriptor htd = HTU.createModifyableTableDescriptor(testName)
196      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
197    createOrEnableTableWithRetries(htd, true);
198    final TableName tableName = htd.getTableName();
199    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
200    Table table = connection.getTable(tableName);
201    try {
202      // write data to the primary. The replicas should not receive the data
203      final int STEP = 100;
204      for (int i = 0; i < 3; ++i) {
205        final int startRow = i * STEP;
206        final int endRow = (i + 1) * STEP;
207        LOG.info("Writing data from " + startRow + " to " + endRow);
208        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
209        verifyReplication(tableName, regionReplication, startRow, endRow, false);
210
211        // Flush the table, now the data should show up in the replicas
212        LOG.info("flushing table");
213        HTU.flush(tableName);
214        verifyReplication(tableName, regionReplication, 0, endRow, true);
215      }
216    } finally {
217      table.close();
218      connection.close();
219    }
220  }
221
222  @Test
223  public void testRegionReplicaReplicationForFlushAndCompaction(TestInfo testInfo)
224    throws Exception {
225    testName = testInfo.getTestMethod().get().getName();
226    // Tests a table with region replication 3. Writes some data, and causes flushes and
227    // compactions. Verifies that the data is readable from the replicas. Note that this
228    // does not test whether the replicas actually pick up flushed files and apply compaction
229    // to their stores
230    int regionReplication = 3;
231    TableDescriptor htd =
232      HTU.createModifyableTableDescriptor(testName).setRegionReplication(regionReplication).build();
233    createOrEnableTableWithRetries(htd, true);
234    final TableName tableName = htd.getTableName();
235
236    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
237    Table table = connection.getTable(tableName);
238    try {
239      // load the data to the table
240
241      for (int i = 0; i < 6000; i += 1000) {
242        LOG.info("Writing data from " + i + " to " + (i + 1000));
243        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i + 1000);
244        LOG.info("flushing table");
245        HTU.flush(tableName);
246        LOG.info("compacting table");
247        HTU.compact(tableName, false);
248      }
249
250      verifyReplication(tableName, regionReplication, 0, 1000);
251    } finally {
252      table.close();
253      connection.close();
254    }
255  }
256
257  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
258    // Helper function to run create/enable table operations with a retry feature
259    boolean continueToRetry = true;
260    int tries = 0;
261    while (continueToRetry && tries < 50) {
262      try {
263        continueToRetry = false;
264        if (createTableOperation) {
265          HTU.getAdmin().createTable(htd);
266        } else {
267          HTU.getAdmin().enableTable(htd.getTableName());
268        }
269      } catch (IOException e) {
270        if (e.getCause() instanceof ReplicationException) {
271          continueToRetry = true;
272          tries++;
273          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
274        }
275      }
276    }
277  }
278}