/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * IO engine that stores data to one or more files on the local file system.
 * <p>
 * The engine exposes a single "global" offset space of {@code capacity} bytes which is split
 * evenly across the configured files: global offset {@code o} lives in file
 * {@code o / sizePerFile} at in-file offset {@code o - fileNum * sizePerFile}. A single read or
 * write may therefore span several consecutive files.
 */
@InterfaceAudience.Private
public class FileIOEngine implements IOEngine {
  private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
  public static final String FILE_DELIMITER = ",";

  /** Paths of the backing files, in global-offset order. */
  private final String[] filePaths;
  /** Channel of each backing file; a slot is null until the file has been opened. */
  private final FileChannel[] fileChannels;
  /** Random access file behind each channel; a slot is null until the file has been opened. */
  private final RandomAccessFile[] rafs;
  /** Per-file locks serializing the re-opening of a closed channel. */
  private final ReentrantLock[] channelLocks;

  /** Number of bytes stored in each backing file ({@code capacity / number of files}). */
  private final long sizePerFile;
  /** Usable capacity: {@code sizePerFile * number of files}, i.e. the request rounded down. */
  private final long capacity;

  private final FileReadAccessor readAccessor = new FileReadAccessor();
  private final FileWriteAccessor writeAccessor = new FileWriteAccessor();

  /**
   * Opens (creating if necessary) every backing file and sizes each one to
   * {@code capacity / filePaths.length} bytes.
   * @param capacity total number of bytes requested for the engine
   * @param maintainPersistence when false, pre-existing files at the given paths are deleted
   *          before being re-created
   * @param filePaths paths of the backing files
   * @throws IOException if a file cannot be opened or sized; files opened so far are closed
   *           before the exception is rethrown
   */
  public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
      throws IOException {
    this.sizePerFile = capacity / filePaths.length;
    this.capacity = this.sizePerFile * filePaths.length;
    this.filePaths = filePaths;
    this.fileChannels = new FileChannel[filePaths.length];
    if (!maintainPersistence) {
      for (String filePath : filePaths) {
        File file = new File(filePath);
        if (file.exists()) {
          LOG.debug("File {} already exists. Deleting!!", filePath);
          file.delete();
          // If deletion fails still we can manage with the writes
        }
      }
    }
    this.rafs = new RandomAccessFile[filePaths.length];
    this.channelLocks = new ReentrantLock[filePaths.length];
    for (int i = 0; i < filePaths.length; i++) {
      String filePath = filePaths[i];
      try {
        rafs[i] = new RandomAccessFile(filePath, "rw");
        long totalSpace = new File(filePath).getTotalSpace();
        if (totalSpace < sizePerFile) {
          // The following setLength is expected to throw; this message only records the
          // detailed reason for that exception.
          LOG.warn("Only {} total space under {}, not enough for requested {}",
            StringUtils.byteDesc(totalSpace), filePath, StringUtils.byteDesc(sizePerFile));
        }
        rafs[i].setLength(sizePerFile);
        fileChannels[i] = rafs[i].getChannel();
        channelLocks[i] = new ReentrantLock();
        LOG.info("Allocating cache {}, on the path:{}", StringUtils.byteDesc(sizePerFile),
          filePath);
      } catch (IOException fex) {
        LOG.error("Failed allocating cache on {}", filePath, fex);
        // Release whatever has been opened so far before propagating the failure.
        shutdown();
        throw fex;
      }
    }
  }

  @Override
  public String toString() {
    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
  }

  /**
   * File IO engine is always able to support persistent storage for the cache
   * @return true
   */
  @Override
  public boolean isPersistent() {
    return true;
  }

