/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * In this test, we write a small WAL file first, and then generate partial WAL file which length is
 * in range [0, fileLength)(we test all the possible length in the range), to see if we can
 * successfully get the completed entries, and also get an EOF at the end.
 * <p/>
 * It is very important to make sure 3 things:
 * <ul>
 * <li>We do not get incorrect entries. Otherwise there will be data corruption.</li>
 * <li>We can get all the completed entries, i.e, we do not miss some data. Otherwise there will be
 * data loss.</li>
 * <li>We will get an EOF finally, instead of a general IOException. Otherwise the split or
 * replication will be stuck.</li>
 * </ul>
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestParsePartialWALFile {

  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();

  // Assigned once in setUp; not final because it needs the configured test Configuration.
  private static FileSystem FS;

  // Fixed test fixtures; final since they are constants shared by all test methods.
  private static final TableName TN = TableName.valueOf("test");
  private static final RegionInfo RI = RegionInfoBuilder.newBuilder(TN).build();
  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final byte[] QUAL = Bytes.toBytes("qualifier");
  private static final byte[] VALUE = Bytes.toBytes("value");

  @BeforeAll
  public static void setUp() throws IOException {
    // Local filesystem does not support hflush/hsync capabilities, so do not enforce them.
    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    FS = FileSystem.getLocal(UTIL.getConfiguration());
    if (!FS.mkdirs(UTIL.getDataTestDir())) {
      throw new IOException("can not create " + UTIL.getDataTestDir());
    }
  }

  @AfterAll
  public static void tearDown() {
    UTIL.cleanupTestDir();
  }

  /**
   * Write the first {@code length} bytes of {@code content} to a new file, simulating a WAL file
   * truncated at that offset.
   */
  private Path generateBrokenWALFile(byte[] content, int length) throws IOException {
    Path walFile = UTIL.getDataTestDir("wal-" + length);
    try (FSDataOutputStream out = FS.create(walFile)) {
      out.write(content, 0, length);
    }
    return walFile;
  }

  /**
   * Assert that {@code entry} matches the entry written at {@code index} by
   * {@link #testPartialParse()}: even indices carry a Put with qualifier/value, odd indices a
   * DeleteFamily.
   */
  private void assertEntryEquals(WAL.Entry entry, int index) {
    WALKeyImpl key = entry.getKey();
    assertEquals(TN, key.getTableName());
    assertArrayEquals(RI.getEncodedNameAsBytes(), key.getEncodedRegionName());
    WALEdit edit = entry.getEdit();
    assertEquals(1, edit.getCells().size());
    Cell cell = edit.getCells().get(0);
    assertArrayEquals(ROW, CellUtil.cloneRow(cell));
    assertArrayEquals(FAMILY, CellUtil.cloneFamily(cell));
    if (index % 2 == 0) {
      assertEquals(Type.Put, cell.getType());
      assertArrayEquals(QUAL, CellUtil.cloneQualifier(cell));
      assertArrayEquals(VALUE, CellUtil.cloneValue(cell));
    } else {
      assertEquals(Type.DeleteFamily, cell.getType());
    }
  }

  /**
   * Verify that both reader implementations return exactly {@code entryCount} entries from
   * {@code file} and then signal EOF (an {@link EOFException} for the stream reader, an
   * {@code EOF_AND_RESET} state for the tailing reader) rather than a general IOException.
   */
  private void testReadEntry(Path file, int entryCount) throws IOException {
    try (
      WALStreamReader reader = WALFactory.createStreamReader(FS, file, UTIL.getConfiguration())) {
      for (int i = 0; i < entryCount; i++) {
        assertEntryEquals(reader.next(), i);
      }
      assertThrows(EOFException.class, reader::next);
    }
    try (WALTailingReader reader =
      WALFactory.createTailingReader(FS, file, UTIL.getConfiguration(), -1)) {
      for (int i = 0; i < entryCount; i++) {
        WALTailingReader.Result result = reader.next(-1);
        assertEquals(WALTailingReader.State.NORMAL, result.getState());
        assertEntryEquals(result.getEntry(), i);
      }
      WALTailingReader.Result result = reader.next(-1);
      assertEquals(WALTailingReader.State.EOF_AND_RESET, result.getState());
    }
  }

  @Test
  public void testPartialParse() throws Exception {
    Path walFile = UTIL.getDataTestDir("wal");
    long headerLength;
    // End offset of each entry after sync, so we know which truncation lengths cut which entry.
    List<Long> endOffsets = new ArrayList<>();
    try (WALProvider.Writer writer =
      WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) {
      headerLength = writer.getLength();
      for (int i = 0; i < 3; i++) {
        WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, i,
          EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        if (i % 2 == 0) {
          edit.add(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
            .setRow(ROW).setFamily(FAMILY).setQualifier(QUAL).setValue(VALUE).build());
        } else {
          edit.add(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
            .setType(Type.DeleteFamily).setRow(ROW).setFamily(FAMILY).build());
        }
        writer.append(new WAL.Entry(key, edit));
        writer.sync(true);
        endOffsets.add(writer.getLength());
      }
    }
    long fileLength = FS.getFileStatus(walFile).getLen();
    byte[] content = new byte[(int) fileLength];
    try (FSDataInputStream in = FS.open(walFile)) {
      in.readFully(content);
    }
    // partial header, should throw WALHeaderEOFException
    for (int i = 0; i < headerLength; i++) {
      Path brokenFile = generateBrokenWALFile(content, i);
      assertThrows(WALHeaderEOFException.class,
        () -> WALFactory.createStreamReader(FS, brokenFile, UTIL.getConfiguration()));
      assertThrows(WALHeaderEOFException.class,
        () -> WALFactory.createTailingReader(FS, brokenFile, UTIL.getConfiguration(), -1));
      FS.delete(brokenFile, false);
    }
    // partial WAL entries, should be able to read some entries and the last one we will get an EOF
    for (int i = 0; i <= endOffsets.size(); i++) {
      int startOffset;
      int endOffset;
      if (i == 0) {
        startOffset = (int) headerLength;
        endOffset = endOffsets.get(i).intValue();
      } else if (i == endOffsets.size()) {
        startOffset = endOffsets.get(i - 1).intValue();
        endOffset = (int) fileLength;
      } else {
        startOffset = endOffsets.get(i - 1).intValue();
        endOffset = endOffsets.get(i).intValue();
      }
      for (int j = startOffset; j < endOffset; j++) {
        Path brokenFile = generateBrokenWALFile(content, j);
        testReadEntry(brokenFile, i);
        FS.delete(brokenFile, false);
      }
    }
    // partial trailer, should be able to read all the entries but get an EOF when trying read
    // again, as we do not know it is a trailer
    for (int i = endOffsets.get(endOffsets.size() - 1).intValue(); i < fileLength; i++) {
      Path brokenFile = generateBrokenWALFile(content, i);
      testReadEntry(brokenFile, endOffsets.size());
      FS.delete(brokenFile, false);
    }
  }
}