/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions that
 * have actually been assigned elsewhere.
 * <p>
 * If we happen to assign a region before it is fully done with in its old location -- i.e. it is
 * on two servers at the same time -- all can work fine until the region on the dying server
 * decides to compact or otherwise change the region file set. The region in its new location will
 * then get a surprise when it tries to do something with a file removed by the region in its old
 * location on the dying server.
 * <p>
 * Making a test for this case is a little tough in that even if a file is deleted up on the
 * namenode, if the file was opened before the delete it will continue to serve reads until
 * something changes the state of cached blocks in the already-open dfsclient (a block from the
 * deleted file is cleaned from the datanode by the NN).
 * <p>
 * What we do below is an explicit check for the existence of the files listed in the region that
 * has had some files removed because of a compaction. This hurries along and makes certain what
 * would otherwise be a chance occurrence.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestIOFencing {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestIOFencing.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class);
  static {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    // ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
    // .getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

  public abstract static class CompactionBlockerRegion extends HRegion {
    AtomicInteger compactCount = new AtomicInteger();
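    // Both latches start open (count zero) so compactions pass straight through until the test
    // re-arms them via stopCompactions().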
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam,
      RegionInfo info, TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

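    // Re-arm both latches so the next compaction parks in the subclass hook until
    // allowCompactions() is called.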
    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
      ThroughputController throughputController) throws IOException {
      try {
        return super.compact(compaction, store, throughputController);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
      ThroughputController throughputController, User user) throws IOException {
      try {
        return super.compact(compaction, store, throughputController, user);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    public int countStoreFiles() {
      int count = 0;
      for (HStore store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and then, when
   * appropriate for the test, allow them to proceed again.
   */
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {

    public BlockCompactionsInPrepRegion(Path tableDir, WAL log, FileSystem fs,
      Configuration confParam, RegionInfo info, TableDescriptor htd,
      RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and then, when
   * appropriate for the test, allow them to proceed again. This allows the compaction entry to go
   * to the WAL before blocking, but blocks afterwards.
   */
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, WAL log, FileSystem fs,
      Configuration confParam, RegionInfo info, TableDescriptor htd,
      RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

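    // Substitute a store whose post-compaction bookkeeping blocks, so the compaction marker has
    // already gone to the WAL before the test releases the latch.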
    @Override
    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
      throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup);
    }
  }

  public static class BlockCompactionsInCompletionHStore extends HStore {
    CompactionBlockerRegion r;

    protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
      Configuration confParam, boolean warmup) throws IOException {
      super(region, family, confParam, warmup);
      r = (CompactionBlockerRegion) region;
    }

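    // Blocks during the store-size refresh that runs as a compaction completes; the null check
    // guards calls that arrive before the constructor has assigned r.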
    @Override
    protected void refreshStoreSizeAndTotalBytes() throws IOException {
      if (r != null) {
        try {
          r.compactionsWaiting.countDown();
          r.compactionsBlocked.await();
        } catch (InterruptedException ex) {
          throw new IOException(ex);
        }
      }
      super.refreshStoreSizeAndTotalBytes();
    }
  }

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME = TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction until after we have killed the server and the region has come up on a new
   * regionserver altogether. This fakes the double-assignment case where the region in one
   * location changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompaction() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInPrepRegion.class, policy);
    }
  }

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction completion until after we have killed the server and the region has come up on a
   * new regionserver altogether. This fakes the double-assignment case where the region in one
   * location changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInCompletionRegion.class, policy);
    }
  }

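  /**
   * Drives the scenario: block compactions in the custom region, load data, expire the original
   * server's ZooKeeper session, wait for the region to open on a new server, then release the
   * blocked compaction and verify the relocated region still sees all of its store files and rows.
   */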
  public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 25000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compaction.min", 1);
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    Admin admin = null;
    try {
      LOG.info("Creating admin");
      admin = TEST_UTIL.getConnection().getAdmin();
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion) testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      HRegionInfo oldHri =
        new HRegionInfo(table.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      CompactionDescriptor compactionDescriptor =
        ProtobufUtil.toCompactionDescriptor(oldHri, FAMILY, Lists.newArrayList(new Path("/a")),
          Lists.newArrayList(new Path("/b")), new Path("store_dir"));
      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
        ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
        compactingRegion.getMVCC());

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = EnvironmentEdgeManager.currentTime();
      while (
        compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime
          || compactingRegion.countStoreFiles() <= 1
      ) {
        LOG.info("Waiting for the region to flush "
          + compactingRegion.getRegionInfo().getRegionNameAsString());
        Thread.sleep(1000);
        admin.flush(table.getName());
        assertTrue("Timed out waiting for the region to flush",
          EnvironmentEdgeManager.currentTime() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] REGION_NAME = compactingRegion.getRegionInfo().getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME);
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
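      // Expire the original server's ZooKeeper session so the master reassigns the region while
      // the blocked compaction is still pending on the dying server.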
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
      CompactionBlockerRegion newRegion = null;
      startWaitTime = EnvironmentEdgeManager.currentTime();
      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));

      // wait for region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          Region newRegion = newServer.getOnlineRegion(REGION_NAME);
          return newRegion != null;
        }
      });

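      // The region is now open on the new server while the old server still holds its blocked
      // compaction.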
      newRegion = (CompactionBlockerRegion) newServer.getOnlineRegion(REGION_NAME);

      // After the compaction of the old region finishes on the server that was going down, make
      // sure that all the files we expect are still present when the region is up in its new
      // location.
      FileSystem fs = newRegion.getFilesystem();
      for (String f : newRegion.getStoreFileList(new byte[][] { FAMILY })) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes. In the logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");

      // If we got this far, keep going: make sure that the region isn't totally confused. Load up
      // more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
        FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
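      // Compact again, this time on the new server, to show it can rewrite its own file set.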
      admin.majorCompact(TABLE_NAME);
      startWaitTime = EnvironmentEdgeManager.currentTime();
      while (newRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
          EnvironmentEdgeManager.currentTime() - startWaitTime < 180000);
      }
      int count;
      for (int i = 0;; i++) {
        try {
          count = TEST_UTIL.countRows(table);
          break;
        } catch (DoNotRetryIOException e) {
          // wait up to 30s
          if (i >= 30 || !e.getMessage().contains("File does not exist")) {
            throw e;
          }
          Thread.sleep(1000);
        }
      }
      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
      } else {
        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
      }
    } finally {
      if (compactingRegion != null) {
        compactingRegion.allowCompactions();
      }
      if (admin != null) {
        admin.close();
      }
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}