  /**
   * Reads {@code length} bytes starting at the given global offset into a freshly allocated
   * heap buffer and hands that buffer to the deserializer.
   * <p>
   * The buffer is passed on exactly as the read left it; its position is not reset here.
   * @param offset the global offset of the first byte to be read
   * @param length the number of bytes to read, which is also the size of the allocated buffer
   * @param deserializer turns the bytes read into a {@link Cacheable}
   * @return the object produced by the deserializer
   * @throws IOException if reading from the backing file(s) fails
   * @throws IllegalArgumentException if {@code length} is negative
   */
  @Override
  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
      throws IOException {
    Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
    if (length != 0) {
      accessFile(readAccessor, dstBuffer, offset);
      // The buffer created out of the fileChannel is formed by copying the data from the file.
      // Hence in this case there is no shared memory that we point to. Even if the BucketCache
      // evicts this buffer from the file the data is already copied and there is no need to
      // ensure that the results are not corrupted before consuming them.
      //
      // The buffer was allocated with capacity == limit == length, so the number of bytes
      // actually read is its position (the limit can never differ from length).
      if (dstBuffer.position() != length) {
        throw new RuntimeException("Only " + dstBuffer.position() + " bytes read, " + length
            + " expected");
      }
    }
    return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
  }

  /**
   * Closes every opened file channel, logging (not propagating) close failures. Slots whose
   * channel was never opened are skipped.
   */
  @VisibleForTesting
  void closeFileChannels() {
    for (FileChannel fileChannel : fileChannels) {
      if (fileChannel == null) {
        continue;
      }
      try {
        fileChannel.close();
      } catch (IOException e) {
        LOG.warn("Failed to close FileChannel", e);
      }
    }
  }

