001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicInteger;
026import java.util.concurrent.atomic.AtomicReference;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.Waiter.Predicate;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Consistency;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplication;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
045import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
046import org.junit.After;
047import org.junit.Before;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.TestName;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Tests failover of secondary region replicas.
058 */
059@Category(LargeTests.class)
060public class TestRegionReplicaFailover {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
065
066  private static final Logger LOG =
067      LoggerFactory.getLogger(TestRegionReplicaReplication.class);
068
069  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
070
071  private static final int NB_SERVERS = 3;
072
073  protected final byte[][] families =
074    new byte[][] { HBaseTestingUtil.fam1, HBaseTestingUtil.fam2, HBaseTestingUtil.fam3 };
075  protected final byte[] fam = HBaseTestingUtil.fam1;
076  protected final byte[] qual1 = Bytes.toBytes("qual1");
077  protected final byte[] value1 = Bytes.toBytes("value1");
078  protected final byte[] row = Bytes.toBytes("rowA");
079  protected final byte[] row2 = Bytes.toBytes("rowB");
080
081  @Rule
082  public TestName name = new TestName();
083
084  private TableDescriptor htd;
085
086  @Before
087  public void before() throws Exception {
088    Configuration conf = HTU.getConfiguration();
089   // Up the handlers; this test needs more than usual.
090    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
091    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
092    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
093    conf.setInt("replication.stats.thread.period.seconds", 5);
094    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
095
096    HTU.startMiniCluster(NB_SERVERS);
097    htd = HTU.createModifyableTableDescriptor(
098      TableName.valueOf(name.getMethodName().substring(0, name.getMethodName().length() - 3)),
099      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
100      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(3).build();
101    HTU.getAdmin().createTable(htd);
102  }
103
104  @After
105  public void after() throws Exception {
106    HTU.deleteTableIfAny(htd.getTableName());
107    HTU.shutdownMiniCluster();
108  }
109
110  /**
111   * Tests the case where a newly created table with region replicas and no data, the secondary
112   * region replicas are available to read immediately.
113   */
114  @Test
115  public void testSecondaryRegionWithEmptyRegion() throws IOException {
116    // Create a new table with region replication, don't put any data. Test that the secondary
117    // region replica is available to read.
118    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
119        Table table = connection.getTable(htd.getTableName())) {
120
121      Get get = new Get(row);
122      get.setConsistency(Consistency.TIMELINE);
123      get.setReplicaId(1);
124      table.get(get); // this should not block
125    }
126  }
127
128  /**
129   * Tests the case where if there is some data in the primary region, reopening the region replicas
130   * (enable/disable table, etc) makes the region replicas readable.
131   * @throws IOException
132   */
133  @Test
134  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
135    // Create a new table with region replication and load some data
136    // than disable and enable the table again and verify the data from secondary
137    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
138        Table table = connection.getTable(htd.getTableName())) {
139
140      HTU.loadNumericRows(table, fam, 0, 1000);
141
142      HTU.getAdmin().disableTable(htd.getTableName());
143      HTU.getAdmin().enableTable(htd.getTableName());
144
145      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
146    }
147  }
148
149  /**
150   * Tests the case where killing a primary region with unflushed data recovers
151   */
152  @Test
153  public void testPrimaryRegionKill() throws Exception {
154    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
155        Table table = connection.getTable(htd.getTableName())) {
156
157      HTU.loadNumericRows(table, fam, 0, 1000);
158
159      // wal replication is async, we have to wait until the replication catches up, or we timeout
160      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
161      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
162
163      // we should not have flushed files now, but data in memstores of primary and secondary
164      // kill the primary region replica now, and ensure that when it comes back up, we can still
165      // read from it the same data from primary and secondaries
166      boolean aborted = false;
167      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
168        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
169          if (r.getRegionInfo().getReplicaId() == 0) {
170            LOG.info("Aborting region server hosting primary region replica");
171            rs.getRegionServer().abort("for test");
172            aborted = true;
173            break;
174          }
175        }
176      }
177      assertTrue(aborted);
178
179      // wal replication is async, we have to wait until the replication catches up, or we timeout
180      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
181      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
182      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
183    }
184
185    // restart the region server
186    HTU.getMiniHBaseCluster().startRegionServer();
187  }
188
189  /** wal replication is async, we have to wait until the replication catches up, or we timeout
190   */
191  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
192      final int endRow, final int replicaId, final long timeout) throws Exception {
193    try {
194      HTU.waitFor(timeout, new Predicate<Exception>() {
195        @Override
196        public boolean evaluate() throws Exception {
197          try {
198            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
199            return true;
200          } catch (AssertionError ae) {
201            return false;
202          }
203        }
204      });
205    } catch (Throwable t) {
206      // ignore this, but redo the verify do get the actual exception
207      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
208    }
209  }
210
211  /**
212   * Tests the case where killing a secondary region with unflushed data recovers, and the replica
213   * becomes available to read again shortly.
214   */
215  @Test
216  public void testSecondaryRegionKill() throws Exception {
217    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
218        Table table = connection.getTable(htd.getTableName())) {
219      HTU.loadNumericRows(table, fam, 0, 1000);
220
221      // wait for some time to ensure that async wal replication does it's magic
222      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
223      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
224
225      // we should not have flushed files now, but data in memstores of primary and secondary
226      // kill the secondary region replica now, and ensure that when it comes back up, we can still
227      // read from it the same data
228      boolean aborted = false;
229      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
230        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
231          if (r.getRegionInfo().getReplicaId() == 1) {
232            LOG.info("Aborting region server hosting secondary region replica");
233            rs.getRegionServer().abort("for test");
234            aborted = true;
235            break;
236          }
237        }
238      }
239      assertTrue(aborted);
240
241      // It takes extra time for replica region is ready for read as during
242      // region open process, it needs to ask primary region to do a flush and replica region
243      // can open newly flushed hfiles to avoid data out-of-sync.
244      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
245      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
246    }
247
248    // restart the region server
249    HTU.getMiniHBaseCluster().startRegionServer();
250  }
251
252  /**
253   * Tests the case where there are 3 region replicas and the primary is continuously accepting
254   * new writes while one of the secondaries is killed. Verification is done for both of the
255   * secondary replicas.
256   */
257  @Test
258  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
259    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
260        Table table = connection.getTable(htd.getTableName());
261        Admin admin = connection.getAdmin()) {
262      // start a thread to do the loading of primary
263      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
264      admin.flush(table.getName());
265      HTU.loadNumericRows(table, fam, 1000, 2000);
266
267      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
268      final AtomicBoolean done = new AtomicBoolean(false);
269      final AtomicInteger key = new AtomicInteger(2000);
270
271      Thread loader = new Thread() {
272        @Override
273        public void run() {
274          while (!done.get()) {
275            try {
276              HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
277              key.addAndGet(1000);
278            } catch (Throwable e) {
279              ex.compareAndSet(null, e);
280            }
281          }
282        }
283      };
284      loader.start();
285
286      Thread aborter = new Thread() {
287        @Override
288        public void run() {
289          try {
290            boolean aborted = false;
291            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
292              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
293                if (r.getRegionInfo().getReplicaId() == 1) {
294                  LOG.info("Aborting region server hosting secondary region replica");
295                  rs.getRegionServer().abort("for test");
296                  aborted = true;
297                }
298              }
299            }
300            assertTrue(aborted);
301          } catch (Throwable e) {
302            ex.compareAndSet(null, e);
303          }
304        }
305      };
306
307      aborter.start();
308      aborter.join();
309      done.set(true);
310      loader.join();
311
312      assertNull(ex.get());
313
314      assertTrue(key.get() > 1000); // assert that the test is working as designed
315      LOG.info("Loaded up to key :" + key.get());
316      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
317      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
318      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
319    }
320
321    // restart the region server
322    HTU.getMiniHBaseCluster().startRegionServer();
323  }
324
325  /**
326   * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
327   * replicas should not block handlers on RS indefinitely.
328   */
329  @Test
330  public void testLotsOfRegionReplicas() throws IOException {
331    int numRegions = NB_SERVERS * 20;
332    int regionReplication = 10;
333    String tableName = htd.getTableName().getNameAsString() + "2";
334    htd = HTU
335      .createModifyableTableDescriptor(TableName.valueOf(tableName),
336        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
337        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
338      .setRegionReplication(regionReplication).build();
339
340    // dont care about splits themselves too much
341    byte[] startKey = Bytes.toBytes("aaa");
342    byte[] endKey = Bytes.toBytes("zzz");
343    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
344    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);
345
346    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
347        Table table = connection.getTable(htd.getTableName())) {
348
349      for (int i = 1; i < splits.length; i++) {
350        for (int j = 0; j < regionReplication; j++) {
351          Get get = new Get(splits[i]);
352          get.setConsistency(Consistency.TIMELINE);
353          get.setReplicaId(j);
354          table.get(get); // this should not block. Regions should be coming online
355        }
356      }
357    }
358
359    HTU.deleteTableIfAny(TableName.valueOf(tableName));
360  }
361}