001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.greaterThan;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertNotNull;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026
027import java.io.IOException;
028import java.util.concurrent.Executors;
029import java.util.concurrent.ScheduledExecutorService;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
037import org.apache.hadoop.hbase.StartTestingClusterOption;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.Waiter;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.client.TableDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
049import org.apache.hadoop.hbase.regionserver.HRegion;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.Store;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
055import org.apache.hadoop.hbase.wal.WAL;
056import org.apache.hadoop.hbase.wal.WALFactory;
057import org.apache.hadoop.hbase.wal.WALProvider;
058import org.apache.hadoop.hdfs.MiniDFSCluster;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.AfterEach;
061import org.junit.jupiter.api.BeforeAll;
062import org.junit.jupiter.api.BeforeEach;
063import org.junit.jupiter.api.Test;
064import org.junit.jupiter.api.TestInfo;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
069
070/**
071 * Test log deletion as logs are rolled.
072 */
073public abstract class AbstractTestLogRolling {
074  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class);
075  protected HRegionServer server;
076  protected String tableName;
077  protected byte[] value;
078  protected FileSystem fs;
079  protected MiniDFSCluster dfsCluster;
080  protected Admin admin;
081  protected SingleProcessHBaseCluster cluster;
082  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
083  private String name;
084  protected static int syncLatencyMillis;
085  private static int rowNum = 1;
086  private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
087  protected static ScheduledExecutorService EXECUTOR;
088
089  public AbstractTestLogRolling() {
090    this.server = null;
091    this.tableName = null;
092
093    String className = this.getClass().getName();
094    StringBuilder v = new StringBuilder(className);
095    while (v.length() < 1000) {
096      v.append(className);
097    }
098    this.value = Bytes.toBytes(v.toString());
099  }
100
101  // Need to override this setup so we can edit the config before it gets sent
102  // to the HDFS & HBase cluster startup.
  /**
   * One-time configuration applied before the mini cluster is started by {@link #setUp()}.
   * Thresholds are tuned low so that region splits, memstore flushes and log rolls all happen
   * quickly enough for the tests to observe. Also creates the shared single-thread executor used
   * by the slow-sync writer tests.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    /**** configuration for testLogRolling ****/
    // Force a region split after every 768KB
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    conf.setInt("hbase.regionserver.maxlogentries", 32);

    // Tolerate a couple of roll errors; keep RPC timeouts short so failures surface fast.
    conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2);
    conf.setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    conf.setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    conf.setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    // disable low replication check for log roller to get a more stable result
    // TestWALOpenAfterDNRollingStart will test this option.
    conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);

    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
    conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
    conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
    // For slow sync threshold test: roll once after a sync above this threshold
    conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);

    // Slow sync executor. Daemon thread so it cannot keep the JVM alive after the tests finish.
    EXECUTOR = Executors
      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d")
        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
  }
144
145  @BeforeEach
146  public void initTestName(TestInfo testInfo) {
147    name = testInfo.getTestMethod().get().getName();
148  }
149
  /**
   * Starts a fresh mini cluster before each test and caches handles to its components. The
   * balancer is switched off because region moves would interfere with watching a specific
   * server's WAL.
   */
  @BeforeEach
  public void setUp() throws Exception {
    // Use 2 DataNodes and default values for other StartMiniCluster options.
    TEST_UTIL.startMiniCluster(StartTestingClusterOption.builder().numDataNodes(2).build());

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    fs = TEST_UTIL.getTestFileSystem();
    admin = TEST_UTIL.getAdmin();

    // disable region rebalancing (interferes with log watching)
    cluster.getMaster().balanceSwitch(false);
  }
