001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertNull;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicInteger;
026import java.util.concurrent.atomic.AtomicReference;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.Waiter.Predicate;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Consistency;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
043import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
044import org.junit.jupiter.api.AfterEach;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.junit.jupiter.api.TestInfo;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Tests failover of secondary region replicas.
054 */
055@Tag(LargeTests.TAG)
056public class TestRegionReplicaFailover {
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaFailover.class);
059
060  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
061
062  private static final int NB_SERVERS = 3;
063
064  protected final byte[][] families =
065    new byte[][] { HBaseTestingUtil.fam1, HBaseTestingUtil.fam2, HBaseTestingUtil.fam3 };
066  protected final byte[] fam = HBaseTestingUtil.fam1;
067  protected final byte[] qual1 = Bytes.toBytes("qual1");
068  protected final byte[] value1 = Bytes.toBytes("value1");
069  protected final byte[] row = Bytes.toBytes("rowA");
070  protected final byte[] row2 = Bytes.toBytes("rowB");
071
072  private TableDescriptor htd;
073
074  @BeforeEach
075  public void before(TestInfo testInfo) throws Exception {
076    Configuration conf = HTU.getConfiguration();
077    // Up the handlers; this test needs more than usual.
078    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
079    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
080    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
081    conf.setInt("replication.stats.thread.period.seconds", 5);
082    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
083
084    HTU.startMiniCluster(NB_SERVERS);
085    String name = testInfo.getTestMethod().get().getName();
086    htd =
087      HTU.createModifyableTableDescriptor(TableName.valueOf(name.substring(0, name.length() - 3)),
088        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
089        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(3).build();
090    HTU.getAdmin().createTable(htd);
091  }
092
093  @AfterEach
094  public void after() throws Exception {
095    HTU.deleteTableIfAny(htd.getTableName());
096    HTU.shutdownMiniCluster();
097  }
098
099  /**
100   * Tests the case where a newly created table with region replicas and no data, the secondary
101   * region replicas are available to read immediately.
102   */
103  @Test
104  public void testSecondaryRegionWithEmptyRegion() throws IOException {
105    // Create a new table with region replication, don't put any data. Test that the secondary
106    // region replica is available to read.
107    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
108      Table table = connection.getTable(htd.getTableName())) {
109
110      Get get = new Get(row);
111      get.setConsistency(Consistency.TIMELINE);
112      get.setReplicaId(1);
113      table.get(get); // this should not block
114    }
115  }
116
117  /**
118   * Tests the case where if there is some data in the primary region, reopening the region replicas
119   * (enable/disable table, etc) makes the region replicas readable.
120   */
121  @Test
122  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
123    // Create a new table with region replication and load some data
124    // than disable and enable the table again and verify the data from secondary
125    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
126      Table table = connection.getTable(htd.getTableName())) {
127
128      HTU.loadNumericRows(table, fam, 0, 1000);
129
130      HTU.getAdmin().disableTable(htd.getTableName());
131      HTU.getAdmin().enableTable(htd.getTableName());
132
133      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
134    }
135  }
136
137  /**
138   * Tests the case where killing a primary region with unflushed data recovers
139   */
140  @Test
141  public void testPrimaryRegionKill() throws Exception {
142    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
143      Table table = connection.getTable(htd.getTableName())) {
144
145      HTU.loadNumericRows(table, fam, 0, 1000);
146
147      // wal replication is async, we have to wait until the replication catches up, or we timeout
148      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
149      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
150
151      // we should not have flushed files now, but data in memstores of primary and secondary
152      // kill the primary region replica now, and ensure that when it comes back up, we can still
153      // read from it the same data from primary and secondaries
154      boolean aborted = false;
155      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
156        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
157          if (r.getRegionInfo().getReplicaId() == 0) {
158            LOG.info("Aborting region server hosting primary region replica");
159            rs.getRegionServer().abort("for test");
160            aborted = true;
161            break;
162          }
163        }
164      }
165      assertTrue(aborted);
166
167      // wal replication is async, we have to wait until the replication catches up, or we timeout
168      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
169      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
170      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
171    }
172
173    // restart the region server
174    HTU.getMiniHBaseCluster().startRegionServer();
175  }
176
177  /**
178   * wal replication is async, we have to wait until the replication catches up, or we timeout
179   */
180  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
181    final int endRow, final int replicaId, final long timeout) throws Exception {
182    try {
183      HTU.waitFor(timeout, new Predicate<Exception>() {
184        @Override
185        public boolean evaluate() throws Exception {
186          try {
187            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
188            return true;
189          } catch (AssertionError ae) {
190            return false;
191          }
192        }
193      });
194    } catch (Throwable t) {
195      // ignore this, but redo the verify do get the actual exception
196      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
197    }
198  }
199
200  /**
201   * Tests the case where killing a secondary region with unflushed data recovers, and the replica
202   * becomes available to read again shortly.
203   */
204  @Test
205  public void testSecondaryRegionKill() throws Exception {
206    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
207      Table table = connection.getTable(htd.getTableName())) {
208      HTU.loadNumericRows(table, fam, 0, 1000);
209
210      // wait for some time to ensure that async wal replication does it's magic
211      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
212      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
213
214      // we should not have flushed files now, but data in memstores of primary and secondary
215      // kill the secondary region replica now, and ensure that when it comes back up, we can still
216      // read from it the same data
217      boolean aborted = false;
218      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
219        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
220          if (r.getRegionInfo().getReplicaId() == 1) {
221            LOG.info("Aborting region server hosting secondary region replica");
222            rs.getRegionServer().abort("for test");
223            aborted = true;
224            break;
225          }
226        }
227      }
228      assertTrue(aborted);
229
230      // It takes extra time for replica region is ready for read as during
231      // region open process, it needs to ask primary region to do a flush and replica region
232      // can open newly flushed hfiles to avoid data out-of-sync.
233      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
234      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
235    }
236
237    // restart the region server
238    HTU.getMiniHBaseCluster().startRegionServer();
239  }
240
241  /**
242   * Tests the case where there are 3 region replicas and the primary is continuously accepting new
243   * writes while one of the secondaries is killed. Verification is done for both of the secondary
244   * replicas.
245   */
246  @Test
247  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
248    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
249      Table table = connection.getTable(htd.getTableName()); Admin admin = connection.getAdmin()) {
250      // start a thread to do the loading of primary
251      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
252      admin.flush(table.getName());
253      HTU.loadNumericRows(table, fam, 1000, 2000);
254
255      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
256      final AtomicBoolean done = new AtomicBoolean(false);
257      final AtomicInteger key = new AtomicInteger(2000);
258
259      Thread loader = new Thread() {
260        @Override
261        public void run() {
262          while (!done.get()) {
263            try {
264              HTU.loadNumericRows(table, fam, key.get(), key.get() + 1000);
265              key.addAndGet(1000);
266            } catch (Throwable e) {
267              ex.compareAndSet(null, e);
268            }
269          }
270        }
271      };
272      loader.start();
273
274      Thread aborter = new Thread() {
275        @Override
276        public void run() {
277          try {
278            boolean aborted = false;
279            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
280              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
281                if (r.getRegionInfo().getReplicaId() == 1) {
282                  LOG.info("Aborting region server hosting secondary region replica");
283                  rs.getRegionServer().abort("for test");
284                  aborted = true;
285                }
286              }
287            }
288            assertTrue(aborted);
289          } catch (Throwable e) {
290            ex.compareAndSet(null, e);
291          }
292        }
293      };
294
295      aborter.start();
296      aborter.join();
297      done.set(true);
298      loader.join();
299
300      assertNull(ex.get());
301
302      assertTrue(key.get() > 1000); // assert that the test is working as designed
303      LOG.info("Loaded up to key :" + key.get());
304      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
305      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
306      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
307    }
308
309    // restart the region server
310    HTU.getMiniHBaseCluster().startRegionServer();
311  }
312
313  /**
314   * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
315   * replicas should not block handlers on RS indefinitely.
316   */
317  @Test
318  public void testLotsOfRegionReplicas() throws IOException {
319    int numRegions = NB_SERVERS * 20;
320    int regionReplication = 10;
321    String tableName = htd.getTableName().getNameAsString() + "2";
322    htd = HTU
323      .createModifyableTableDescriptor(TableName.valueOf(tableName),
324        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
325        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
326      .setRegionReplication(regionReplication).build();
327
328    // dont care about splits themselves too much
329    byte[] startKey = Bytes.toBytes("aaa");
330    byte[] endKey = Bytes.toBytes("zzz");
331    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
332    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);
333
334    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
335      Table table = connection.getTable(htd.getTableName())) {
336
337      for (int i = 1; i < splits.length; i++) {
338        for (int j = 0; j < regionReplication; j++) {
339          Get get = new Get(splits[i]);
340          get.setConsistency(Consistency.TIMELINE);
341          get.setReplicaId(j);
342          table.get(get); // this should not block. Regions should be coming online
343        }
344      }
345    }
346
347    HTU.deleteTableIfAny(TableName.valueOf(tableName));
348  }
349}