001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import org.apache.hadoop.fs.FSDataOutputStream; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 028import org.apache.hadoop.hbase.util.CancelableProgressable; 029import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 033import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 034 035/** 036 * An {@link AsyncFSOutput} wraps a {@link FSDataOutputStream}. 037 */ 038@InterfaceAudience.Private 039public class WrapperAsyncFSOutput implements AsyncFSOutput { 040 041 private final FSDataOutputStream out; 042 043 private ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 044 045 private final ExecutorService executor; 046 047 private volatile long syncedLength = 0; 048 049 public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) { 050 this.out = out; 051 this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) 052 .setNameFormat("AsyncFSOutputFlusher-" + file.toString().replace("%", "%%")).build()); 053 } 054 055 @Override 056 public void write(byte[] b) { 057 write(b, 0, b.length); 058 } 059 060 @Override 061 public void write(byte[] b, int off, int len) { 062 buffer.write(b, off, len); 063 } 064 065 @Override 066 public void writeInt(int i) { 067 buffer.writeInt(i); 068 } 069 070 @Override 071 public void write(ByteBuffer bb) { 072 buffer.write(bb, bb.position(), bb.remaining()); 073 } 074 075 @Override 076 public int buffered() { 077 return buffer.size(); 078 } 079 080 @Override 081 public DatanodeInfo[] getPipeline() { 082 return new DatanodeInfo[0]; 083 } 084 085 private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer, boolean sync) { 086 try { 087 if (buffer.size() > 0) { 088 out.write(buffer.getBuffer(), 0, buffer.size()); 089 if (sync) { 090 out.hsync(); 091 } else { 092 out.hflush(); 093 } 094 } 095 long pos = out.getPos(); 096 /** 097 * This flush0 method could only be called by single thread, so here we could safely overwrite 098 * without any synchronization. 099 */ 100 this.syncedLength = pos; 101 future.complete(pos); 102 } catch (IOException e) { 103 future.completeExceptionally(e); 104 return; 105 } 106 } 107 108 @Override 109 public CompletableFuture<Long> flush(boolean sync) { 110 CompletableFuture<Long> future = new CompletableFuture<>(); 111 ByteArrayOutputStream buffer = this.buffer; 112 this.buffer = new ByteArrayOutputStream(); 113 executor.execute(() -> flush0(future, buffer, sync)); 114 return future; 115 } 116 117 @Override 118 public void recoverAndClose(CancelableProgressable reporter) throws IOException { 119 executor.shutdown(); 120 out.close(); 121 } 122 123 @Override 124 public void close() throws IOException { 125 Preconditions.checkState(buffer.size() == 0, "should call flush first before calling close"); 126 executor.shutdown(); 127 out.close(); 128 } 129 130 @Override 131 public boolean isBroken() { 132 return false; 133 } 134 135 @Override 136 public long getSyncedLength() { 137 return this.syncedLength; 138 } 139}