001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.EOFException;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.util.zip.CRC32;
025import org.apache.hadoop.fs.FSDataInputStream;
026import org.apache.hadoop.fs.FSDataOutputStream;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
036
037/**
038 * A file storage which supports atomic update through two files, i.e, rotating. The implementation
039 * does not require atomic rename.
040 */
041@InterfaceAudience.Private
042public class RotateFile {
043
044  private static final Logger LOG = LoggerFactory.getLogger(RotateFile.class);
045
046  private final FileSystem fs;
047
048  private final long maxFileSize;
049
050  private final Path[] files = new Path[2];
051
052  // this is used to make sure that we do not go backwards
053  private long prevTimestamp = -1;
054
055  private int nextFile = -1;
056
057  /**
058   * Constructs a new RotateFile object with the given parameters.
059   * @param fs          the file system to use.
060   * @param dir         the directory where the files will be created.
061   * @param name        the base name for the files.
062   * @param maxFileSize the maximum size of each file.
063   */
064  public RotateFile(FileSystem fs, Path dir, String name, long maxFileSize) {
065    this.fs = fs;
066    this.maxFileSize = maxFileSize;
067    this.files[0] = new Path(dir, name + "-0");
068    this.files[1] = new Path(dir, name + "-1");
069  }
070
071  private HBaseProtos.RotateFileData read(Path path) throws IOException {
072    byte[] data;
073    int expectedChecksum;
074    try (FSDataInputStream in = fs.open(path)) {
075      int length = in.readInt();
076      if (length <= 0 || length > maxFileSize) {
077        throw new IOException("Invalid file length " + length
078          + ", either less than 0 or greater then max allowed size " + maxFileSize);
079      }
080      data = new byte[length];
081      in.readFully(data);
082      expectedChecksum = in.readInt();
083    }
084    CRC32 crc32 = new CRC32();
085    crc32.update(data);
086    int calculatedChecksum = (int) crc32.getValue();
087    if (expectedChecksum != calculatedChecksum) {
088      throw new IOException(
089        "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
090    }
091    return HBaseProtos.RotateFileData.parseFrom(data);
092  }
093
094  private int select(HBaseProtos.RotateFileData[] datas) {
095    if (datas[0] == null) {
096      return 1;
097    }
098    if (datas[1] == null) {
099      return 0;
100    }
101    return datas[0].getTimestamp() >= datas[1].getTimestamp() ? 0 : 1;
102  }
103
104  /**
105   * Reads the content of the rotate file by selecting the winner file based on the timestamp of the
106   * data inside the files. It reads the content of both files and selects the one with the latest
107   * timestamp as the winner. If a file is incomplete or does not exist, it logs the error and moves
108   * on to the next file. It returns the content of the winner file as a byte array. If none of the
109   * files have valid data, it returns null.
110   * @return a byte array containing the data from the winner file, or null if no valid data is
111   *         found.
112   * @throws IOException if an error occurs while reading the files.
113   */
114  public byte[] read() throws IOException {
115    HBaseProtos.RotateFileData[] datas = new HBaseProtos.RotateFileData[2];
116    for (int i = 0; i < 2; i++) {
117      try {
118        datas[i] = read(files[i]);
119      } catch (FileNotFoundException e) {
120        LOG.debug("file {} does not exist", files[i], e);
121      } catch (EOFException e) {
122        LOG.debug("file {} is incomplete", files[i], e);
123      }
124    }
125    int winnerIndex = select(datas);
126    nextFile = 1 - winnerIndex;
127    if (datas[winnerIndex] != null) {
128      prevTimestamp = datas[winnerIndex].getTimestamp();
129      return datas[winnerIndex].getData().toByteArray();
130    } else {
131      return null;
132    }
133  }
134
135  @RestrictedApi(explanation = "Should only be called in tests", link = "",
136      allowedOnPath = ".*/RotateFile.java|.*/src/test/.*")
137  static void write(FileSystem fs, Path file, long timestamp, byte[] data) throws IOException {
138    HBaseProtos.RotateFileData proto = HBaseProtos.RotateFileData.newBuilder()
139      .setTimestamp(timestamp).setData(ByteString.copyFrom(data)).build();
140    byte[] protoData = proto.toByteArray();
141    CRC32 crc32 = new CRC32();
142    crc32.update(protoData);
143    int checksum = (int) crc32.getValue();
144    // 4 bytes length, 8 bytes timestamp, 4 bytes checksum at the end
145    try (FSDataOutputStream out = fs.create(file, true)) {
146      out.writeInt(protoData.length);
147      out.write(protoData);
148      out.writeInt(checksum);
149    }
150  }
151
152  /**
153   * Writes the given data to the next file in the rotation, with a timestamp calculated based on
154   * the previous timestamp and the current time to make sure it is greater than the previous
155   * timestamp. The method also deletes the previous file, which is no longer needed.
156   * <p/>
157   * Notice that, for a newly created {@link RotateFile} instance, you need to call {@link #read()}
158   * first to initialize the nextFile index, before calling this method.
159   * @param data the data to be written to the file
160   * @throws IOException if an I/O error occurs while writing the data to the file
161   */
162  public void write(byte[] data) throws IOException {
163    if (data.length > maxFileSize) {
164      throw new IOException(
165        "Data size " + data.length + " is greater than max allowed size " + maxFileSize);
166    }
167    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
168    write(fs, files[nextFile], timestamp, data);
169    prevTimestamp = timestamp;
170    nextFile = 1 - nextFile;
171    try {
172      fs.delete(files[nextFile], false);
173    } catch (IOException e) {
174      // we will create new file with overwrite = true, so not a big deal here, only for speed up
175      // loading as we do not need to read this file when loading
176      LOG.debug("Failed to delete old file {}, ignoring the exception", files[nextFile], e);
177    }
178  }
179
180  /**
181   * Deletes the two files used for rotating data. If any of the files cannot be deleted, an
182   * IOException is thrown.
183   * @throws IOException if there is an error deleting either file
184   */
185  public void delete() throws IOException {
186    Path next = files[nextFile];
187    // delete next file first, and then the current file, so when failing to delete, we can still
188    // read the correct data
189    if (fs.exists(next) && !fs.delete(next, false)) {
190      throw new IOException("Can not delete " + next);
191    }
192    Path current = files[1 - nextFile];
193    if (fs.exists(current) && !fs.delete(current, false)) {
194      throw new IOException("Can not delete " + current);
195    }
196  }
197}