001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025 026import org.apache.hadoop.fs.FSDataOutputStream; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 029import org.apache.hadoop.hbase.util.CancelableProgressable; 030import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 034import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 035 036/** 037 * An {@link AsyncFSOutput} wraps a {@link FSDataOutputStream}. 038 */ 039@InterfaceAudience.Private 040public class WrapperAsyncFSOutput implements AsyncFSOutput { 041 042 private final FSDataOutputStream out; 043 044 private ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 045 046 private final ExecutorService executor; 047 048 public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) { 049 this.out = out; 050 this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) 051 .setNameFormat("AsyncFSOutputFlusher-" + file.toString().replace("%", "%%")).build()); 052 } 053 054 @Override 055 public void write(byte[] b) { 056 write(b, 0, b.length); 057 } 058 059 @Override 060 public void write(byte[] b, int off, int len) { 061 buffer.write(b, off, len); 062 } 063 064 @Override 065 public void writeInt(int i) { 066 buffer.writeInt(i); 067 } 068 069 @Override 070 public void write(ByteBuffer bb) { 071 buffer.write(bb, bb.position(), bb.remaining()); 072 } 073 074 @Override 075 public int buffered() { 076 return buffer.size(); 077 } 078 079 @Override 080 public DatanodeInfo[] getPipeline() { 081 return new DatanodeInfo[0]; 082 } 083 084 private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer, boolean sync) { 085 try { 086 if (buffer.size() > 0) { 087 out.write(buffer.getBuffer(), 0, buffer.size()); 088 if (sync) { 089 out.hsync(); 090 } else { 091 out.hflush(); 092 } 093 } 094 future.complete(out.getPos()); 095 } catch (IOException e) { 096 future.completeExceptionally(e); 097 return; 098 } 099 } 100 101 @Override 102 public CompletableFuture<Long> flush(boolean sync) { 103 CompletableFuture<Long> future = new CompletableFuture<>(); 104 ByteArrayOutputStream buffer = this.buffer; 105 this.buffer = new ByteArrayOutputStream(); 106 executor.execute(() -> flush0(future, buffer, sync)); 107 return future; 108 } 109 110 @Override 111 public void recoverAndClose(CancelableProgressable reporter) throws IOException { 112 executor.shutdown(); 113 out.close(); 114 } 115 116 @Override 117 public void close() throws IOException { 118 Preconditions.checkState(buffer.size() == 0, "should call flush first before calling close"); 119 executor.shutdown(); 120 out.close(); 121 } 122 123 @Override 124 public boolean isBroken() { 125 return false; 126 } 127}