/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;

/**
 * Tests region replica replication by setting up region replicas and verifying that async WAL
 * replication replays the edits to the secondary regions in various scenarios.
 */
@Category({ FlakeyTests.class, LargeTests.class })
public class TestRegionReplicaReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicaReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaReplication.class);

  private static final int NB_SERVERS = 2;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HTU.getConfiguration();
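    // Aggressive settings so replication kicks in quickly during the test: a tiny WAL roll
    // threshold, small replication batches, short retry sleeps and fast ZK recovery. The exact
    // values matter only for test speed.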
    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
    conf.setInt("replication.source.size.capacity", 10240);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // fewer retries are needed
    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);

    HTU.startMiniCluster(NB_SERVERS);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HTU.shutdownMiniCluster();
  }

  private void testRegionReplicaReplication(int regionReplication, boolean skipWAL)
    throws Exception {
    // test region replica replication: create a table with a single region, write some data and
    // ensure that the data is replicated to the secondary regions
    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
      + regionReplication + (skipWAL ? "_skipWAL" : ""));
    TableDescriptorBuilder builder = HTU
      .createModifyableTableDescriptor(tableName,
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
      .setRegionReplication(regionReplication);
    if (skipWAL) {
      builder.setDurability(Durability.SKIP_WAL);
    }
    TableDescriptor htd = builder.build();
    createOrEnableTableWithRetries(htd, true);
    TableName tableNameNoReplicas =
      TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
    HTU.deleteTableIfAny(tableNameNoReplicas);
    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
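    // The table without replicas shares the region servers' WALs; presumably its edits should be
    // skipped by region replica replication rather than replayed anywhere.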

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName);
      Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
      // load some data to the non-replicated table
      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);

      // load the data to the table
      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);

      verifyReplication(tableName, regionReplication, 0, 1000);
    } finally {
      HTU.deleteTableIfAny(tableNameNoReplicas);
    }
  }

  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow) throws Exception {
    verifyReplication(tableName, regionReplication, startRow, endRow, true);
  }

  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow, final boolean present) throws Exception {
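    // When 'present' is false the check is inverted: the rows must NOT be readable from the
    // secondary replicas (used while memstore replication is disabled and no flush has occurred).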
    // find the regions
    final Region[] regions = new Region[regionReplication];

    for (int i = 0; i < NB_SERVERS; i++) {
      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
      List<HRegion> onlineRegions = rs.getRegions(tableName);
      for (HRegion region : onlineRegions) {
        regions[region.getRegionInfo().getReplicaId()] = region;
      }
    }

    for (Region region : regions) {
      assertNotNull(region);
    }

    for (int i = 1; i < regionReplication; i++) {
      final Region region = regions[i];
      // wait until all the data is replicated to all secondary regions
      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("verifying replication for region replica: " + region.getRegionInfo());
          try {
            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
          } catch (Throwable ex) {
            LOG.warn("Verification from secondary region is not complete yet", ex);
            // still wait
            return false;
          }
          return true;
        }
      });
    }
  }

  @Test
  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
    testRegionReplicaReplication(2, false);
    testRegionReplicaReplication(2, true);
  }

  @Test
  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
    testRegionReplicaReplication(3, false);
    testRegionReplicaReplication(3, true);
  }

  @Test
  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
    testRegionReplicaReplication(10, false);
    testRegionReplicaReplication(10, true);
  }

  @Test
  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
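    // With memstore replication disabled, the secondary replicas should only see the data once
    // the primary flushes and the flush reaches the secondaries.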
    int regionReplication = 3;
    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
    createOrEnableTableWithRetries(htd, true);
    final TableName tableName = htd.getTableName();
    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
    Table table = connection.getTable(tableName);
    try {
      // write data to the primary. The replicas should not receive the data
      final int STEP = 100;
      for (int i = 0; i < 3; ++i) {
        final int startRow = i * STEP;
        final int endRow = (i + 1) * STEP;
        LOG.info("Writing data from " + startRow + " to " + endRow);
        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
        verifyReplication(tableName, regionReplication, startRow, endRow, false);

        // Flush the table, now the data should show up in the replicas
        LOG.info("flushing table");
        HTU.flush(tableName);
        verifyReplication(tableName, regionReplication, 0, endRow, true);
      }
    } finally {
      table.close();
      connection.close();
    }
  }

  @Test
  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
    // Tests a table with region replication 3. Writes some data, and causes flushes and
    // compactions. Verifies that the data is readable from the replicas. Note that this
    // does not test whether the replicas actually pick up flushed files and apply compaction
    // to their stores
    int regionReplication = 3;
    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
      .setRegionReplication(regionReplication).build();
    createOrEnableTableWithRetries(htd, true);
    final TableName tableName = htd.getTableName();

    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
    Table table = connection.getTable(tableName);
    try {
      // load the data to the table

      for (int i = 0; i < 6000; i += 1000) {
        LOG.info("Writing data from " + i + " to " + (i + 1000));
        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i + 1000);
        LOG.info("flushing table");
        HTU.flush(tableName);
        LOG.info("compacting table");
        HTU.compact(tableName, false);
      }

      verifyReplication(tableName, regionReplication, 0, 1000);
    } finally {
      table.close();
      connection.close();
    }
  }

  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
    // Helper function to run create/enable table operations with a retry feature
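    // Only failures caused by a ReplicationException are retried (presumably a race with the
    // region replica replication peer setup); other IOExceptions simply end the loop.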
    boolean continueToRetry = true;
    int tries = 0;
    while (continueToRetry && tries < 50) {
      try {
        continueToRetry = false;
        if (createTableOperation) {
          HTU.getAdmin().createTable(htd);
        } else {
          HTU.getAdmin().enableTable(htd.getTableName());
        }
      } catch (IOException e) {
        if (e.getCause() instanceof ReplicationException) {
          continueToRetry = true;
          tries++;
          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
        }
      }
    }
  }
}