001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.hadoop.fs.FSDataOutputStream;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.fs.StreamCapabilities;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
029import org.apache.hadoop.hbase.util.AtomicUtils;
030import org.apache.hadoop.hbase.util.CommonFSUtils;
031import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
032import org.apache.hadoop.hbase.wal.FSHLogProvider;
033import org.apache.hadoop.hbase.wal.WAL.Entry;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
040
041/**
042 * Writer for protobuf-based WAL.
043 */
044@InterfaceAudience.Private
045public class ProtobufLogWriter extends AbstractProtobufLogWriter implements FSHLogProvider.Writer {
046
047  private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogWriter.class);
048
049  protected FSDataOutputStream output;
050
051  private final AtomicLong syncedLength = new AtomicLong(0);
052
053  @Override
054  public void append(Entry entry) throws IOException {
055    entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
056      .writeDelimitedTo(output);
057    for (Cell cell : entry.getEdit().getCells()) {
058      // cellEncoder must assume little about the stream, since we write PB and cells in turn.
059      cellEncoder.write(cell);
060    }
061    length.set(output.getPos());
062  }
063
064  @Override
065  public void close() throws IOException {
066    if (this.output != null) {
067      try {
068        if (!trailerWritten) {
069          writeWALTrailer();
070        }
071        this.output.close();
072      } catch (NullPointerException npe) {
073        // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
074        LOG.warn(npe.toString(), npe);
075      }
076      this.output = null;
077    }
078  }
079
080  @Override
081  public void sync(boolean forceSync) throws IOException {
082    FSDataOutputStream fsdos = this.output;
083    if (fsdos == null) {
084      return; // Presume closed
085    }
086    fsdos.flush();
087    if (forceSync) {
088      fsdos.hsync();
089    } else {
090      fsdos.hflush();
091    }
092    AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
093  }
094
095  @Override
096  public long getSyncedLength() {
097    return this.syncedLength.get();
098  }
099
100  public FSDataOutputStream getStream() {
101    return this.output;
102  }
103
104  @Override
105  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
106    short replication, long blockSize, StreamSlowMonitor monitor)
107    throws IOException, StreamLacksCapabilityException {
108    this.output =
109      CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, blockSize, false);
110    if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
111      if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
112        throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
113      }
114      if (!output.hasCapability(StreamCapabilities.HSYNC)) {
115        throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);
116      }
117    }
118  }
119
120  @Override
121  protected void closeOutputIfNecessary() {
122    if (this.output != null) {
123      try {
124        this.output.close();
125      } catch (IOException e) {
126        LOG.warn("Close output failed", e);
127      }
128    }
129  }
130
131  @Override
132  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
133    output.write(magic);
134    header.writeDelimitedTo(output);
135    return output.getPos();
136  }
137
138  @Override
139  protected OutputStream getOutputStreamForCellEncoder() {
140    return this.output;
141  }
142
143  @Override
144  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
145    trailer.writeTo(output);
146    output.writeInt(trailer.getSerializedSize());
147    output.write(magic);
148    return output.getPos();
149  }
150}