1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
23 import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
24
25 import java.io.IOException;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FSDataOutputStream;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.codec.Codec;
37 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
38 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
39 import org.apache.hadoop.hbase.util.FSUtils;
40 import org.apache.hadoop.hbase.wal.WAL.Entry;
41
42
43
44
45 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
46 public class ProtobufLogWriter extends WriterBase {
47 private static final Log LOG = LogFactory.getLog(ProtobufLogWriter.class);
48 protected FSDataOutputStream output;
49 protected Codec.Encoder cellEncoder;
50 protected WALCellCodec.ByteStringCompressor compressor;
51 private boolean trailerWritten;
52 private WALTrailer trailer;
53
54
55 private int trailerWarnSize;
56
57 public ProtobufLogWriter() {
58 super();
59 }
60
61 protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
62 throws IOException {
63 return WALCellCodec.create(conf, null, compressionContext);
64 }
65
66 protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
67 throws IOException {
68 if (!builder.hasWriterClsName()) {
69 builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
70 }
71 if (!builder.hasCellCodecClsName()) {
72 builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName());
73 }
74 return builder.build();
75 }
76
77 @Override
78 @SuppressWarnings("deprecation")
79 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
80 throws IOException {
81 super.init(fs, path, conf, overwritable);
82 assert this.output == null;
83 boolean doCompress = initializeCompressionContext(conf, path);
84 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
85 int bufferSize = FSUtils.getDefaultBufferSize(fs);
86 short replication = (short)conf.getInt(
87 "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
88 long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
89 FSUtils.getDefaultBlockSize(fs, path));
90 output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
91 output.write(ProtobufLogReader.PB_WAL_MAGIC);
92 boolean doTagCompress = doCompress
93 && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
94 buildWALHeader(conf,
95 WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
96 .writeDelimitedTo(output);
97
98 initAfterHeader(doCompress);
99
100
101 trailer = WALTrailer.newBuilder().build();
102 if (LOG.isTraceEnabled()) {
103 LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
104 }
105 }
106
107 protected void initAfterHeader(boolean doCompress) throws IOException {
108 WALCellCodec codec = getCodec(conf, this.compressionContext);
109 this.cellEncoder = codec.getEncoder(this.output);
110 if (doCompress) {
111 this.compressor = codec.getByteStringCompressor();
112 }
113 }
114
115 @Override
116 public void append(Entry entry) throws IOException {
117 entry.setCompressionContext(compressionContext);
118 entry.getKey().getBuilder(compressor).
119 setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
120 for (Cell cell : entry.getEdit().getCells()) {
121
122 cellEncoder.write(cell);
123 }
124 }
125
126 @Override
127 public void close() throws IOException {
128 if (this.output != null) {
129 try {
130 if (!trailerWritten) writeWALTrailer();
131 this.output.close();
132 } catch (NullPointerException npe) {
133
134 LOG.warn(npe);
135 }
136 this.output = null;
137 }
138 }
139
140 WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
141 return builder.build();
142 }
143
144 private void writeWALTrailer() {
145 try {
146 int trailerSize = 0;
147 if (this.trailer == null) {
148
149 LOG.warn("WALTrailer is null. Continuing with default.");
150 this.trailer = buildWALTrailer(WALTrailer.newBuilder());
151 trailerSize = this.trailer.getSerializedSize();
152 } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
153
154 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " +
155 trailerSize + " > " + this.trailerWarnSize);
156 }
157 this.trailer.writeTo(output);
158 output.writeInt(trailerSize);
159 output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
160 this.trailerWritten = true;
161 } catch (IOException ioe) {
162 LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
163 }
164 }
165
166 @Override
167 public void sync() throws IOException {
168 FSDataOutputStream fsdos = this.output;
169 if (fsdos == null) return;
170 fsdos.flush();
171 fsdos.hflush();
172 }
173
174 @Override
175 public long getLength() throws IOException {
176 try {
177 return this.output.getPos();
178 } catch (NullPointerException npe) {
179
180 throw new IOException(npe);
181 }
182 }
183
184 public FSDataOutputStream getStream() {
185 return this.output;
186 }
187
188 void setWALTrailer(WALTrailer walTrailer) {
189 this.trailer = walTrailer;
190 }
191 }