001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import org.apache.hadoop.fs.FSDataOutputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.util.CommonFSUtils;
027import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
028import org.apache.hadoop.hbase.wal.FSHLogProvider;
029import org.apache.hadoop.hbase.wal.WAL.Entry;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
036
037/**
038 * Writer for protobuf-based WAL.
039 */
040@InterfaceAudience.Private
041public class ProtobufLogWriter extends AbstractProtobufLogWriter
042    implements FSHLogProvider.Writer {
043
044  private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogWriter.class);
045
046  protected FSDataOutputStream output;
047
048  @Override
049  public void append(Entry entry) throws IOException {
050    entry.getKey().getBuilder(compressor).
051        setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
052    for (Cell cell : entry.getEdit().getCells()) {
053      // cellEncoder must assume little about the stream, since we write PB and cells in turn.
054      cellEncoder.write(cell);
055    }
056    length.set(output.getPos());
057  }
058
059  @Override
060  public void close() throws IOException {
061    if (this.output != null) {
062      try {
063        if (!trailerWritten) {
064          writeWALTrailer();
065        }
066        this.output.close();
067      } catch (NullPointerException npe) {
068        // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
069        LOG.warn(npe.toString(), npe);
070      }
071      this.output = null;
072    }
073  }
074
075  @Override
076  public void sync(boolean forceSync) throws IOException {
077    FSDataOutputStream fsdos = this.output;
078    if (fsdos == null) {
079      return; // Presume closed
080    }
081    fsdos.flush();
082    if (forceSync) {
083      fsdos.hsync();
084    } else {
085      fsdos.hflush();
086    }
087  }
088
089  public FSDataOutputStream getStream() {
090    return this.output;
091  }
092
093  @SuppressWarnings("deprecation")
094  @Override
095  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
096      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
097    this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
098        blockSize, false);
099    if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
100      if (!CommonFSUtils.hasCapability(output, "hflush")) {
101        throw new StreamLacksCapabilityException("hflush");
102      }
103      if (!CommonFSUtils.hasCapability(output, "hsync")) {
104        throw new StreamLacksCapabilityException("hsync");
105      }
106    }
107  }
108
109  @Override
110  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
111    output.write(magic);
112    header.writeDelimitedTo(output);
113    return output.getPos();
114  }
115
116  @Override
117  protected OutputStream getOutputStreamForCellEncoder() {
118    return this.output;
119  }
120
121  @Override
122  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
123    trailer.writeTo(output);
124    output.writeInt(trailer.getSerializedSize());
125    output.write(magic);
126    return output.getPos();
127  }
128}