001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.fs.FSDataInputStream; 022import org.apache.hadoop.fs.FileStatus; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 031import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; 032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; 033 034/** 035 * Describes a WAL File 036 */ 037@InterfaceAudience.Private 038public class ProcedureWALFile implements Comparable<ProcedureWALFile> { 039 private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class); 040 041 private ProcedureWALHeader header; 042 private FSDataInputStream stream; 043 private FileSystem fs; 044 private Path logFile; 045 private long startPos; 046 private long minProcId; 047 private long maxProcId; 048 private long logSize; 049 private long timestamp; 050 051 public ProcedureStoreTracker getTracker() { 052 return tracker; 053 } 054 055 private final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); 056 057 public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) { 058 this.fs = fs; 059 this.logFile = logStatus.getPath(); 060 this.logSize = logStatus.getLen(); 061 this.timestamp = logStatus.getModificationTime(); 062 tracker.setPartialFlag(true); 063 } 064 065 public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, 066 long startPos, long timestamp) { 067 this.fs = fs; 068 this.header = header; 069 this.logFile = logFile; 070 this.startPos = startPos; 071 this.logSize = startPos; 072 this.timestamp = timestamp; 073 tracker.setPartialFlag(true); 074 } 075 076 public void open() throws IOException { 077 if (stream == null) { 078 stream = fs.open(logFile); 079 } 080 081 if (header == null) { 082 header = ProcedureWALFormat.readHeader(stream); 083 startPos = stream.getPos(); 084 } else { 085 stream.seek(startPos); 086 } 087 } 088 089 public ProcedureWALTrailer readTrailer() throws IOException { 090 try { 091 return ProcedureWALFormat.readTrailer(stream, startPos, logSize); 092 } finally { 093 stream.seek(startPos); 094 } 095 } 096 097 public void readTracker() throws IOException { 098 ProcedureWALTrailer trailer = readTrailer(); 099 try { 100 stream.seek(trailer.getTrackerPos()); 101 final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf = 102 ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream); 103 tracker.resetToProto(trackerProtoBuf); 104 } finally { 105 stream.seek(startPos); 106 } 107 } 108 109 public void updateLocalTracker(ProcedureStoreTracker tracker) { 110 this.tracker.resetTo(tracker); 111 } 112 113 public void close() { 114 if (stream == null) return; 115 try { 116 stream.close(); 117 } catch (IOException e) { 118 LOG.warn("unable to close the wal file: " + logFile, e); 119 } finally { 120 stream = null; 121 } 122 } 123 124 public FSDataInputStream getStream() { 125 return stream; 126 } 127 128 public ProcedureWALHeader getHeader() { 129 return header; 130 } 131 132 public long getTimestamp() { 133 return timestamp; 134 } 135 136 public boolean isCompacted() { 137 return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED; 138 } 139 140 public long getLogId() { 141 return header.getLogId(); 142 } 143 144 public long getSize() { 145 return logSize; 146 } 147 148 /** 149 * Used to update in-progress log sizes. the FileStatus will report 0 otherwise. 150 */ 151 void addToSize(long size) { 152 this.logSize += size; 153 } 154 155 public void removeFile(final Path walArchiveDir) throws IOException { 156 close(); 157 boolean archived = false; 158 if (walArchiveDir != null) { 159 Path archivedFile = new Path(walArchiveDir, logFile.getName()); 160 LOG.info("Archiving " + logFile + " to " + archivedFile); 161 if (!fs.rename(logFile, archivedFile)) { 162 LOG.warn("Failed archive of " + logFile + ", deleting"); 163 } else { 164 archived = true; 165 } 166 } 167 if (!archived) { 168 if (!fs.delete(logFile, false)) { 169 LOG.warn("Failed delete of " + logFile); 170 } 171 } 172 } 173 174 public void setProcIds(long minId, long maxId) { 175 this.minProcId = minId; 176 this.maxProcId = maxId; 177 } 178 179 public long getMinProcId() { 180 return minProcId; 181 } 182 183 public long getMaxProcId() { 184 return maxProcId; 185 } 186 187 @Override 188 public int compareTo(final ProcedureWALFile other) { 189 long diff = header.getLogId() - other.header.getLogId(); 190 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 191 } 192 193 @Override 194 public boolean equals(Object o) { 195 if (this == o) return true; 196 if (!(o instanceof ProcedureWALFile)) return false; 197 return compareTo((ProcedureWALFile)o) == 0; 198 } 199 200 @Override 201 public int hashCode() { 202 return logFile.hashCode(); 203 } 204 205 @Override 206 public String toString() { 207 return logFile.toString(); 208 } 209}