163
  /** Shuts down the mini cluster started by {@link #setUp()} after each test. */
  @AfterEach
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
168
169  @AfterAll
170  public static void tearDownAfterClass() {
171    EXECUTOR.shutdownNow();
172  }
173
174  private void startAndWriteData() throws IOException, InterruptedException {
175    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
176
177    Table table = createTestTable(this.tableName);
178
179    server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
180    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
181      doPut(table, i);
182      if (i % 32 == 0) {
183        // After every 32 writes sleep to let the log roller run
184        try {
185          Thread.sleep(2000);
186        } catch (InterruptedException e) {
187          // continue
188        }
189      }
190    }
191  }
192
193  private static void setSyncLatencyMillis(int latency) {
194    syncLatencyMillis = latency;
195  }
196
  /**
   * Returns the WAL serving {@code region} on {@link #server} and installs a listener that
   * records in {@link #slowSyncHookCalled} whether a SLOW_SYNC-triggered roll was requested.
   * @param region the region whose WAL to fetch
   * @return the region's WAL, with the slow-sync listener registered
   */
  protected final AbstractFSWAL<?> getWALAndRegisterSlowSyncHook(RegionInfo region)
    throws IOException {
    // Get a reference to the wal.
    final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region);

    // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(RollRequestReason reason) {
        switch (reason) {
          case SLOW_SYNC:
            // lazySet: the flag is only read back after waiting for the roll in checkSlowSync,
            // so eager cross-thread visibility is not required here.
            slowSyncHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });
    return log;
  }
217
  /**
   * Drives writes through {@code log} with an optionally slowed-down writer and asserts whether
   * a SLOW_SYNC log roll did (or did not) occur.
   * @param log the WAL under test; the slow-sync hook should already be registered via
   *          {@link #getWALAndRegisterSlowSyncHook(RegionInfo)}
   * @param table table to write rows into
   * @param slowSyncLatency artificial sync latency in millis; a value {@code <= 0} restores the
   *          provider's default writer instead
   * @param writeCount number of rows to write
   * @param slowSync whether a SLOW_SYNC-triggered roll is expected
   */
  protected final void checkSlowSync(AbstractFSWAL<?> log, Table table, int slowSyncLatency,
    int writeCount, boolean slowSync) throws Exception {
    if (slowSyncLatency > 0) {
      setSyncLatencyMillis(slowSyncLatency);
      setSlowLogWriter(log.conf);
    } else {
      setDefaultLogWriter(log.conf);
    }

    // Set up for test
    // Roll so the writer configured above becomes the active one, then clear the hook flag.
    log.rollWriter(true);
    slowSyncHookCalled.set(false);

    // Remember the active writer; a SLOW_SYNC roll will swap in a different instance.
    final WALProvider.WriterBase oldWriter = log.getWriter();

    // Write some data
    for (int i = 0; i < writeCount; i++) {
      writeData(table, rowNum++);
    }

    if (slowSync) {
      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != oldWriter;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue(slowSyncHookCalled.get(), "Should have triggered log roll due to SLOW_SYNC");
    } else {
      assertFalse(slowSyncHookCalled.get(), "Should not have triggered log roll due to SLOW_SYNC");
    }
  }
256
  /**
   * Installs a WAL writer implementation whose sync is artificially delayed (see
   * {@link #syncLatencyMillis}); used by {@link #checkSlowSync} to provoke SLOW_SYNC rolls.
   */
  protected abstract void setSlowLogWriter(Configuration conf);

  /** Restores the provider's default WAL writer (no artificial sync latency). */
  protected abstract void setDefaultLogWriter(Configuration conf);
