View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
23  import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
24  
25  import java.io.IOException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FSDataOutputStream;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.codec.Codec;
37  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
38  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
39  import org.apache.hadoop.hbase.util.FSUtils;
40  import org.apache.hadoop.hbase.wal.WAL.Entry;
41  
42  /**
43   * Writer for protobuf-based WAL.
44   */
45  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
46  public class ProtobufLogWriter extends WriterBase {
47    private static final Log LOG = LogFactory.getLog(ProtobufLogWriter.class);
48    protected FSDataOutputStream output;
49    protected Codec.Encoder cellEncoder;
50    protected WALCellCodec.ByteStringCompressor compressor;
51    private boolean trailerWritten;
52    private WALTrailer trailer;
53    // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
54    // than this size, it is written/read respectively, with a WARN message in the log.
55    private int trailerWarnSize;
56  
57    public ProtobufLogWriter() {
58      super();
59    }
60  
61    protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
62        throws IOException {
63      return WALCellCodec.create(conf, null, compressionContext);
64    }
65  
66    protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
67        throws IOException {
68      if (!builder.hasWriterClsName()) {
69        builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
70      }
71      if (!builder.hasCellCodecClsName()) {
72        builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName());
73      }
74      return builder.build();
75    }
76  
77    @Override
78    @SuppressWarnings("deprecation")
79    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
80    throws IOException {
81      super.init(fs, path, conf, overwritable);
82      assert this.output == null;
83      boolean doCompress = initializeCompressionContext(conf, path);
84      this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
85      int bufferSize = FSUtils.getDefaultBufferSize(fs);
86      short replication = (short)conf.getInt(
87          "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
88      long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
89          FSUtils.getDefaultBlockSize(fs, path));
90      output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
91      output.write(ProtobufLogReader.PB_WAL_MAGIC);
92      boolean doTagCompress = doCompress
93          && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
94      buildWALHeader(conf,
95          WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
96          .writeDelimitedTo(output);
97  
98      initAfterHeader(doCompress);
99  
100     // instantiate trailer to default value.
101     trailer = WALTrailer.newBuilder().build();
102     if (LOG.isTraceEnabled()) {
103       LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
104     }
105   }
106 
107   protected void initAfterHeader(boolean doCompress) throws IOException {
108     WALCellCodec codec = getCodec(conf, this.compressionContext);
109     this.cellEncoder = codec.getEncoder(this.output);
110     if (doCompress) {
111       this.compressor = codec.getByteStringCompressor();
112     }
113   }
114 
115   @Override
116   public void append(Entry entry) throws IOException {
117     entry.setCompressionContext(compressionContext);
118     entry.getKey().getBuilder(compressor).
119       setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
120     for (Cell cell : entry.getEdit().getCells()) {
121       // cellEncoder must assume little about the stream, since we write PB and cells in turn.
122       cellEncoder.write(cell);
123     }
124   }
125 
126   @Override
127   public void close() throws IOException {
128     if (this.output != null) {
129       try {
130         if (!trailerWritten) writeWALTrailer();
131         this.output.close();
132       } catch (NullPointerException npe) {
133         // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
134         LOG.warn(npe);
135       }
136       this.output = null;
137     }
138   }
139 
140   WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
141     return builder.build();
142   }
143 
144   private void writeWALTrailer() {
145     try {
146       int trailerSize = 0;
147       if (this.trailer == null) {
148         // use default trailer.
149         LOG.warn("WALTrailer is null. Continuing with default.");
150         this.trailer = buildWALTrailer(WALTrailer.newBuilder());
151         trailerSize = this.trailer.getSerializedSize();
152       } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
153         // continue writing after warning the user.
154         LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " +
155           trailerSize + " > " + this.trailerWarnSize);
156       }
157       this.trailer.writeTo(output);
158       output.writeInt(trailerSize);
159       output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
160       this.trailerWritten = true;
161     } catch (IOException ioe) {
162       LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
163     }
164   }
165 
166   @Override
167   public void sync() throws IOException {
168     FSDataOutputStream fsdos = this.output;
169     if (fsdos == null) return; // Presume closed
170     fsdos.flush();
171     fsdos.hflush();
172   }
173 
174   @Override
175   public long getLength() throws IOException {
176     try {
177       return this.output.getPos();
178     } catch (NullPointerException npe) {
179       // Concurrent close...
180       throw new IOException(npe);
181     }
182   }
183 
184   public FSDataOutputStream getStream() {
185     return this.output;
186   }
187 
188   void setWALTrailer(WALTrailer walTrailer) {
189     this.trailer = walTrailer;
190   }
191 }