001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.fs.FSDataInputStream; 022import org.apache.hadoop.fs.FileStatus; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 031import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; 032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; 033 034/** 035 * Describes a WAL File 036 */ 037@InterfaceAudience.Private 038public class ProcedureWALFile implements Comparable<ProcedureWALFile> { 039 private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class); 040 041 private ProcedureWALHeader header; 042 private FSDataInputStream stream; 043 private FileSystem fs; 044 private Path logFile; 045 private long startPos; 046 private long minProcId; 047 private long maxProcId; 048 private long logSize; 049 private long timestamp; 050 051 public ProcedureStoreTracker getTracker() { 052 return tracker; 053 } 054 055 private final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); 056 057 public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) { 058 this.fs = fs; 059 this.logFile = logStatus.getPath(); 060 this.logSize = logStatus.getLen(); 061 this.timestamp = logStatus.getModificationTime(); 062 tracker.setPartialFlag(true); 063 } 064 065 public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, 066 long startPos, long timestamp) { 067 this.fs = fs; 068 this.header = header; 069 this.logFile = logFile; 070 this.startPos = startPos; 071 this.logSize = startPos; 072 this.timestamp = timestamp; 073 tracker.setPartialFlag(true); 074 } 075 076 public void open() throws IOException { 077 if (stream == null) { 078 stream = fs.open(logFile); 079 } 080 081 if (header == null) { 082 header = ProcedureWALFormat.readHeader(stream); 083 startPos = stream.getPos(); 084 } else { 085 stream.seek(startPos); 086 } 087 } 088 089 public ProcedureWALTrailer readTrailer() throws IOException { 090 try { 091 return ProcedureWALFormat.readTrailer(stream, startPos, logSize); 092 } finally { 093 stream.seek(startPos); 094 } 095 } 096 097 public void readTracker() throws IOException { 098 ProcedureWALTrailer trailer = readTrailer(); 099 try { 100 stream.seek(trailer.getTrackerPos()); 101 final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf = 102 ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream); 103 tracker.resetToProto(trackerProtoBuf); 104 } finally { 105 stream.seek(startPos); 106 } 107 } 108 109 public void updateLocalTracker(ProcedureStoreTracker tracker) { 110 this.tracker.resetTo(tracker); 111 } 112 113 public void close() { 114 if (stream == null) { 115 return; 116 } 117 118 try { 119 stream.close(); 120 } catch (IOException e) { 121 LOG.warn("unable to close the wal file: " + logFile, e); 122 } finally { 123 stream = null; 124 } 125 } 126 127 public FSDataInputStream getStream() { 128 return stream; 129 } 130 131 public ProcedureWALHeader getHeader() { 132 return header; 133 } 134 135 public long getTimestamp() { 136 return timestamp; 137 } 138 139 public boolean isCompacted() { 140 return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED; 141 } 142 143 public long getLogId() { 144 return header.getLogId(); 145 } 146 147 public long getSize() { 148 return logSize; 149 } 150 151 /** 152 * Used to update in-progress log sizes. the FileStatus will report 0 otherwise. 153 */ 154 void addToSize(long size) { 155 this.logSize += size; 156 } 157 158 public void removeFile(final Path walArchiveDir) throws IOException { 159 close(); 160 boolean archived = false; 161 if (walArchiveDir != null) { 162 Path archivedFile = new Path(walArchiveDir, logFile.getName()); 163 LOG.info("Archiving " + logFile + " to " + archivedFile); 164 if (!fs.rename(logFile, archivedFile)) { 165 LOG.warn("Failed archive of " + logFile + ", deleting"); 166 } else { 167 archived = true; 168 } 169 } 170 if (!archived) { 171 if (!fs.delete(logFile, false)) { 172 LOG.warn("Failed delete of " + logFile); 173 } 174 } 175 } 176 177 public void setProcIds(long minId, long maxId) { 178 this.minProcId = minId; 179 this.maxProcId = maxId; 180 } 181 182 public long getMinProcId() { 183 return minProcId; 184 } 185 186 public long getMaxProcId() { 187 return maxProcId; 188 } 189 190 @Override 191 public int compareTo(final ProcedureWALFile other) { 192 long diff = header.getLogId() - other.header.getLogId(); 193 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 194 } 195 196 @Override 197 public boolean equals(Object o) { 198 if (this == o) { 199 return true; 200 } 201 202 if (!(o instanceof ProcedureWALFile)) { 203 return false; 204 } 205 206 return compareTo((ProcedureWALFile)o) == 0; 207 } 208 209 @Override 210 public int hashCode() { 211 return logFile.hashCode(); 212 } 213 214 @Override 215 public String toString() { 216 return logFile.toString(); 217 } 218}