260
261  /**
262   * Tests that log rolling doesn't hang when no data is written.
263   */
264  @Test
265  public void testLogRollOnNothingWritten() throws Exception {
266    final Configuration conf = TEST_UTIL.getConfiguration();
267    final WALFactory wals =
268      new WALFactory(conf, ServerName.valueOf("test.com", 8080, 1).toString());
269    final WAL newLog = wals.getWAL(null);
270    try {
271      // Now roll the log before we write anything.
272      newLog.rollWriter(true);
273    } finally {
274      wals.close();
275    }
276  }
277
278  /**
279   * Tests that logs are deleted
280   */
281  @Test
282  public void testLogRolling() throws Exception {
283    this.tableName = getName();
284    // TODO: Why does this write data take for ever?
285    startAndWriteData();
286    RegionInfo region = server.getRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
287    final WAL log = server.getWAL(region);
288    LOG.info(
289      "after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files");
290
291    // roll the log, so we should have at least one rolled file and the log file size should be
292    // greater than 0, in case in the above method we rolled in the last round and then flushed so
293    // all the old wal files are deleted and cause the below assertion to fail
294    log.rollWriter();
295
296    assertThat(AbstractFSWALProvider.getLogFileSize(log), greaterThan(0L));
297
298    // flush all regions
299    for (HRegion r : server.getOnlineRegionsLocalContext()) {
300      r.flush(true);
301    }
302
303    // Now roll the log the again
304    log.rollWriter();
305
306    // should have deleted all the rolled wal files
307    TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(log) == 0);
308    assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
309  }
310
311  protected String getName() {
312    return "TestLogRolling-" + name;
313  }
314
315  void writeData(Table table, int rownum) throws IOException {
316    doPut(table, rownum);
317
318    // sleep to let the log roller run (if it needs to)
319    try {
320      Thread.sleep(2000);
321    } catch (InterruptedException e) {
322      // continue
323    }
324  }
325
326  void validateData(Table table, int rownum) throws IOException {
327    String row = "row" + String.format("%1$04d", rownum);
328    Get get = new Get(Bytes.toBytes(row));
329    get.addFamily(HConstants.CATALOG_FAMILY);
330    Result result = table.get(get);
331    assertTrue(result.size() == 1);
332    assertTrue(Bytes.equals(value, result.getValue(HConstants.CATALOG_FAMILY, null)));
333    LOG.info("Validated row " + row);
334  }
335
336  /**
337   * Tests that logs are deleted when some region has a compaction record in WAL and no other
338   * records. See HBASE-8597.
339   */
340  @Test
341  public void testCompactionRecordDoesntBlockRolling() throws Exception {
342
343    // When the hbase:meta table can be opened, the region servers are running
344    try (Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
345      Table table = createTestTable(getName())) {
346
347      server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
348      HRegion region = server.getRegions(table.getName()).get(0);
349      final WAL log = server.getWAL(region.getRegionInfo());
350      Store s = region.getStore(HConstants.CATALOG_FAMILY);
351
352      // Put some stuff into table, to make sure we have some files to compact.
353      for (int i = 1; i <= 2; ++i) {
354        doPut(table, i);
355        admin.flush(table.getName());
356      }
357      doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
358      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log),
359        "Should have no WAL after initial writes");
360      assertEquals(2, s.getStorefilesCount());
361
362      // Roll the log and compact table, to have compaction record in the 2nd WAL.
363      log.rollWriter();
364      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log),
365        "Should have WAL; one table is not flushed");
366      admin.flush(table.getName());
367      region.compact(false);
368      // Wait for compaction in case if flush triggered it before us.
369      assertNotNull(s);
370      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
371        Threads.sleepWithoutInterrupt(200);
372      }
373      assertEquals(1, s.getStorefilesCount(), "Compaction didn't happen");
374
375      // Write some value to the table so the WAL cannot be deleted until table is flushed.
376      doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
377      log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
378      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log),
379        "Should have WAL; one table is not flushed");
380
381      // Flush table to make latest WAL obsolete; write another record, and roll again.
382      admin.flush(table.getName());
383      doPut(table, 1);
384      log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
385      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log),
386        "Should have 1 WALs at the end");
387    }
388  }
389
390  protected void doPut(Table table, int i) throws IOException {
391    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
392    put.addColumn(HConstants.CATALOG_FAMILY, null, value);
393    table.put(put);
394  }
395
396  protected Table createTestTable(String tableName) throws IOException {
397    // Create the test table and open it
398    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
399      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
400    admin.createTable(desc);
401    return TEST_UTIL.getConnection().getTable(desc.getTableName());
402  }
403}