001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
070
071/**
072 * Test log deletion as logs are rolled.
073 */
074public abstract class AbstractTestLogRolling {
075  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class);
076  protected HRegionServer server;
077  protected String tableName;
078  protected byte[] value;
079  protected FileSystem fs;
080  protected MiniDFSCluster dfsCluster;
081  protected Admin admin;
082  protected SingleProcessHBaseCluster cluster;
083  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
084  @Rule
085  public final TestName name = new TestName();
086  protected static int syncLatencyMillis;
087  private static int rowNum = 1;
088  private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
089  protected static ScheduledExecutorService EXECUTOR;
090
091  public AbstractTestLogRolling() {
092    this.server = null;
093    this.tableName = null;
094
095    String className = this.getClass().getName();
096    StringBuilder v = new StringBuilder(className);
097    while (v.length() < 1000) {
098      v.append(className);
099    }
100    this.value = Bytes.toBytes(v.toString());
101  }
102
103  // Need to override this setup so we can edit the config before it gets sent
104  // to the HDFS & HBase cluster startup.
105  @BeforeClass
106  public static void setUpBeforeClass() throws Exception {
107    /**** configuration for testLogRolling ****/
108    // Force a region split after every 768KB
109    Configuration conf = TEST_UTIL.getConfiguration();
110    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);
111
112    // We roll the log after every 32 writes
113    conf.setInt("hbase.regionserver.maxlogentries", 32);
114
115    conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2);
116    conf.setInt("hbase.rpc.timeout", 10 * 1000);
117
118    // For less frequently updated regions flush after every 2 flushes
119    conf.setInt("hbase.hregion.memstore.optionalflushcount", 2);
120
121    // We flush the cache after every 8192 bytes
122    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);
123
124    // Increase the amount of time between client retries
125    conf.setLong("hbase.client.pause", 10 * 1000);
126
127    // Reduce thread wake frequency so that other threads can get
128    // a chance to run.
129    conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
130
131    // disable low replication check for log roller to get a more stable result
132    // TestWALOpenAfterDNRollingStart will test this option.
133    conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);
134
135    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
136    conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
137    conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
138    // For slow sync threshold test: roll once after a sync above this threshold
139    conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
140
141    // Slow sync executor.
142    EXECUTOR = Executors
143      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d")
144        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
145  }
146
147  @Before
148  public void setUp() throws Exception {
149    // Use 2 DataNodes and default values for other StartMiniCluster options.
150    TEST_UTIL.startMiniCluster(StartTestingClusterOption.builder().numDataNodes(2).build());
151
152    cluster = TEST_UTIL.getHBaseCluster();
153    dfsCluster = TEST_UTIL.getDFSCluster();
154    fs = TEST_UTIL.getTestFileSystem();
155    admin = TEST_UTIL.getAdmin();
156
157    // disable region rebalancing (interferes with log watching)
158    cluster.getMaster().balanceSwitch(false);
159  }
160
161  @After
162  public void tearDown() throws Exception {
163    TEST_UTIL.shutdownMiniCluster();
164  }
165
166  @AfterClass
167  public static void tearDownAfterClass() {
168    EXECUTOR.shutdownNow();
169  }
170
171  private void startAndWriteData() throws IOException, InterruptedException {
172    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
173
174    Table table = createTestTable(this.tableName);
175
176    server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
177    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
178      doPut(table, i);
179      if (i % 32 == 0) {
180        // After every 32 writes sleep to let the log roller run
181        try {
182          Thread.sleep(2000);
183        } catch (InterruptedException e) {
184          // continue
185        }
186      }
187    }
188  }
189
190  private static void setSyncLatencyMillis(int latency) {
191    syncLatencyMillis = latency;
192  }
193
194  protected final AbstractFSWAL<?> getWALAndRegisterSlowSyncHook(RegionInfo region)
195    throws IOException {
196    // Get a reference to the wal.
197    final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region);
198
199    // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
200    log.registerWALActionsListener(new WALActionsListener() {
201      @Override
202      public void logRollRequested(RollRequestReason reason) {
203        switch (reason) {
204          case SLOW_SYNC:
205            slowSyncHookCalled.lazySet(true);
206            break;
207          default:
208            break;
209        }
210      }
211    });
212    return log;
213  }
214
215  protected final void checkSlowSync(AbstractFSWAL<?> log, Table table, int slowSyncLatency,
216    int writeCount, boolean slowSync) throws Exception {
217    if (slowSyncLatency > 0) {
218      setSyncLatencyMillis(slowSyncLatency);
219      setSlowLogWriter(log.conf);
220    } else {
221      setDefaultLogWriter(log.conf);
222    }
223
224    // Set up for test
225    log.rollWriter(true);
226    slowSyncHookCalled.set(false);
227
228    final WALProvider.WriterBase oldWriter = log.getWriter();
229
230    // Write some data
231    for (int i = 0; i < writeCount; i++) {
232      writeData(table, rowNum++);
233    }
234
235    if (slowSync) {
236      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
237        @Override
238        public boolean evaluate() throws Exception {
239          return log.getWriter() != oldWriter;
240        }
241
242        @Override
243        public String explainFailure() throws Exception {
244          return "Waited too long for our test writer to get rolled out";
245        }
246      });
247
248      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
249    } else {
250      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
251    }
252  }
253
254  protected abstract void setSlowLogWriter(Configuration conf);
255
256  protected abstract void setDefaultLogWriter(Configuration conf);
257
258  /**
259   * Tests that log rolling doesn't hang when no data is written.
260   */
261  @Test
262  public void testLogRollOnNothingWritten() throws Exception {
263    final Configuration conf = TEST_UTIL.getConfiguration();
264    final WALFactory wals =
265      new WALFactory(conf, ServerName.valueOf("test.com", 8080, 1).toString());
266    final WAL newLog = wals.getWAL(null);
267    try {
268      // Now roll the log before we write anything.
269      newLog.rollWriter(true);
270    } finally {
271      wals.close();
272    }
273  }
274
275  /**
276   * Tests that logs are deleted
277   */
278  @Test
279  public void testLogRolling() throws Exception {
280    this.tableName = getName();
281    // TODO: Why does this write data take for ever?
282    startAndWriteData();
283    RegionInfo region = server.getRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
284    final WAL log = server.getWAL(region);
285    LOG.info(
286      "after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files");
287
288    // roll the log, so we should have at least one rolled file and the log file size should be
289    // greater than 0, in case in the above method we rolled in the last round and then flushed so
290    // all the old wal files are deleted and cause the below assertion to fail
291    log.rollWriter();
292
293    assertThat(AbstractFSWALProvider.getLogFileSize(log), greaterThan(0L));
294
295    // flush all regions
296    for (HRegion r : server.getOnlineRegionsLocalContext()) {
297      r.flush(true);
298    }
299
300    // Now roll the log the again
301    log.rollWriter();
302
303    // should have deleted all the rolled wal files
304    TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(log) == 0);
305    assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
306  }
307
308  protected String getName() {
309    return "TestLogRolling-" + name.getMethodName();
310  }
311
312  void writeData(Table table, int rownum) throws IOException {
313    doPut(table, rownum);
314
315    // sleep to let the log roller run (if it needs to)
316    try {
317      Thread.sleep(2000);
318    } catch (InterruptedException e) {
319      // continue
320    }
321  }
322
323  void validateData(Table table, int rownum) throws IOException {
324    String row = "row" + String.format("%1$04d", rownum);
325    Get get = new Get(Bytes.toBytes(row));
326    get.addFamily(HConstants.CATALOG_FAMILY);
327    Result result = table.get(get);
328    assertTrue(result.size() == 1);
329    assertTrue(Bytes.equals(value, result.getValue(HConstants.CATALOG_FAMILY, null)));
330    LOG.info("Validated row " + row);
331  }
332
333  /**
334   * Tests that logs are deleted when some region has a compaction record in WAL and no other
335   * records. See HBASE-8597.
336   */
337  @Test
338  public void testCompactionRecordDoesntBlockRolling() throws Exception {
339
340    // When the hbase:meta table can be opened, the region servers are running
341    try (Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
342      Table table = createTestTable(getName())) {
343
344      server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
345      HRegion region = server.getRegions(table.getName()).get(0);
346      final WAL log = server.getWAL(region.getRegionInfo());
347      Store s = region.getStore(HConstants.CATALOG_FAMILY);
348
349      // Put some stuff into table, to make sure we have some files to compact.
350      for (int i = 1; i <= 2; ++i) {
351        doPut(table, i);
352        admin.flush(table.getName());
353      }
354      doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
355      assertEquals("Should have no WAL after initial writes", 0,
356        AbstractFSWALProvider.getNumRolledLogFiles(log));
357      assertEquals(2, s.getStorefilesCount());
358
359      // Roll the log and compact table, to have compaction record in the 2nd WAL.
360      log.rollWriter();
361      assertEquals("Should have WAL; one table is not flushed", 1,
362        AbstractFSWALProvider.getNumRolledLogFiles(log));
363      admin.flush(table.getName());
364      region.compact(false);
365      // Wait for compaction in case if flush triggered it before us.
366      Assert.assertNotNull(s);
367      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
368        Threads.sleepWithoutInterrupt(200);
369      }
370      assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
371
372      // Write some value to the table so the WAL cannot be deleted until table is flushed.
373      doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
374      log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
375      assertEquals("Should have WAL; one table is not flushed", 1,
376        AbstractFSWALProvider.getNumRolledLogFiles(log));
377
378      // Flush table to make latest WAL obsolete; write another record, and roll again.
379      admin.flush(table.getName());
380      doPut(table, 1);
381      log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
382      assertEquals("Should have 1 WALs at the end", 1,
383        AbstractFSWALProvider.getNumRolledLogFiles(log));
384    }
385  }
386
387  protected void doPut(Table table, int i) throws IOException {
388    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
389    put.addColumn(HConstants.CATALOG_FAMILY, null, value);
390    table.put(put);
391  }
392
393  protected Table createTestTable(String tableName) throws IOException {
394    // Create the test table and open it
395    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
396      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
397    admin.createTable(desc);
398    return TEST_UTIL.getConnection().getTable(desc.getTableName());
399  }
400}