001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile.bucket; 020 021import java.io.File; 022import java.io.IOException; 023import java.io.RandomAccessFile; 024import java.nio.ByteBuffer; 025import java.nio.channels.ClosedByInterruptException; 026import java.nio.channels.ClosedChannelException; 027import java.nio.channels.FileChannel; 028import java.util.Arrays; 029import org.apache.hadoop.hbase.io.hfile.Cacheable; 030import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; 031import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 032import org.apache.hadoop.hbase.nio.ByteBuff; 033import org.apache.hadoop.hbase.nio.SingleByteBuff; 034import org.apache.hadoop.util.StringUtils; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 041 042/** 043 * IO engine that stores data to a file on the local file system. 044 */ 045@InterfaceAudience.Private 046public class FileIOEngine implements IOEngine { 047 private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class); 048 public static final String FILE_DELIMITER = ","; 049 private final String[] filePaths; 050 private final FileChannel[] fileChannels; 051 private final RandomAccessFile[] rafs; 052 053 private final long sizePerFile; 054 private final long capacity; 055 056 private FileReadAccessor readAccessor = new FileReadAccessor(); 057 private FileWriteAccessor writeAccessor = new FileWriteAccessor(); 058 059 public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths) 060 throws IOException { 061 this.sizePerFile = capacity / filePaths.length; 062 this.capacity = this.sizePerFile * filePaths.length; 063 this.filePaths = filePaths; 064 this.fileChannels = new FileChannel[filePaths.length]; 065 if (!maintainPersistence) { 066 for (String filePath : filePaths) { 067 File file = new File(filePath); 068 if (file.exists()) { 069 if (LOG.isDebugEnabled()) { 070 LOG.debug("File " + filePath + " already exists. Deleting!!"); 071 } 072 file.delete(); 073 // If deletion fails still we can manage with the writes 074 } 075 } 076 } 077 this.rafs = new RandomAccessFile[filePaths.length]; 078 for (int i = 0; i < filePaths.length; i++) { 079 String filePath = filePaths[i]; 080 try { 081 rafs[i] = new RandomAccessFile(filePath, "rw"); 082 long totalSpace = new File(filePath).getTotalSpace(); 083 if (totalSpace < sizePerFile) { 084 // The next setting length will throw exception,logging this message 085 // is just used for the detail reason of exception, 086 String msg = "Only " + StringUtils.byteDesc(totalSpace) 087 + " total space under " + filePath + ", not enough for requested " 088 + StringUtils.byteDesc(sizePerFile); 089 LOG.warn(msg); 090 } 091 rafs[i].setLength(sizePerFile); 092 fileChannels[i] = rafs[i].getChannel(); 093 LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) 094 + ", on the path:" + filePath); 095 } catch (IOException fex) { 096 LOG.error("Failed allocating cache on " + filePath, fex); 097 shutdown(); 098 throw fex; 099 } 100 } 101 } 102 103 @Override 104 public String toString() { 105 return "ioengine=" + this.getClass().getSimpleName() + ", paths=" 106 + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity); 107 } 108 109 /** 110 * File IO engine is always able to support persistent storage for the cache 111 * @return true 112 */ 113 @Override 114 public boolean isPersistent() { 115 return true; 116 } 117 118 /** 119 * Transfers data from file to the given byte buffer 120 * @param offset The offset in the file where the first byte to be read 121 * @param length The length of buffer that should be allocated for reading 122 * from the file channel 123 * @return number of bytes read 124 * @throws IOException 125 */ 126 @Override 127 public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer) 128 throws IOException { 129 Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); 130 ByteBuffer dstBuffer = ByteBuffer.allocate(length); 131 if (length != 0) { 132 accessFile(readAccessor, dstBuffer, offset); 133 // The buffer created out of the fileChannel is formed by copying the data from the file 134 // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts 135 // this buffer from the file the data is already copied and there is no need to ensure that 136 // the results are not corrupted before consuming them. 137 if (dstBuffer.limit() != length) { 138 throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length 139 + " expected"); 140 } 141 } 142 return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE); 143 } 144 145 @VisibleForTesting 146 void closeFileChannels() { 147 for (FileChannel fileChannel: fileChannels) { 148 try { 149 fileChannel.close(); 150 } catch (IOException e) { 151 LOG.warn("Failed to close FileChannel", e); 152 } 153 } 154 } 155 156 /** 157 * Transfers data from the given byte buffer to file 158 * @param srcBuffer the given byte buffer from which bytes are to be read 159 * @param offset The offset in the file where the first byte to be written 160 * @throws IOException 161 */ 162 @Override 163 public void write(ByteBuffer srcBuffer, long offset) throws IOException { 164 if (!srcBuffer.hasRemaining()) { 165 return; 166 } 167 accessFile(writeAccessor, srcBuffer, offset); 168 } 169 170 /** 171 * Sync the data to file after writing 172 * @throws IOException 173 */ 174 @Override 175 public void sync() throws IOException { 176 for (int i = 0; i < fileChannels.length; i++) { 177 try { 178 if (fileChannels[i] != null) { 179 fileChannels[i].force(true); 180 } 181 } catch (IOException ie) { 182 LOG.warn("Failed syncing data to " + this.filePaths[i]); 183 throw ie; 184 } 185 } 186 } 187 188 /** 189 * Close the file 190 */ 191 @Override 192 public void shutdown() { 193 for (int i = 0; i < filePaths.length; i++) { 194 try { 195 if (fileChannels[i] != null) { 196 fileChannels[i].close(); 197 } 198 if (rafs[i] != null) { 199 rafs[i].close(); 200 } 201 } catch (IOException ex) { 202 LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex); 203 } 204 } 205 } 206 207 @Override 208 public void write(ByteBuff srcBuffer, long offset) throws IOException { 209 // When caching block into BucketCache there will be single buffer backing for this HFileBlock. 210 assert srcBuffer.hasArray(); 211 write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), 212 srcBuffer.remaining()), offset); 213 } 214 215 private void accessFile(FileAccessor accessor, ByteBuffer buffer, 216 long globalOffset) throws IOException { 217 int startFileNum = getFileNum(globalOffset); 218 int remainingAccessDataLen = buffer.remaining(); 219 int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1); 220 int accessFileNum = startFileNum; 221 long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset); 222 int bufLimit = buffer.limit(); 223 while (true) { 224 FileChannel fileChannel = fileChannels[accessFileNum]; 225 int accessLen = 0; 226 if (endFileNum > accessFileNum) { 227 // short the limit; 228 buffer.limit((int) (buffer.limit() - remainingAccessDataLen 229 + sizePerFile - accessOffset)); 230 } 231 try { 232 accessLen = accessor.access(fileChannel, buffer, accessOffset); 233 } catch (ClosedByInterruptException e) { 234 throw e; 235 } catch (ClosedChannelException e) { 236 LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e); 237 refreshFileConnection(accessFileNum); 238 continue; 239 } 240 // recover the limit 241 buffer.limit(bufLimit); 242 if (accessLen < remainingAccessDataLen) { 243 remainingAccessDataLen -= accessLen; 244 accessFileNum++; 245 accessOffset = 0; 246 } else { 247 break; 248 } 249 if (accessFileNum >= fileChannels.length) { 250 throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining()) 251 + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" 252 + globalOffset); 253 } 254 } 255 } 256 257 /** 258 * Get the absolute offset in given file with the relative global offset. 259 * @param fileNum 260 * @param globalOffset 261 * @return the absolute offset 262 */ 263 private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) { 264 return globalOffset - fileNum * sizePerFile; 265 } 266 267 private int getFileNum(long offset) { 268 if (offset < 0) { 269 throw new IllegalArgumentException("Unexpected offset " + offset); 270 } 271 int fileNum = (int) (offset / sizePerFile); 272 if (fileNum >= fileChannels.length) { 273 throw new RuntimeException("Not expected offset " + offset 274 + " where capacity=" + capacity); 275 } 276 return fileNum; 277 } 278 279 @VisibleForTesting 280 FileChannel[] getFileChannels() { 281 return fileChannels; 282 } 283 284 @VisibleForTesting 285 void refreshFileConnection(int accessFileNum) throws IOException { 286 FileChannel fileChannel = fileChannels[accessFileNum]; 287 if (fileChannel != null) { 288 fileChannel.close(); 289 } 290 rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); 291 fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); 292 } 293 294 private static interface FileAccessor { 295 int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) 296 throws IOException; 297 } 298 299 private static class FileReadAccessor implements FileAccessor { 300 @Override 301 public int access(FileChannel fileChannel, ByteBuffer byteBuffer, 302 long accessOffset) throws IOException { 303 return fileChannel.read(byteBuffer, accessOffset); 304 } 305 } 306 307 private static class FileWriteAccessor implements FileAccessor { 308 @Override 309 public int access(FileChannel fileChannel, ByteBuffer byteBuffer, 310 long accessOffset) throws IOException { 311 return fileChannel.write(byteBuffer, accessOffset); 312 } 313 } 314}