001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
037import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
038import org.apache.hadoop.hbase.regionserver.HRegion;
039import org.apache.hadoop.hbase.regionserver.HRegionServer;
040import org.apache.hadoop.hbase.regionserver.HStore;
041import org.apache.hadoop.hbase.regionserver.HStoreFile;
042import org.apache.hadoop.hbase.regionserver.Region;
043import org.apache.hadoop.hbase.regionserver.RegionServerServices;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
045import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
046import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
047import org.apache.hadoop.hbase.security.User;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.MiscTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
052import org.apache.hadoop.hbase.wal.WAL;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
060
061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
063
064/**
065 * Test for the case where a regionserver going down has enough cycles to do damage to regions that
 * have actually been assigned elsewhere.
067 * <p>
 * If we happen to assign a region before it is fully done with its old location -- i.e. it is on
069 * two servers at the same time -- all can work fine until the case where the region on the dying
070 * server decides to compact or otherwise change the region file set. The region in its new location
071 * will then get a surprise when it tries to do something w/ a file removed by the region in its old
072 * location on dying server.
073 * <p>
074 * Making a test for this case is a little tough in that even if a file is deleted up on the
075 * namenode, if the file was opened before the delete, it will continue to let reads happen until
076 * something changes the state of cached blocks in the dfsclient that was already open (a block from
077 * the deleted file is cleaned from the datanode by NN).
078 * <p>
079 * What we will do below is do an explicit check for existence on the files listed in the region
 * that has had some files removed because of a compaction. This sort of hurries things along and
 * makes certain what is otherwise a chance occurrence.
082 */
083@Category({MiscTests.class, LargeTests.class})
084public class TestIOFencing {
085
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestIOFencing.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class);
  // Static hook kept (commented out) for turning on verbose HDFS-side logging when
  // diagnosing fencing failures; intentionally not active in normal runs.
  static {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
    //    .getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }
101
102  public abstract static class CompactionBlockerRegion extends HRegion {
103    AtomicInteger compactCount = new AtomicInteger();
104    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
105    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
106
107    @SuppressWarnings("deprecation")
108    public CompactionBlockerRegion(Path tableDir, WAL log,
109        FileSystem fs, Configuration confParam, RegionInfo info,
110        TableDescriptor htd, RegionServerServices rsServices) {
111      super(tableDir, log, fs, confParam, info, htd, rsServices);
112    }
113
114    public void stopCompactions() {
115      compactionsBlocked = new CountDownLatch(1);
116      compactionsWaiting = new CountDownLatch(1);
117    }
118
119    public void allowCompactions() {
120      LOG.debug("allowing compactions");
121      compactionsBlocked.countDown();
122    }
123    public void waitForCompactionToBlock() throws IOException {
124      try {
125        LOG.debug("waiting for compaction to block");
126        compactionsWaiting.await();
127        LOG.debug("compaction block reached");
128      } catch (InterruptedException ex) {
129        throw new IOException(ex);
130      }
131    }
132
133    @Override
134    public boolean compact(CompactionContext compaction, HStore store,
135        ThroughputController throughputController) throws IOException {
136      try {
137        return super.compact(compaction, store, throughputController);
138      } finally {
139        compactCount.getAndIncrement();
140      }
141    }
142
143    @Override
144    public boolean compact(CompactionContext compaction, HStore store,
145        ThroughputController throughputController, User user) throws IOException {
146      try {
147        return super.compact(compaction, store, throughputController, user);
148      } finally {
149        compactCount.getAndIncrement();
150      }
151    }
152
153    public int countStoreFiles() {
154      int count = 0;
155      for (HStore store : stores.values()) {
156        count += store.getStorefilesCount();
157      }
158      return count;
159    }
160  }
161
162  /**
163   * An override of HRegion that allows us park compactions in a holding pattern and
164   * then when appropriate for the test, allow them proceed again.
165   */
166  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
167
168    public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
169        FileSystem fs, Configuration confParam, RegionInfo info,
170        TableDescriptor htd, RegionServerServices rsServices) {
171      super(tableDir, log, fs, confParam, info, htd, rsServices);
172    }
173    @Override
174    protected void doRegionCompactionPrep() throws IOException {
175      compactionsWaiting.countDown();
176      try {
177        compactionsBlocked.await();
178      } catch (InterruptedException ex) {
179        throw new IOException();
180      }
181      super.doRegionCompactionPrep();
182    }
183  }
184
185  /**
186   * An override of HRegion that allows us park compactions in a holding pattern and
187   * then when appropriate for the test, allow them proceed again. This allows the compaction
188   * entry to go the WAL before blocking, but blocks afterwards
189   */
190  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
191    public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
192        FileSystem fs, Configuration confParam, RegionInfo info,
193        TableDescriptor htd, RegionServerServices rsServices) {
194      super(tableDir, log, fs, confParam, info, htd, rsServices);
195    }
196    @Override
197    protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
198      return new BlockCompactionsInCompletionHStore(this, family, this.conf);
199    }
200  }
201
  /**
   * HStore that parks in completeCompaction() — i.e. after the compaction has been written to
   * the WAL — until the owning CompactionBlockerRegion releases it.
   */
  public static class BlockCompactionsInCompletionHStore extends HStore {
    // Owning region, downcast once so the latches are handy.
    CompactionBlockerRegion r;
    protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
        Configuration confParam) throws IOException {
      super(region, family, confParam);
      r = (CompactionBlockerRegion) region;
    }

    @Override
    protected void completeCompaction(Collection<HStoreFile> compactedFiles) throws IOException {
      try {
        // Tell the test we've reached the barrier, then wait to be released.
        // Order matters: countDown first so the test's waitForCompactionToBlock() returns.
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles);
    }
  }
