001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import org.apache.hadoop.fs.FSDataInputStream;
022import org.apache.hadoop.fs.FileStatus;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
031import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
033
034/**
035 * Describes a WAL File
036 */
037@InterfaceAudience.Private
038public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
039  private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);
040
041  private ProcedureWALHeader header;
042  private FSDataInputStream stream;
043  private FileSystem fs;
044  private Path logFile;
045  private long startPos;
046  private long minProcId;
047  private long maxProcId;
048  private long logSize;
049  private long timestamp;
050
051  public ProcedureStoreTracker getTracker() {
052    return tracker;
053  }
054
055  private final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
056
057  public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
058    this.fs = fs;
059    this.logFile = logStatus.getPath();
060    this.logSize = logStatus.getLen();
061    this.timestamp = logStatus.getModificationTime();
062    tracker.setPartialFlag(true);
063  }
064
065  public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header,
066      long startPos, long timestamp) {
067    this.fs = fs;
068    this.header = header;
069    this.logFile = logFile;
070    this.startPos = startPos;
071    this.logSize = startPos;
072    this.timestamp = timestamp;
073    tracker.setPartialFlag(true);
074  }
075
076  public void open() throws IOException {
077    if (stream == null) {
078      stream = fs.open(logFile);
079    }
080
081    if (header == null) {
082      header = ProcedureWALFormat.readHeader(stream);
083      startPos = stream.getPos();
084    } else {
085      stream.seek(startPos);
086    }
087  }
088
089  public ProcedureWALTrailer readTrailer() throws IOException {
090    try {
091      return ProcedureWALFormat.readTrailer(stream, startPos, logSize);
092    } finally {
093      stream.seek(startPos);
094    }
095  }
096
097  public void readTracker() throws IOException {
098    ProcedureWALTrailer trailer = readTrailer();
099    try {
100      stream.seek(trailer.getTrackerPos());
101      final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf =
102          ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
103      tracker.resetToProto(trackerProtoBuf);
104    } finally {
105      stream.seek(startPos);
106    }
107  }
108
109  public void updateLocalTracker(ProcedureStoreTracker tracker) {
110    this.tracker.resetTo(tracker);
111  }
112
113  public void close() {
114    if (stream == null) {
115      return;
116    }
117
118    try {
119      stream.close();
120    } catch (IOException e) {
121      LOG.warn("unable to close the wal file: " + logFile, e);
122    } finally {
123      stream = null;
124    }
125  }
126
127  public FSDataInputStream getStream() {
128    return stream;
129  }
130
131  public ProcedureWALHeader getHeader() {
132    return header;
133  }
134
135  public long getTimestamp() {
136    return timestamp;
137  }
138
139  public boolean isCompacted() {
140    return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
141  }
142
143  public long getLogId() {
144    return header.getLogId();
145  }
146
147  public long getSize() {
148    return logSize;
149  }
150
151  /**
152   * Used to update in-progress log sizes. the FileStatus will report 0 otherwise.
153   */
154  void addToSize(long size) {
155    this.logSize += size;
156  }
157
158  public void removeFile(final Path walArchiveDir) throws IOException {
159    close();
160    boolean archived = false;
161    if (walArchiveDir != null) {
162      Path archivedFile = new Path(walArchiveDir, logFile.getName());
163      LOG.info("Archiving " + logFile + " to " + archivedFile);
164      if (!fs.rename(logFile, archivedFile)) {
165        LOG.warn("Failed archive of " + logFile + ", deleting");
166      } else {
167        archived = true;
168      }
169    }
170    if (!archived) {
171      if (!fs.delete(logFile, false)) {
172        LOG.warn("Failed delete of " + logFile);
173      }
174    }
175  }
176
177  public void setProcIds(long minId, long maxId) {
178    this.minProcId = minId;
179    this.maxProcId = maxId;
180  }
181
182  public long getMinProcId() {
183    return minProcId;
184  }
185
186  public long getMaxProcId() {
187    return maxProcId;
188  }
189
190  @Override
191  public int compareTo(final ProcedureWALFile other) {
192    long diff = header.getLogId() - other.header.getLogId();
193    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
194  }
195
196  @Override
197  public boolean equals(Object o) {
198    if (this == o) {
199      return true;
200    }
201
202    if (!(o instanceof ProcedureWALFile)) {
203      return false;
204    }
205
206    return compareTo((ProcedureWALFile)o) == 0;
207  }
208
209  @Override
210  public int hashCode() {
211    return logFile.hashCode();
212  }
213
214  @Override
215  public String toString() {
216    return logFile.toString();
217  }
218}