001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicInteger;
026import java.util.concurrent.atomic.AtomicReference;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.Waiter.Predicate;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Consistency;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplication;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
045import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
046import org.junit.After;
047import org.junit.Before;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.TestName;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Tests failover of secondary region replicas.
058 */
059@Category(LargeTests.class)
060public class TestRegionReplicaFailover {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaReplication.class);
067
068  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
069
070  private static final int NB_SERVERS = 3;
071
072  protected final byte[][] families =
073    new byte[][] { HBaseTestingUtil.fam1, HBaseTestingUtil.fam2, HBaseTestingUtil.fam3 };
074  protected final byte[] fam = HBaseTestingUtil.fam1;
075  protected final byte[] qual1 = Bytes.toBytes("qual1");
076  protected final byte[] value1 = Bytes.toBytes("value1");
077  protected final byte[] row = Bytes.toBytes("rowA");
078  protected final byte[] row2 = Bytes.toBytes("rowB");
079
080  @Rule
081  public TestName name = new TestName();
082
083  private TableDescriptor htd;
084
085  @Before
086  public void before() throws Exception {
087    Configuration conf = HTU.getConfiguration();
088    // Up the handlers; this test needs more than usual.
089    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
090    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
091    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
092    conf.setInt("replication.stats.thread.period.seconds", 5);
093    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
094
095    HTU.startMiniCluster(NB_SERVERS);
096    htd = HTU.createModifyableTableDescriptor(
097      TableName.valueOf(name.getMethodName().substring(0, name.getMethodName().length() - 3)),
098      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
099      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(3).build();
100    HTU.getAdmin().createTable(htd);
101  }
102
103  @After
104  public void after() throws Exception {
105    HTU.deleteTableIfAny(htd.getTableName());
106    HTU.shutdownMiniCluster();
107  }
108
109  /**
110   * Tests the case where a newly created table with region replicas and no data, the secondary
111   * region replicas are available to read immediately.
112   */
113  @Test
114  public void testSecondaryRegionWithEmptyRegion() throws IOException {
115    // Create a new table with region replication, don't put any data. Test that the secondary
116    // region replica is available to read.
117    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
118      Table table = connection.getTable(htd.getTableName())) {
119
120      Get get = new Get(row);
121      get.setConsistency(Consistency.TIMELINE);
122      get.setReplicaId(1);
123      table.get(get); // this should not block
124    }
125  }
126
127  /**
128   * Tests the case where if there is some data in the primary region, reopening the region replicas
129   * (enable/disable table, etc) makes the region replicas readable.
130   */
131  @Test
132  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
133    // Create a new table with region replication and load some data
134    // than disable and enable the table again and verify the data from secondary
135    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
136      Table table = connection.getTable(htd.getTableName())) {
137
138      HTU.loadNumericRows(table, fam, 0, 1000);
139
140      HTU.getAdmin().disableTable(htd.getTableName());
141      HTU.getAdmin().enableTable(htd.getTableName());
142
143      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
144    }
145  }
146
147  /**
148   * Tests the case where killing a primary region with unflushed data recovers
149   */
150  @Test
151  public void testPrimaryRegionKill() throws Exception {
152    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
153      Table table = connection.getTable(htd.getTableName())) {
154
155      HTU.loadNumericRows(table, fam, 0, 1000);
156
157      // wal replication is async, we have to wait until the replication catches up, or we timeout
158      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
159      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
160
161      // we should not have flushed files now, but data in memstores of primary and secondary
162      // kill the primary region replica now, and ensure that when it comes back up, we can still
163      // read from it the same data from primary and secondaries
164      boolean aborted = false;
165      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
166        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
167          if (r.getRegionInfo().getReplicaId() == 0) {
168            LOG.info("Aborting region server hosting primary region replica");
169            rs.getRegionServer().abort("for test");
170            aborted = true;
171            break;
172          }
173        }
174      }
175      assertTrue(aborted);
176
177      // wal replication is async, we have to wait until the replication catches up, or we timeout
178      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
179      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
180      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
181    }
182
183    // restart the region server
184    HTU.getMiniHBaseCluster().startRegionServer();
185  }
186
187  /**
188   * wal replication is async, we have to wait until the replication catches up, or we timeout
189   */
190  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
191    final int endRow, final int replicaId, final long timeout) throws Exception {
192    try {
193      HTU.waitFor(timeout, new Predicate<Exception>() {
194        @Override
195        public boolean evaluate() throws Exception {
196          try {
197            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
198            return true;
199          } catch (AssertionError ae) {
200            return false;
201          }
202        }
203      });
204    } catch (Throwable t) {
205      // ignore this, but redo the verify do get the actual exception
206      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
207    }
208  }
209
210  /**
211   * Tests the case where killing a secondary region with unflushed data recovers, and the replica
212   * becomes available to read again shortly.
213   */
214  @Test
215  public void testSecondaryRegionKill() throws Exception {
216    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
217      Table table = connection.getTable(htd.getTableName())) {
218      HTU.loadNumericRows(table, fam, 0, 1000);
219
220      // wait for some time to ensure that async wal replication does it's magic
221      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
222      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
223
224      // we should not have flushed files now, but data in memstores of primary and secondary
225      // kill the secondary region replica now, and ensure that when it comes back up, we can still
226      // read from it the same data
227      boolean aborted = false;
228      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
229        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
230          if (r.getRegionInfo().getReplicaId() == 1) {
231            LOG.info("Aborting region server hosting secondary region replica");
232            rs.getRegionServer().abort("for test");
233            aborted = true;
234            break;
235          }
236        }
237      }
238      assertTrue(aborted);
239
240      // It takes extra time for replica region is ready for read as during
241      // region open process, it needs to ask primary region to do a flush and replica region
242      // can open newly flushed hfiles to avoid data out-of-sync.
243      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
244      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
245    }
246
247    // restart the region server
248    HTU.getMiniHBaseCluster().startRegionServer();
249  }
250
251  /**
252   * Tests the case where there are 3 region replicas and the primary is continuously accepting new
253   * writes while one of the secondaries is killed. Verification is done for both of the secondary
254   * replicas.
255   */
256  @Test
257  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
258    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
259      Table table = connection.getTable(htd.getTableName()); Admin admin = connection.getAdmin()) {
260      // start a thread to do the loading of primary
261      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
262      admin.flush(table.getName());
263      HTU.loadNumericRows(table, fam, 1000, 2000);
264
265      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
266      final AtomicBoolean done = new AtomicBoolean(false);
267      final AtomicInteger key = new AtomicInteger(2000);
268
269      Thread loader = new Thread() {
270        @Override
271        public void run() {
272          while (!done.get()) {
273            try {
274              HTU.loadNumericRows(table, fam, key.get(), key.get() + 1000);
275              key.addAndGet(1000);
276            } catch (Throwable e) {
277              ex.compareAndSet(null, e);
278            }
279          }
280        }
281      };
282      loader.start();
283
284      Thread aborter = new Thread() {
285        @Override
286        public void run() {
287          try {
288            boolean aborted = false;
289            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
290              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
291                if (r.getRegionInfo().getReplicaId() == 1) {
292                  LOG.info("Aborting region server hosting secondary region replica");
293                  rs.getRegionServer().abort("for test");
294                  aborted = true;
295                }
296              }
297            }
298            assertTrue(aborted);
299          } catch (Throwable e) {
300            ex.compareAndSet(null, e);
301          }
302        }
303      };
304
305      aborter.start();
306      aborter.join();
307      done.set(true);
308      loader.join();
309
310      assertNull(ex.get());
311
312      assertTrue(key.get() > 1000); // assert that the test is working as designed
313      LOG.info("Loaded up to key :" + key.get());
314      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
315      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
316      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
317    }
318
319    // restart the region server
320    HTU.getMiniHBaseCluster().startRegionServer();
321  }
322
323  /**
324   * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
325   * replicas should not block handlers on RS indefinitely.
326   */
327  @Test
328  public void testLotsOfRegionReplicas() throws IOException {
329    int numRegions = NB_SERVERS * 20;
330    int regionReplication = 10;
331    String tableName = htd.getTableName().getNameAsString() + "2";
332    htd = HTU
333      .createModifyableTableDescriptor(TableName.valueOf(tableName),
334        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
335        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
336      .setRegionReplication(regionReplication).build();
337
338    // dont care about splits themselves too much
339    byte[] startKey = Bytes.toBytes("aaa");
340    byte[] endKey = Bytes.toBytes("zzz");
341    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
342    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);
343
344    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
345      Table table = connection.getTable(htd.getTableName())) {
346
347      for (int i = 1; i < splits.length; i++) {
348        for (int j = 0; j < regionReplication; j++) {
349          Get get = new Get(splits[i]);
350          get.setConsistency(Consistency.TIMELINE);
351          get.setReplicaId(j);
352          table.get(get); // this should not block. Regions should be coming online
353        }
354      }
355    }
356
357    HTU.deleteTableIfAny(TableName.valueOf(tableName));
358  }
359}