001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import org.apache.hadoop.fs.FSDataInputStream;
022import org.apache.hadoop.fs.FileStatus;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
031import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
033
034/**
035 * Describes a WAL File
036 */
037@InterfaceAudience.Private
038public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
039  private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);
040
041  private ProcedureWALHeader header;
042  private FSDataInputStream stream;
043  private FileSystem fs;
044  private Path logFile;
045  private long startPos;
046  private long minProcId;
047  private long maxProcId;
048  private long logSize;
049  private long timestamp;
050
051  public ProcedureStoreTracker getTracker() {
052    return tracker;
053  }
054
055  private final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
056
057  public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
058    this.fs = fs;
059    this.logFile = logStatus.getPath();
060    this.logSize = logStatus.getLen();
061    this.timestamp = logStatus.getModificationTime();
062    tracker.setPartialFlag(true);
063  }
064
065  public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header,
066      long startPos, long timestamp) {
067    this.fs = fs;
068    this.header = header;
069    this.logFile = logFile;
070    this.startPos = startPos;
071    this.logSize = startPos;
072    this.timestamp = timestamp;
073    tracker.setPartialFlag(true);
074  }
075
076  public void open() throws IOException {
077    if (stream == null) {
078      stream = fs.open(logFile);
079    }
080
081    if (header == null) {
082      header = ProcedureWALFormat.readHeader(stream);
083      startPos = stream.getPos();
084    } else {
085      stream.seek(startPos);
086    }
087  }
088
089  public ProcedureWALTrailer readTrailer() throws IOException {
090    try {
091      return ProcedureWALFormat.readTrailer(stream, startPos, logSize);
092    } finally {
093      stream.seek(startPos);
094    }
095  }
096
097  public void readTracker() throws IOException {
098    ProcedureWALTrailer trailer = readTrailer();
099    try {
100      stream.seek(trailer.getTrackerPos());
101      final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf =
102          ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
103      tracker.resetToProto(trackerProtoBuf);
104    } finally {
105      stream.seek(startPos);
106    }
107  }
108
109  public void updateLocalTracker(ProcedureStoreTracker tracker) {
110    this.tracker.resetTo(tracker);
111  }
112
113  public void close() {
114    if (stream == null) return;
115    try {
116      stream.close();
117    } catch (IOException e) {
118      LOG.warn("unable to close the wal file: " + logFile, e);
119    } finally {
120      stream = null;
121    }
122  }
123
124  public FSDataInputStream getStream() {
125    return stream;
126  }
127
128  public ProcedureWALHeader getHeader() {
129    return header;
130  }
131
132  public long getTimestamp() {
133    return timestamp;
134  }
135
136  public boolean isCompacted() {
137    return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
138  }
139
140  public long getLogId() {
141    return header.getLogId();
142  }
143
144  public long getSize() {
145    return logSize;
146  }
147
148  /**
149   * Used to update in-progress log sizes. the FileStatus will report 0 otherwise.
150   */
151  void addToSize(long size) {
152    this.logSize += size;
153  }
154
155  public void removeFile(final Path walArchiveDir) throws IOException {
156    close();
157    boolean archived = false;
158    if (walArchiveDir != null) {
159      Path archivedFile = new Path(walArchiveDir, logFile.getName());
160      LOG.info("Archiving " + logFile + " to " + archivedFile);
161      if (!fs.rename(logFile, archivedFile)) {
162        LOG.warn("Failed archive of " + logFile + ", deleting");
163      } else {
164        archived = true;
165      }
166    }
167    if (!archived) {
168      if (!fs.delete(logFile, false)) {
169        LOG.warn("Failed delete of " + logFile);
170      }
171    }
172  }
173
174  public void setProcIds(long minId, long maxId) {
175    this.minProcId = minId;
176    this.maxProcId = maxId;
177  }
178
179  public long getMinProcId() {
180    return minProcId;
181  }
182
183  public long getMaxProcId() {
184    return maxProcId;
185  }
186
187  @Override
188  public int compareTo(final ProcedureWALFile other) {
189    long diff = header.getLogId() - other.header.getLogId();
190    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
191  }
192
193  @Override
194  public boolean equals(Object o) {
195    if (this == o) return true;
196    if (!(o instanceof ProcedureWALFile)) return false;
197    return compareTo((ProcedureWALFile)o) == 0;
198  }
199
200  @Override
201  public int hashCode() {
202    return logFile.hashCode();
203  }
204
205  @Override
206  public String toString() {
207    return logFile.toString();
208  }
209}