001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.hbase.HBaseTestingUtility;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.MiniHBaseCluster;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.StartMiniClusterOption;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Get;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.regionserver.HRegion;
042import org.apache.hadoop.hbase.regionserver.HRegionServer;
043import org.apache.hadoop.hbase.regionserver.Store;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.Threads;
046import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
047import org.apache.hadoop.hbase.wal.WAL;
048import org.apache.hadoop.hbase.wal.WALFactory;
049import org.apache.hadoop.hdfs.MiniDFSCluster;
050import org.junit.After;
051import org.junit.Assert;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.Rule;
055import org.junit.Test;
056import org.junit.rules.TestName;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060/**
061 * Test log deletion as logs are rolled.
062 */
063public abstract class AbstractTestLogRolling {
064  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class);
065  protected HRegionServer server;
066  protected String tableName;
067  protected byte[] value;
068  protected FileSystem fs;
069  protected MiniDFSCluster dfsCluster;
070  protected Admin admin;
071  protected MiniHBaseCluster cluster;
072  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
073  @Rule
074  public final TestName name = new TestName();
075
076  public AbstractTestLogRolling() {
077    this.server = null;
078    this.tableName = null;
079
080    String className = this.getClass().getName();
081    StringBuilder v = new StringBuilder(className);
082    while (v.length() < 1000) {
083      v.append(className);
084    }
085    this.value = Bytes.toBytes(v.toString());
086  }
087
088  // Need to override this setup so we can edit the config before it gets sent
089  // to the HDFS & HBase cluster startup.
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    /**** configuration for testLogRolling ****/
093    // Force a region split after every 768KB
094    Configuration conf = TEST_UTIL.getConfiguration();
095    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);
096
097    // We roll the log after every 32 writes
098    conf.setInt("hbase.regionserver.maxlogentries", 32);
099
100    conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2);
101    conf.setInt("hbase.rpc.timeout", 10 * 1000);
102
103    // For less frequently updated regions flush after every 2 flushes
104    conf.setInt("hbase.hregion.memstore.optionalflushcount", 2);
105
106    // We flush the cache after every 8192 bytes
107    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);
108
109    // Increase the amount of time between client retries
110    conf.setLong("hbase.client.pause", 10 * 1000);
111
112    // Reduce thread wake frequency so that other threads can get
113    // a chance to run.
114    conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
115
116    // disable low replication check for log roller to get a more stable result
117    // TestWALOpenAfterDNRollingStart will test this option.
118    conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);
119  }
120
121  @Before
122  public void setUp() throws Exception {
123    // Use 2 DataNodes and default values for other StartMiniCluster options.
124    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numDataNodes(2).build());
125
126    cluster = TEST_UTIL.getHBaseCluster();
127    dfsCluster = TEST_UTIL.getDFSCluster();
128    fs = TEST_UTIL.getTestFileSystem();
129    admin = TEST_UTIL.getAdmin();
130
131    // disable region rebalancing (interferes with log watching)
132    cluster.getMaster().balanceSwitch(false);
133  }
134
135  @After
136  public void tearDown() throws Exception {
137    TEST_UTIL.shutdownMiniCluster();
138  }
139
140  protected void startAndWriteData() throws IOException, InterruptedException {
141    // When the hbase:meta table can be opened, the region servers are running
142    TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
143    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
144
145    Table table = createTestTable(this.tableName);
146
147    server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
148    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
149      doPut(table, i);
150      if (i % 32 == 0) {
151        // After every 32 writes sleep to let the log roller run
152        try {
153          Thread.sleep(2000);
154        } catch (InterruptedException e) {
155          // continue
156        }
157      }
158    }
159  }
160
161  /**
162   * Tests that log rolling doesn't hang when no data is written.
163   */
164  @Test
165  public void testLogRollOnNothingWritten() throws Exception {
166    final Configuration conf = TEST_UTIL.getConfiguration();
167    final WALFactory wals =
168      new WALFactory(conf, ServerName.valueOf("test.com", 8080, 1).toString());
169    final WAL newLog = wals.getWAL(null);
170    try {
171      // Now roll the log before we write anything.
172      newLog.rollWriter(true);
173    } finally {
174      wals.close();
175    }
176  }
177
178  private void assertLogFileSize(WAL log) throws InterruptedException {
179    if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
180      assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
181    } else {
182      for (int i = 0; i < 10; i++) {
183        if (AbstractFSWALProvider.getLogFileSize(log) != 0) {
184          Thread.sleep(10);
185        }
186      }
187      assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
188    }
189  }
190
191  /**
192   * Tests that logs are deleted
193   */
194  @Test
195  public void testLogRolling() throws Exception {
196    this.tableName = getName();
197    // TODO: Why does this write data take for ever?
198    startAndWriteData();
199    RegionInfo region = server.getRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
200    final WAL log = server.getWAL(region);
201    LOG.info(
202      "after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files");
203    assertLogFileSize(log);
204
205    // flush all regions
206    for (HRegion r : server.getOnlineRegionsLocalContext()) {
207      r.flush(true);
208    }
209
210    // Now roll the log
211    log.rollWriter();
212
213    int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
214    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
215    assertTrue(("actual count: " + count), count <= 2);
216    assertLogFileSize(log);
217  }
218
219  protected String getName() {
220    return "TestLogRolling-" + name.getMethodName();
221  }
222
223  void writeData(Table table, int rownum) throws IOException {
224    doPut(table, rownum);
225
226    // sleep to let the log roller run (if it needs to)
227    try {
228      Thread.sleep(2000);
229    } catch (InterruptedException e) {
230      // continue
231    }
232  }
233
234  void validateData(Table table, int rownum) throws IOException {
235    String row = "row" + String.format("%1$04d", rownum);
236    Get get = new Get(Bytes.toBytes(row));
237    get.addFamily(HConstants.CATALOG_FAMILY);
238    Result result = table.get(get);
239    assertTrue(result.size() == 1);
240    assertTrue(Bytes.equals(value, result.getValue(HConstants.CATALOG_FAMILY, null)));
241    LOG.info("Validated row " + row);
242  }
243
244  /**
245   * Tests that logs are deleted when some region has a compaction record in WAL and no other
246   * records. See HBASE-8597.
247   */
248  @Test
249  public void testCompactionRecordDoesntBlockRolling() throws Exception {
250    Table table = null;
251
252    // When the hbase:meta table can be opened, the region servers are running
253    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
254    try {
255      table = createTestTable(getName());
256
257      server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
258      HRegion region = server.getRegions(table.getName()).get(0);
259      final WAL log = server.getWAL(region.getRegionInfo());
260      Store s = region.getStore(HConstants.CATALOG_FAMILY);
261
262      // have to flush namespace to ensure it doesn't affect wall tests
263      admin.flush(TableName.NAMESPACE_TABLE_NAME);
264
265      // Put some stuff into table, to make sure we have some files to compact.
266      for (int i = 1; i <= 2; ++i) {
267        doPut(table, i);
268        admin.flush(table.getName());
269      }
270      doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
271      assertEquals("Should have no WAL after initial writes", 0,
272        AbstractFSWALProvider.getNumRolledLogFiles(log));
273      assertEquals(2, s.getStorefilesCount());
274
275      // Roll the log and compact table, to have compaction record in the 2nd WAL.
276      log.rollWriter();
277      assertEquals("Should have WAL; one table is not flushed", 1,
278        AbstractFSWALProvider.getNumRolledLogFiles(log));
279      admin.flush(table.getName());
280      region.compact(false);
281      // Wait for compaction in case if flush triggered it before us.
282      Assert.assertNotNull(s);
283      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
284        Threads.sleepWithoutInterrupt(200);
285      }
286      assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
287
288      // Write some value to the table so the WAL cannot be deleted until table is flushed.
289      doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
290      log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
291      assertEquals("Should have WAL; one table is not flushed", 1,
292        AbstractFSWALProvider.getNumRolledLogFiles(log));
293
294      // Flush table to make latest WAL obsolete; write another record, and roll again.
295      admin.flush(table.getName());
296      doPut(table, 1);
297      log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
298      assertEquals("Should have 1 WALs at the end", 1,
299        AbstractFSWALProvider.getNumRolledLogFiles(log));
300    } finally {
301      if (t != null) t.close();
302      if (table != null) table.close();
303    }
304  }
305
306  protected void doPut(Table table, int i) throws IOException {
307    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
308    put.addColumn(HConstants.CATALOG_FAMILY, null, value);
309    table.put(put);
310  }
311
312  protected Table createTestTable(String tableName) throws IOException {
313    // Create the test table and open it
314    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
315      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
316    admin.createTable(desc);
317    return TEST_UTIL.getConnection().getTable(desc.getTableName());
318  }
319}