001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025 026import org.apache.hadoop.fs.FSDataOutputStream; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 029import org.apache.hadoop.hbase.util.CancelableProgressable; 030import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 034import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 035 036/** 037 * An {@link AsyncFSOutput} wraps a {@link FSDataOutputStream}. 038 */ 039@InterfaceAudience.Private 040public class WrapperAsyncFSOutput implements AsyncFSOutput { 041 042 private final FSDataOutputStream out; 043 044 private ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 045 046 private final ExecutorService executor; 047 048 private volatile long syncedLength = 0; 049 050 public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) { 051 this.out = out; 052 this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) 053 .setNameFormat("AsyncFSOutputFlusher-" + file.toString().replace("%", "%%")).build()); 054 } 055 056 @Override 057 public void write(byte[] b) { 058 write(b, 0, b.length); 059 } 060 061 @Override 062 public void write(byte[] b, int off, int len) { 063 buffer.write(b, off, len); 064 } 065 066 @Override 067 public void writeInt(int i) { 068 buffer.writeInt(i); 069 } 070 071 @Override 072 public void write(ByteBuffer bb) { 073 buffer.write(bb, bb.position(), bb.remaining()); 074 } 075 076 @Override 077 public int buffered() { 078 return buffer.size(); 079 } 080 081 @Override 082 public DatanodeInfo[] getPipeline() { 083 return new DatanodeInfo[0]; 084 } 085 086 private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer, boolean sync) { 087 try { 088 if (buffer.size() > 0) { 089 out.write(buffer.getBuffer(), 0, buffer.size()); 090 if (sync) { 091 out.hsync(); 092 } else { 093 out.hflush(); 094 } 095 } 096 long pos = out.getPos(); 097 /** 098 * This flush0 method could only be called by single thread, so here we could 099 * safely overwrite without any synchronization. 100 */ 101 this.syncedLength = pos; 102 future.complete(pos); 103 } catch (IOException e) { 104 future.completeExceptionally(e); 105 return; 106 } 107 } 108 109 @Override 110 public CompletableFuture<Long> flush(boolean sync) { 111 CompletableFuture<Long> future = new CompletableFuture<>(); 112 ByteArrayOutputStream buffer = this.buffer; 113 this.buffer = new ByteArrayOutputStream(); 114 executor.execute(() -> flush0(future, buffer, sync)); 115 return future; 116 } 117 118 @Override 119 public void recoverAndClose(CancelableProgressable reporter) throws IOException { 120 executor.shutdown(); 121 out.close(); 122 } 123 124 @Override 125 public void close() throws IOException { 126 Preconditions.checkState(buffer.size() == 0, "should call flush first before calling close"); 127 executor.shutdown(); 128 out.close(); 129 } 130 131 @Override 132 public boolean isBroken() { 133 return false; 134 } 135 136 @Override 137 public long getSyncedLength() { 138 return this.syncedLength; 139 } 140}