/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
 * Writer for protobuf-based WAL.
 * <p>
 * All bytes go to a single {@link FSDataOutputStream} that is opened by
 * {@link #initOutput(FileSystem, Path, boolean, int, short, long, StreamSlowMonitor, boolean)}.
 * Members such as {@code compressor}, {@code cellEncoder}, {@code length} and
 * {@code trailerWritten} are not declared in this class; they are expected to come from
 * {@link AbstractProtobufLogWriter}.
 */
@InterfaceAudience.Private
public class ProtobufLogWriter extends AbstractProtobufLogWriter implements FSHLogProvider.Writer {

  private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogWriter.class);

  /** Stream the WAL is written to; reset to {@code null} by {@link #close()}. */
  protected FSDataOutputStream output;

  /** Largest stream position observed right after a successful {@link #sync(boolean)}. */
  private final AtomicLong syncedLength = new AtomicLong(0);

  /**
   * Writes one WAL entry: first the delimited protobuf key, which carries the number of cells
   * that follow, then every cell of the edit. Afterwards {@code length} is set to the current
   * stream position.
   * @param entry the entry whose key and cells are written
   * @throws IOException if writing the key or a cell, or reading the stream position, fails
   */
  @Override
  public void append(Entry entry) throws IOException {
    entry.getKey()
      .getBuilder(compressor)
      .setFollowingKvCount(entry.getEdit().size())
      .build()
      .writeDelimitedTo(output);
    for (Cell kv : entry.getEdit().getCells()) {
      // cellEncoder must assume little about the stream, since we write PB and cells in turn.
      cellEncoder.write(kv);
    }
    length.set(output.getPos());
  }

  /**
   * Closes the writer. If the stream is still set, the WAL trailer is written unless
   * {@code trailerWritten} is already true, the stream is closed and the reference is cleared.
   * Does nothing when the stream reference is already {@code null}.
   * @throws IOException if writing the trailer or closing the stream fails
   */
  @Override
  public void close() throws IOException {
    if (this.output == null) {
      return;
    }
    if (!trailerWritten) {
      writeWALTrailer();
    }
    this.output.close();
    this.output = null;
  }

  /**
   * Flushes the stream and then calls {@code hsync()} or {@code hflush()} on it; on success the
   * synced length is raised to the current stream position if that is larger.
   * @param forceSync {@code true} to call {@code hsync()}, {@code false} to call {@code hflush()}
   * @throws IOException if flushing, syncing or reading the stream position fails
   */
  @Override
  public void sync(boolean forceSync) throws IOException {
    // Work on a local copy of the reference so the null check and the calls see the same stream.
    FSDataOutputStream stream = this.output;
    if (stream != null) {
      stream.flush();
      if (!forceSync) {
        stream.hflush();
      } else {
        stream.hsync();
      }
      AtomicUtils.updateMax(this.syncedLength, stream.getPos());
    }
    // else: Presume closed
  }

  /**
   * @return the value recorded by the most recent position update in {@link #sync(boolean)}, or 0
   *         if no sync has recorded one yet
   */
  @Override
  public long getSyncedLength() {
    return this.syncedLength.get();
  }

  /**
   * @return the underlying output stream; {@code null} after {@link #close()} has completed
   */
  public FSDataOutputStream getStream() {
    return this.output;
  }

  /**
   * Creates the WAL file through the file system's builder API and stores the resulting stream
   * in {@link #output}. When the builder is an HDFS builder, {@code replicate()} is requested
   * and, if {@code noLocalWrite} is set, {@code noLocalWrite()} as well. Unless
   * {@link CommonFSUtils#UNSAFE_STREAM_CAPABILITY_ENFORCE} is configured to {@code false}, the
   * stream must report both the hflush and the hsync capability.
   * @param fs           file system to create the file on
   * @param path         path of the file to create
   * @param overwritable passed to the builder's {@code overwrite} option
   * @param bufferSize   passed to the builder's {@code bufferSize} option
   * @param replication  passed to the builder's {@code replication} option
   * @param blockSize    passed to the builder's {@code blockSize} option
   * @param monitor      not used by this implementation
   * @param noLocalWrite whether to request {@code noLocalWrite()} on an HDFS builder
   * @throws IOException                    if building the stream fails
   * @throws StreamLacksCapabilityException if enforcement is on and the stream lacks hflush or
   *                                        hsync
   */
  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, StreamLacksCapabilityException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path)
      .overwrite(overwritable)
      .bufferSize(bufferSize)
      .replication(replication)
      .blockSize(blockSize);
    if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
      DistributedFileSystem.HdfsDataOutputStreamBuilder hdfsBuilder =
        (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
      hdfsBuilder.replicate();
      if (noLocalWrite) {
        hdfsBuilder.noLocalWrite();
      }
    }
    // Same object either way, so build() dispatches to the HDFS builder when one was configured.
    this.output = builder.build();

    boolean enforceCapabilities =
      fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
    if (!enforceCapabilities) {
      return;
    }
    // Checked in this order: hflush first, then hsync.
    String[] requiredCapabilities = { StreamCapabilities.HFLUSH, StreamCapabilities.HSYNC };
    for (String capability : requiredCapabilities) {
      if (!output.hasCapability(capability)) {
        throw new StreamLacksCapabilityException(capability);
      }
    }
  }

  /**
   * Closes the stream if one is set. A failure to close is logged at WARN level and not
   * rethrown; the stream reference itself is left untouched.
   */
  @Override
  protected void closeOutputIfNecessary() {
    if (this.output == null) {
      return;
    }
    try {
      this.output.close();
    } catch (IOException e) {
      LOG.warn("Close output failed", e);
    }
  }

  /**
   * Writes the magic bytes followed by the delimited WAL header.
   * @param magic  magic bytes written first
   * @param header header written, length-delimited, after the magic
   * @return the stream position after both have been written
   * @throws IOException if writing or reading the stream position fails
   */
  @Override
  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
    output.write(magic);
    header.writeDelimitedTo(output);
    long positionAfterHeader = output.getPos();
    return positionAfterHeader;
  }

  /**
   * @return the stream that cells should be encoded to, which is the WAL output stream itself
   */
  @Override
  protected OutputStream getOutputStreamForCellEncoder() {
    return this.output;
  }

  /**
   * Writes the trailer, then its serialized size as an int, then the magic bytes.
   * @param trailer trailer to write
   * @param magic   magic bytes written last
   * @return the stream position after all three have been written
   * @throws IOException if writing or reading the stream position fails
   */
  @Override
  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
    trailer.writeTo(output);
    output.writeInt(trailer.getSerializedSize());
    output.write(magic);
    long positionAfterTrailer = output.getPos();
    return positionAfterTrailer;
  }
}