001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertThrows;
023
024import java.io.EOFException;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import org.apache.hadoop.fs.FSDataInputStream;
029import org.apache.hadoop.fs.FSDataOutputStream;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.Cell.Type;
034import org.apache.hadoop.hbase.CellBuilderType;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
037import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.RegionInfoBuilder;
042import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.testclassification.RegionServerTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.CommonFSUtils;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.junit.jupiter.api.AfterAll;
049import org.junit.jupiter.api.BeforeAll;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052
053/**
054 * In this test, we write a small WAL file first, and then generate partial WAL file which length is
055 * in range [0, fileLength)(we test all the possible length in the range), to see if we can
056 * successfully get the completed entries, and also get an EOF at the end.
057 * <p/>
058 * It is very important to make sure 3 things:
059 * <ul>
060 * <li>We do not get incorrect entries. Otherwise there will be data corruption.</li>
061 * <li>We can get all the completed entries, i.e, we do not miss some data. Otherwise there will be
062 * data loss.</li>
063 * <li>We will get an EOF finally, instead of a general IOException. Otherwise the split or
064 * replication will be stuck.</li>
065 * </ul>
066 */
067@Tag(RegionServerTests.TAG)
068@Tag(MediumTests.TAG)
069public class TestParsePartialWALFile {
070
071  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
072
073  private static FileSystem FS;
074
075  private static TableName TN = TableName.valueOf("test");
076  private static RegionInfo RI = RegionInfoBuilder.newBuilder(TN).build();
077  private static byte[] ROW = Bytes.toBytes("row");
078  private static byte[] FAMILY = Bytes.toBytes("family");
079  private static byte[] QUAL = Bytes.toBytes("qualifier");
080  private static byte[] VALUE = Bytes.toBytes("value");
081
082  @BeforeAll
083  public static void setUp() throws IOException {
084    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
085    FS = FileSystem.getLocal(UTIL.getConfiguration());
086    if (!FS.mkdirs(UTIL.getDataTestDir())) {
087      throw new IOException("can not create " + UTIL.getDataTestDir());
088    }
089  }
090
091  @AfterAll
092  public static void tearDown() {
093    UTIL.cleanupTestDir();
094  }
095
096  private Path generateBrokenWALFile(byte[] content, int length) throws IOException {
097    Path walFile = UTIL.getDataTestDir("wal-" + length);
098    try (FSDataOutputStream out = FS.create(walFile)) {
099      out.write(content, 0, length);
100    }
101    return walFile;
102  }
103
104  private void assertEntryEquals(WAL.Entry entry, int index) {
105    WALKeyImpl key = entry.getKey();
106    assertEquals(TN, key.getTableName());
107    assertArrayEquals(RI.getEncodedNameAsBytes(), key.getEncodedRegionName());
108    WALEdit edit = entry.getEdit();
109    assertEquals(1, edit.getCells().size());
110    Cell cell = edit.getCells().get(0);
111    assertArrayEquals(ROW, CellUtil.cloneRow(cell));
112    assertArrayEquals(FAMILY, CellUtil.cloneFamily(cell));
113    if (index % 2 == 0) {
114      assertEquals(Type.Put, cell.getType());
115      assertArrayEquals(QUAL, CellUtil.cloneQualifier(cell));
116      assertArrayEquals(VALUE, CellUtil.cloneValue(cell));
117    } else {
118      assertEquals(Type.DeleteFamily, cell.getType());
119    }
120  }
121
122  private void testReadEntry(Path file, int entryCount) throws IOException {
123    try (
124      WALStreamReader reader = WALFactory.createStreamReader(FS, file, UTIL.getConfiguration())) {
125      for (int i = 0; i < entryCount; i++) {
126        assertEntryEquals(reader.next(), i);
127      }
128      assertThrows(EOFException.class, () -> reader.next());
129    }
130    try (WALTailingReader reader =
131      WALFactory.createTailingReader(FS, file, UTIL.getConfiguration(), -1)) {
132      for (int i = 0; i < entryCount; i++) {
133        WALTailingReader.Result result = reader.next(-1);
134        assertEquals(WALTailingReader.State.NORMAL, result.getState());
135        assertEntryEquals(result.getEntry(), i);
136      }
137      WALTailingReader.Result result = reader.next(-1);
138      assertEquals(WALTailingReader.State.EOF_AND_RESET, result.getState());
139    }
140  }
141
142  @Test
143  public void testPartialParse() throws Exception {
144    Path walFile = UTIL.getDataTestDir("wal");
145    long headerLength;
146    List<Long> endOffsets = new ArrayList<>();
147    try (WALProvider.Writer writer =
148      WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) {
149      headerLength = writer.getLength();
150      for (int i = 0; i < 3; i++) {
151        WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, i,
152          EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
153        WALEdit edit = new WALEdit();
154        if (i % 2 == 0) {
155          edit.add(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
156            .setRow(ROW).setFamily(FAMILY).setQualifier(QUAL).setValue(VALUE).build());
157        } else {
158          edit.add(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
159            .setType(Type.DeleteFamily).setRow(ROW).setFamily(FAMILY).build());
160        }
161        writer.append(new WAL.Entry(key, edit));
162        writer.sync(true);
163        endOffsets.add(writer.getLength());
164      }
165    }
166    long fileLength = FS.getFileStatus(walFile).getLen();
167    byte[] content = new byte[(int) fileLength];
168    try (FSDataInputStream in = FS.open(walFile)) {
169      in.readFully(content);
170    }
171    // partial header, should throw WALHeaderEOFException
172    for (int i = 0; i < headerLength; i++) {
173      Path brokenFile = generateBrokenWALFile(content, i);
174      assertThrows(WALHeaderEOFException.class,
175        () -> WALFactory.createStreamReader(FS, brokenFile, UTIL.getConfiguration()));
176      assertThrows(WALHeaderEOFException.class,
177        () -> WALFactory.createTailingReader(FS, brokenFile, UTIL.getConfiguration(), -1));
178      FS.delete(brokenFile, false);
179    }
180    // partial WAL entries, should be able to read some entries and the last one we will get an EOF
181    for (int i = 0; i <= endOffsets.size(); i++) {
182      int startOffset;
183      int endOffset;
184      if (i == 0) {
185        startOffset = (int) headerLength;
186        endOffset = endOffsets.get(i).intValue();
187      } else if (i == endOffsets.size()) {
188        startOffset = endOffsets.get(i - 1).intValue();
189        endOffset = (int) fileLength;
190      } else {
191        startOffset = endOffsets.get(i - 1).intValue();
192        endOffset = endOffsets.get(i).intValue();
193      }
194      for (int j = startOffset; j < endOffset; j++) {
195        Path brokenFile = generateBrokenWALFile(content, j);
196        testReadEntry(brokenFile, i);
197        FS.delete(brokenFile, false);
198      }
199    }
200    // partial trailer, should be able to read all the entries but get an EOF when trying read
201    // again, as we do not know it is a trailer
202    for (int i = endOffsets.get(endOffsets.size() - 1).intValue(); i < fileLength; i++) {
203      Path brokenFile = generateBrokenWALFile(content, i);
204      testReadEntry(brokenFile, endOffsets.size());
205      FS.delete(brokenFile, false);
206    }
207  }
208}