001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.*;
021
022import java.io.IOException;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.HTableDescriptor;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.Waiter.Predicate;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Consistency;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
043import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
044import org.apache.hadoop.hbase.util.Threads;
045import org.junit.After;
046import org.junit.Before;
047import org.junit.ClassRule;
048import org.junit.Rule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.rules.TestName;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Tests failover of secondary region replicas.
057 */
058@Category(LargeTests.class)
059public class TestRegionReplicaFailover {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
064
065  private static final Logger LOG =
066      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
067
068  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
069
070  private static final int NB_SERVERS = 3;
071
072  protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
073      HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
074  protected final byte[] fam = HBaseTestingUtility.fam1;
075  protected final byte[] qual1 = Bytes.toBytes("qual1");
076  protected final byte[] value1 = Bytes.toBytes("value1");
077  protected final byte[] row = Bytes.toBytes("rowA");
078  protected final byte[] row2 = Bytes.toBytes("rowB");
079
080  @Rule public TestName name = new TestName();
081
082  private HTableDescriptor htd;
083
084  @Before
085  public void before() throws Exception {
086    Configuration conf = HTU.getConfiguration();
087   // Up the handlers; this test needs more than usual.
088    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
089    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
090    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
091    conf.setInt("replication.stats.thread.period.seconds", 5);
092    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
093
094    HTU.startMiniCluster(NB_SERVERS);
095    htd = HTU.createTableDescriptor(
096      name.getMethodName().substring(0, name.getMethodName().length()-3));
097    htd.setRegionReplication(3);
098    HTU.getAdmin().createTable(htd);
099  }
100
101  @After
102  public void after() throws Exception {
103    HTU.deleteTableIfAny(htd.getTableName());
104    HTU.shutdownMiniCluster();
105  }
106
107  /**
108   * Tests the case where a newly created table with region replicas and no data, the secondary
109   * region replicas are available to read immediately.
110   */
111  @Test
112  public void testSecondaryRegionWithEmptyRegion() throws IOException {
113    // Create a new table with region replication, don't put any data. Test that the secondary
114    // region replica is available to read.
115    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
116        Table table = connection.getTable(htd.getTableName())) {
117
118      Get get = new Get(row);
119      get.setConsistency(Consistency.TIMELINE);
120      get.setReplicaId(1);
121      table.get(get); // this should not block
122    }
123  }
124
125  /**
126   * Tests the case where if there is some data in the primary region, reopening the region replicas
127   * (enable/disable table, etc) makes the region replicas readable.
128   * @throws IOException
129   */
130  @Test
131  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
132    // Create a new table with region replication and load some data
133    // than disable and enable the table again and verify the data from secondary
134    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
135        Table table = connection.getTable(htd.getTableName())) {
136
137      HTU.loadNumericRows(table, fam, 0, 1000);
138
139      HTU.getAdmin().disableTable(htd.getTableName());
140      HTU.getAdmin().enableTable(htd.getTableName());
141
142      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
143    }
144  }
145
146  /**
147   * Tests the case where killing a primary region with unflushed data recovers
148   */
149  @Test
150  public void testPrimaryRegionKill() throws Exception {
151    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
152        Table table = connection.getTable(htd.getTableName())) {
153
154      HTU.loadNumericRows(table, fam, 0, 1000);
155
156      // wal replication is async, we have to wait until the replication catches up, or we timeout
157      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
158      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
159
160      // we should not have flushed files now, but data in memstores of primary and secondary
161      // kill the primary region replica now, and ensure that when it comes back up, we can still
162      // read from it the same data from primary and secondaries
163      boolean aborted = false;
164      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
165        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
166          if (r.getRegionInfo().getReplicaId() == 0) {
167            LOG.info("Aborting region server hosting primary region replica");
168            rs.getRegionServer().abort("for test");
169            aborted = true;
170            break;
171          }
172        }
173      }
174      assertTrue(aborted);
175
176      // wal replication is async, we have to wait until the replication catches up, or we timeout
177      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
178      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
179      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
180    }
181
182    // restart the region server
183    HTU.getMiniHBaseCluster().startRegionServer();
184  }
185
186  /** wal replication is async, we have to wait until the replication catches up, or we timeout
187   */
188  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
189      final int endRow, final int replicaId, final long timeout) throws Exception {
190    try {
191      HTU.waitFor(timeout, new Predicate<Exception>() {
192        @Override
193        public boolean evaluate() throws Exception {
194          try {
195            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
196            return true;
197          } catch (AssertionError ae) {
198            return false;
199          }
200        }
201      });
202    } catch (Throwable t) {
203      // ignore this, but redo the verify do get the actual exception
204      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
205    }
206  }
207
208  /**
209   * Tests the case where killing a secondary region with unflushed data recovers, and the replica
210   * becomes available to read again shortly.
211   */
212  @Test
213  public void testSecondaryRegionKill() throws Exception {
214    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
215        Table table = connection.getTable(htd.getTableName())) {
216      HTU.loadNumericRows(table, fam, 0, 1000);
217
218      // wait for some time to ensure that async wal replication does it's magic
219      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
220      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
221
222      // we should not have flushed files now, but data in memstores of primary and secondary
223      // kill the secondary region replica now, and ensure that when it comes back up, we can still
224      // read from it the same data
225      boolean aborted = false;
226      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
227        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
228          if (r.getRegionInfo().getReplicaId() == 1) {
229            LOG.info("Aborting region server hosting secondary region replica");
230            rs.getRegionServer().abort("for test");
231            aborted = true;
232            break;
233          }
234        }
235      }
236      assertTrue(aborted);
237
238      Threads.sleep(5000);
239
240      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
241      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
242    }
243
244    // restart the region server
245    HTU.getMiniHBaseCluster().startRegionServer();
246  }
247
248  /**
249   * Tests the case where there are 3 region replicas and the primary is continuously accepting
250   * new writes while one of the secondaries is killed. Verification is done for both of the
251   * secondary replicas.
252   */
253  @Test
254  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
255    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
256        Table table = connection.getTable(htd.getTableName());
257        Admin admin = connection.getAdmin()) {
258      // start a thread to do the loading of primary
259      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
260      admin.flush(table.getName());
261      HTU.loadNumericRows(table, fam, 1000, 2000);
262
263      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
264      final AtomicBoolean done = new AtomicBoolean(false);
265      final AtomicInteger key = new AtomicInteger(2000);
266
267      Thread loader = new Thread() {
268        @Override
269        public void run() {
270          while (!done.get()) {
271            try {
272              HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
273              key.addAndGet(1000);
274            } catch (Throwable e) {
275              ex.compareAndSet(null, e);
276            }
277          }
278        }
279      };
280      loader.start();
281
282      Thread aborter = new Thread() {
283        @Override
284        public void run() {
285          try {
286            boolean aborted = false;
287            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
288              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
289                if (r.getRegionInfo().getReplicaId() == 1) {
290                  LOG.info("Aborting region server hosting secondary region replica");
291                  rs.getRegionServer().abort("for test");
292                  aborted = true;
293                }
294              }
295            }
296            assertTrue(aborted);
297          } catch (Throwable e) {
298            ex.compareAndSet(null, e);
299          }
300        };
301      };
302
303      aborter.start();
304      aborter.join();
305      done.set(true);
306      loader.join();
307
308      assertNull(ex.get());
309
310      assertTrue(key.get() > 1000); // assert that the test is working as designed
311      LOG.info("Loaded up to key :" + key.get());
312      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
313      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
314      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
315    }
316
317    // restart the region server
318    HTU.getMiniHBaseCluster().startRegionServer();
319  }
320
321  /**
322   * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
323   * replicas should not block handlers on RS indefinitely.
324   */
325  @Test
326  public void testLotsOfRegionReplicas() throws IOException {
327    int numRegions = NB_SERVERS * 20;
328    int regionReplication = 10;
329    String tableName = htd.getTableName().getNameAsString() + "2";
330    htd = HTU.createTableDescriptor(tableName);
331    htd.setRegionReplication(regionReplication);
332
333    // dont care about splits themselves too much
334    byte[] startKey = Bytes.toBytes("aaa");
335    byte[] endKey = Bytes.toBytes("zzz");
336    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
337    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);
338
339    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
340        Table table = connection.getTable(htd.getTableName())) {
341
342      for (int i = 1; i < splits.length; i++) {
343        for (int j = 0; j < regionReplication; j++) {
344          Get get = new Get(splits[i]);
345          get.setConsistency(Consistency.TIMELINE);
346          get.setReplicaId(j);
347          table.get(get); // this should not block. Regions should be coming online
348        }
349      }
350    }
351
352    HTU.deleteTableIfAny(TableName.valueOf(tableName));
353  }
354}