  /**
   * Transfers data from the given byte buffer to file
   * @param srcBuffer the given byte buffer from which bytes are to be read; nothing is done if
   *          it has no remaining bytes
   * @param offset the global offset where the first byte is to be written
   * @throws IOException if writing to the backing file(s) fails
   */
  @Override
  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
    if (!srcBuffer.hasRemaining()) {
      return;
    }
    accessFile(writeAccessor, srcBuffer, offset);
  }

  /**
   * Sync the data to file after writing
   * @throws IOException if forcing any of the channels fails; remaining channels are not synced
   */
  @Override
  public void sync() throws IOException {
    for (int i = 0; i < fileChannels.length; i++) {
      try {
        if (fileChannels[i] != null) {
          fileChannels[i].force(true);
        }
      } catch (IOException ie) {
        LOG.warn("Failed syncing data to {}", this.filePaths[i]);
        throw ie;
      }
    }
  }

  /**
   * Closes every backing file. The channel and the random access file of each slot are closed
   * independently so that a failure closing one does not prevent closing the other; failures
   * are logged and never propagated.
   */
  @Override
  public void shutdown() {
    for (int i = 0; i < filePaths.length; i++) {
      try {
        if (fileChannels[i] != null) {
          fileChannels[i].close();
        }
      } catch (IOException ex) {
        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
      }
      try {
        if (rafs[i] != null) {
          rafs[i].close();
        }
      } catch (IOException ex) {
        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
      }
    }
  }

  /**
   * Writes the remaining bytes of an array-backed {@link ByteBuff} at the given global offset.
   * @param srcBuffer the source buffer; must be backed by an accessible array
   * @param offset the global offset where the first byte is to be written
   * @throws IOException if writing to the backing file(s) fails
   */
  @Override
  public void write(ByteBuff srcBuffer, long offset) throws IOException {
    // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
    assert srcBuffer.hasArray();
    write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
      srcBuffer.remaining()), offset);
  }

  /**
   * Applies the accessor (read or write) to the remaining bytes of {@code buffer}, starting at
   * {@code globalOffset} and continuing across consecutive backing files when the range spans
   * more than one of them.
   * <p>
   * If a channel turns out to be closed (but not because of an interrupt), it is re-opened and
   * the same access is retried.
   * @param accessor the operation to perform against each file channel
   * @param buffer the buffer to read into or write from; must have at least one remaining byte
   * @param globalOffset the global offset of the first byte to access
   * @throws IOException if the access fails or the range runs past the last file
   */
  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
      long globalOffset) throws IOException {
    int startFileNum = getFileNum(globalOffset);
    int remainingAccessDataLen = buffer.remaining();
    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
    int accessFileNum = startFileNum;
    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
    // The caller's limit; it is temporarily lowered while accessing a file that does not hold
    // the end of the range and restored after every attempt.
    final int bufLimit = buffer.limit();
    while (true) {
      FileChannel fileChannel = fileChannels[accessFileNum];
      int accessLen = 0;
      if (endFileNum > accessFileNum) {
        // Shorten the limit so that this access stops at the end of the current file. It is
        // derived from the saved original limit (not buffer.limit()) so that the value is the
        // same on a retry after the channel has been re-opened.
        buffer.limit((int) (bufLimit - remainingAccessDataLen + sizePerFile - accessOffset));
      }
      try {
        accessLen = accessor.access(fileChannel, buffer, accessOffset);
      } catch (ClosedByInterruptException e) {
        // The thread was interrupted: do not re-open, let the caller see it.
        throw e;
      } catch (ClosedChannelException e) {
        refreshFileConnection(accessFileNum, e);
        continue;
      } finally {
        // Recover the limit on every path, including the retry and the failure paths.
        buffer.limit(bufLimit);
      }
      if (accessLen < remainingAccessDataLen) {
        // More data outstanding: carry on from the beginning of the next file.
        remainingAccessDataLen -= accessLen;
        accessFileNum++;
        accessOffset = 0;
      } else {
        break;
      }
      if (accessFileNum >= fileChannels.length) {
        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
            + globalOffset);
      }
    }
  }

  /**
   * Get the absolute offset in given file with the relative global offset.
   * @param fileNum index of the backing file
   * @param globalOffset offset in the engine-wide offset space
   * @return the absolute offset within file {@code fileNum}
   */
  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
    return globalOffset - fileNum * sizePerFile;
  }

  /**
   * Maps a global offset to the index of the backing file that contains it.
   * @param offset offset in the engine-wide offset space
   * @return index of the file holding that offset
   * @throws IllegalArgumentException if the offset is negative
   * @throws RuntimeException if the offset lies beyond the last file
   */
  private int getFileNum(long offset) {
    if (offset < 0) {
      throw new IllegalArgumentException("Unexpected offset " + offset);
    }
    int fileNum = (int) (offset / sizePerFile);
    if (fileNum >= fileChannels.length) {
      throw new RuntimeException("Not expected offset " + offset
          + " where capacity=" + capacity);
    }
    return fileNum;
  }

  @VisibleForTesting
  FileChannel[] getFileChannels() {
    return fileChannels;
  }

  /**
   * Re-opens the backing file at the given index after its channel was found closed. The work
   * is done under that file's lock; if the channel is already open again by the time the lock
   * is obtained, nothing is done.
   * @param accessFileNum index of the backing file to re-open
   * @param ioe the exception that revealed the closed channel; only used for logging
   * @throws IOException if the file cannot be re-opened
   */
  @VisibleForTesting
  void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
    ReentrantLock channelLock = channelLocks[accessFileNum];
    channelLock.lock();
    try {
      FileChannel fileChannel = fileChannels[accessFileNum];
      if (fileChannel != null) {
        // Don't re-open a channel if we were waiting on another
        // thread to re-open the channel and it is now open.
        if (fileChannel.isOpen()) {
          return;
        }
        fileChannel.close();
      }
      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: {}",
        filePaths[accessFileNum], ioe);
      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
    } finally {
      channelLock.unlock();
    }
  }

  /** A positional operation (read or write) between a file channel and a byte buffer. */
  private interface FileAccessor {
    /**
     * @param fileChannel the channel to access
     * @param byteBuffer the buffer to transfer bytes to or from
     * @param accessOffset the offset within the file at which the transfer starts
     * @return the value returned by the underlying channel operation
     * @throws IOException if the channel operation fails
     */
    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
        throws IOException;
  }

  /** Reads from the channel into the buffer at the given file offset. */
  private static class FileReadAccessor implements FileAccessor {
    @Override
    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
        long accessOffset) throws IOException {
      return fileChannel.read(byteBuffer, accessOffset);
    }
  }

  /** Writes the buffer to the channel at the given file offset. */
  private static class FileWriteAccessor implements FileAccessor {
    @Override
    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
        long accessOffset) throws IOException {
      return fileChannel.write(byteBuffer, accessOffset);
    }
  }
}