221
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME =
      TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  // Rows loaded before the compaction is blocked, and again after failover, respectively.
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
228
229  /**
230   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
231   * compaction until after we have killed the server and the region has come up on
232   * a new regionserver altogether.  This fakes the double assignment case where region in one
233   * location changes the files out from underneath a region being served elsewhere.
234   */
235  @Test
236  public void testFencingAroundCompaction() throws Exception {
237    for(MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
238      doTest(BlockCompactionsInPrepRegion.class, policy);
239    }
240  }
241
242  /**
243   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
244   * compaction completion until after we have killed the server and the region has come up on
245   * a new regionserver altogether.  This fakes the double assignment case where region in one
246   * location changes the files out from underneath a region being served elsewhere.
247   */
248  @Test
249  public void testFencingAroundCompactionAfterWALSync() throws Exception {
250    for(MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
251      doTest(BlockCompactionsInCompletionRegion.class, policy);
252    }
253  }
254
255  public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
256    Configuration c = TEST_UTIL.getConfiguration();
257    // Insert our custom region
258    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
259    // Encourage plenty of flushes
260    c.setLong("hbase.hregion.memstore.flush.size", 25000);
261    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
262    // Only run compaction when we tell it to
263    c.setInt("hbase.hstore.compaction.min",1);
264    c.setInt("hbase.hstore.compactionThreshold", 1000);
265    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
266    // Compact quickly after we tell it to!
267    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
268    c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
269    LOG.info("Starting mini cluster");
270    TEST_UTIL.startMiniCluster(1);
271    CompactionBlockerRegion compactingRegion = null;
272    Admin admin = null;
273    try {
274      LOG.info("Creating admin");
275      admin = TEST_UTIL.getConnection().getAdmin();
276      LOG.info("Creating table");
277      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
278      Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
279      LOG.info("Loading test table");
280      // Find the region
281      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
282      assertEquals(1, testRegions.size());
283      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
284      LOG.info("Blocking compactions");
285      compactingRegion.stopCompactions();
286      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
287      // Load some rows
288      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
289
290      // add a compaction from an older (non-existing) region to see whether we successfully skip
291      // those entries
292      HRegionInfo oldHri = new HRegionInfo(table.getName(),
293        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
294      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
295        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
296        new Path("store_dir"));
297      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
298          ((HRegion)compactingRegion).getReplicationScope(),
299        oldHri, compactionDescriptor, compactingRegion.getMVCC());
300
301      // Wait till flush has happened, otherwise there won't be multiple store files
302      long startWaitTime = System.currentTimeMillis();
303      while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
304          compactingRegion.countStoreFiles() <= 1) {
305        LOG.info("Waiting for the region to flush " +
306          compactingRegion.getRegionInfo().getRegionNameAsString());
307        Thread.sleep(1000);
308        admin.flush(table.getName());
309        assertTrue("Timed out waiting for the region to flush",
310          System.currentTimeMillis() - startWaitTime < 30000);
311      }
312      assertTrue(compactingRegion.countStoreFiles() > 1);
313      final byte REGION_NAME[] = compactingRegion.getRegionInfo().getRegionName();
314      LOG.info("Asking for compaction");
315      admin.majorCompact(TABLE_NAME);
316      LOG.info("Waiting for compaction to be about to start");
317      compactingRegion.waitForCompactionToBlock();
318      LOG.info("Starting a new server");
319      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
320      final HRegionServer newServer = newServerThread.getRegionServer();
321      LOG.info("Killing region server ZK lease");
322      TEST_UTIL.expireRegionServerSession(0);
323      CompactionBlockerRegion newRegion = null;
324      startWaitTime = System.currentTimeMillis();
325      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
326
327      // wait for region to be assigned and to go out of log replay if applicable
328      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
329        @Override
330        public boolean evaluate() throws Exception {
331          Region newRegion = newServer.getOnlineRegion(REGION_NAME);
332          return newRegion != null;
333        }
334      });
335
336      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
337
338      // After compaction of old region finishes on the server that was going down, make sure that
339      // all the files we expect are still working when region is up in new location.
340      FileSystem fs = newRegion.getFilesystem();
341      for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
342        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
343      }
344      LOG.info("Allowing compaction to proceed");
345      compactingRegion.allowCompactions();
346      while (compactingRegion.compactCount.get() == 0) {
347        Thread.sleep(1000);
348      }
349      // The server we killed stays up until the compaction that was started before it was killed
350      // completes. In logs you should see the old regionserver now going down.
351      LOG.info("Compaction finished");
352
353      // If we survive the split keep going...
354      // Now we make sure that the region isn't totally confused.  Load up more rows.
355      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
356        FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
357      admin.majorCompact(TABLE_NAME);
358      startWaitTime = System.currentTimeMillis();
359      while (newRegion.compactCount.get() == 0) {
360        Thread.sleep(1000);
361        assertTrue("New region never compacted",
362          System.currentTimeMillis() - startWaitTime < 180000);
363      }
364      int count;
365      for (int i = 0;; i++) {
366        try {
367          count = TEST_UTIL.countRows(table);
368          break;
369        } catch (DoNotRetryIOException e) {
370          // wait up to 30s
371          if (i >= 30 || !e.getMessage().contains("File does not exist")) {
372            throw e;
373          }
374          Thread.sleep(1000);
375        }
376      }
377      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
378        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
379      } else {
380        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
381      }
382    } finally {
383      if (compactingRegion != null) {
384        compactingRegion.allowCompactions();
385      }
386      admin.close();
387      TEST_UTIL.shutdownMiniCluster();
388    }
389  }
390}