001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertThrows;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.when;
026
027import java.io.IOException;
028import org.apache.hadoop.fs.FSDataInputStream;
029import org.apache.hadoop.fs.FSDataOutputStream;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
035import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
036import org.apache.hadoop.hbase.regionserver.StoreContext;
037import org.apache.hadoop.hbase.testclassification.RegionServerTests;
038import org.apache.hadoop.hbase.testclassification.SmallTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.junit.AfterClass;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Rule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.junit.rules.TestName;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
055
056@Category({ RegionServerTests.class, SmallTests.class })
057public class TestStoreFileListFile {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestStoreFileListFile.class);
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileListFile.class);
064
065  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
066
067  private Path testDir;
068
069  private StoreFileListFile storeFileListFile;
070
071  @Rule
072  public TestName name = new TestName();
073
074  private StoreFileListFile create() throws IOException {
075    HRegionFileSystem hfs = mock(HRegionFileSystem.class);
076    when(hfs.getFileSystem()).thenReturn(FileSystem.get(UTIL.getConfiguration()));
077    StoreContext ctx = StoreContext.getBuilder().withFamilyStoreDirectoryPath(testDir)
078      .withRegionFileSystem(hfs).build();
079    return new StoreFileListFile(ctx);
080  }
081
082  @Before
083  public void setUp() throws IOException {
084    testDir = UTIL.getDataTestDir(name.getMethodName());
085    storeFileListFile = create();
086  }
087
088  @AfterClass
089  public static void tearDown() {
090    UTIL.cleanupTestDir();
091  }
092
093  @Test
094  public void testEmptyLoad() throws IOException {
095    assertNull(storeFileListFile.load(false));
096  }
097
098  private FileStatus getOnlyTrackerFile(FileSystem fs) throws IOException {
099    return fs.listStatus(new Path(testDir, StoreFileListFile.TRACK_FILE_DIR))[0];
100  }
101
102  private byte[] readAll(FileSystem fs, Path file) throws IOException {
103    try (FSDataInputStream in = fs.open(file)) {
104      return ByteStreams.toByteArray(in);
105    }
106  }
107
108  private void write(FileSystem fs, Path file, byte[] buf, int off, int len) throws IOException {
109    try (FSDataOutputStream out = fs.create(file, true)) {
110      out.write(buf, off, len);
111    }
112  }
113
114  @Test
115  public void testLoadPartial() throws IOException {
116    StoreFileList.Builder builder = StoreFileList.newBuilder();
117    storeFileListFile.update(builder);
118    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
119    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
120    // truncate it so we do not have enough data
121    LOG.info("Truncate file {} with size {} to {}", trackerFileStatus.getPath(),
122      trackerFileStatus.getLen(), trackerFileStatus.getLen() / 2);
123    byte[] content = readAll(fs, trackerFileStatus.getPath());
124    write(fs, trackerFileStatus.getPath(), content, 0, content.length / 2);
125    assertNull(storeFileListFile.load(false));
126  }
127
128  private void writeInt(byte[] buf, int off, int value) {
129    byte[] b = Bytes.toBytes(value);
130    for (int i = 0; i < 4; i++) {
131      buf[off + i] = b[i];
132    }
133  }
134
135  @Test
136  public void testZeroFileLength() throws IOException {
137    StoreFileList.Builder builder = StoreFileList.newBuilder();
138    storeFileListFile.update(builder);
139    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
140    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
141    // write a zero length
142    byte[] content = readAll(fs, trackerFileStatus.getPath());
143    writeInt(content, 0, 0);
144    write(fs, trackerFileStatus.getPath(), content, 0, content.length);
145    assertThrows(IOException.class, () -> storeFileListFile.load(false));
146  }
147
148  @Test
149  public void testBigFileLength() throws IOException {
150    StoreFileList.Builder builder = StoreFileList.newBuilder();
151    storeFileListFile.update(builder);
152    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
153    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
154    // write a large length
155    byte[] content = readAll(fs, trackerFileStatus.getPath());
156    writeInt(content, 0, 128 * 1024 * 1024);
157    write(fs, trackerFileStatus.getPath(), content, 0, content.length);
158    assertThrows(IOException.class, () -> storeFileListFile.load(false));
159  }
160
161  @Test
162  public void testChecksumMismatch() throws IOException {
163    StoreFileList.Builder builder = StoreFileList.newBuilder();
164    storeFileListFile.update(builder);
165    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
166    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
167    // flip one byte
168    byte[] content = readAll(fs, trackerFileStatus.getPath());
169    content[5] = (byte) ~content[5];
170    write(fs, trackerFileStatus.getPath(), content, 0, content.length);
171    assertThrows(IOException.class, () -> storeFileListFile.load(false));
172  }
173
174  @Test
175  public void testLoadNewerTrackFiles() throws IOException, InterruptedException {
176    StoreFileList.Builder builder = StoreFileList.newBuilder();
177    storeFileListFile.update(builder);
178
179    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
180    FileStatus trackFileStatus = getOnlyTrackerFile(fs);
181
182    builder.addStoreFile(StoreFileEntry.newBuilder().setName("hehe").setSize(10).build());
183    storeFileListFile = create();
184    storeFileListFile.update(builder);
185
186    // should load the list we stored the second time
187    storeFileListFile = create();
188    StoreFileList list = storeFileListFile.load(true);
189    assertEquals(1, list.getStoreFileCount());
190    // since read only is true, we should not delete the old track file
191    // the deletion is in background, so we will test it multiple times through HTU.waitFor and make
192    // sure that it is still there after timeout, i.e, the waitFor method returns -1
193    assertTrue(UTIL.waitFor(2000, 100, false, () -> !fs.exists(testDir)) < 0);
194
195    // this time read only is false, we should delete the old track file
196    list = storeFileListFile.load(false);
197    assertEquals(1, list.getStoreFileCount());
198    UTIL.waitFor(5000, () -> !fs.exists(trackFileStatus.getPath()));
199  }
200
201  // This is to simulate the scenario where a 'dead' RS perform flush or compaction on a region
202  // which has already been reassigned to another RS. This is possible in real world, usually caused
203  // by a long STW GC.
204  @Test
205  public void testConcurrentUpdate() throws IOException {
206    storeFileListFile.update(StoreFileList.newBuilder());
207
208    StoreFileListFile storeFileListFile2 = create();
209    storeFileListFile2.update(StoreFileList.newBuilder()
210      .addStoreFile(StoreFileEntry.newBuilder().setName("hehe").setSize(10).build()));
211
212    // let's update storeFileListFile several times
213    for (int i = 0; i < 10; i++) {
214      storeFileListFile.update(StoreFileList.newBuilder()
215        .addStoreFile(StoreFileEntry.newBuilder().setName("haha-" + i).setSize(100 + i).build()));
216    }
217
218    // create a new list file, make sure we load the list generate by storeFileListFile2.
219    StoreFileListFile storeFileListFile3 = create();
220    StoreFileList fileList = storeFileListFile3.load(true);
221    assertEquals(1, fileList.getStoreFileCount());
222    StoreFileEntry entry = fileList.getStoreFile(0);
223    assertEquals("hehe", entry.getName());
224    assertEquals(10, entry.getSize());
225  }
226
227  @Test
228  public void testLoadHigherVersion() throws IOException {
229    // write a fake StoreFileList file with higher version
230    StoreFileList storeFileList =
231      StoreFileList.newBuilder().setVersion(StoreFileListFile.VERSION + 1)
232        .setTimestamp(EnvironmentEdgeManager.currentTime()).build();
233    Path trackFileDir = new Path(testDir, StoreFileListFile.TRACK_FILE_DIR);
234    StoreFileListFile.write(FileSystem.get(UTIL.getConfiguration()),
235      new Path(trackFileDir, StoreFileListFile.TRACK_FILE_PREFIX
236        + StoreFileListFile.TRACK_FILE_SEPARATOR + EnvironmentEdgeManager.currentTime()),
237      storeFileList);
238    IOException error = assertThrows(IOException.class, () -> create().load(false));
239    assertEquals("Higher store file list version detected, expected " + StoreFileListFile.VERSION
240      + ", got " + (StoreFileListFile.VERSION + 1), error.getMessage());
241  }
242}