001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile.bucket; 020 021import java.io.File; 022import java.io.IOException; 023import java.io.RandomAccessFile; 024import java.nio.ByteBuffer; 025import java.nio.channels.ClosedByInterruptException; 026import java.nio.channels.ClosedChannelException; 027import java.nio.channels.FileChannel; 028import java.util.Arrays; 029import java.util.concurrent.locks.ReentrantLock; 030import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 031import org.apache.hadoop.hbase.io.hfile.Cacheable; 032import org.apache.hadoop.hbase.nio.ByteBuff; 033import org.apache.hadoop.util.StringUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 039 040/** 041 * IO engine that stores data to a file on the local file system. 042 */ 043@InterfaceAudience.Private 044public class FileIOEngine extends PersistentIOEngine { 045 private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class); 046 public static final String FILE_DELIMITER = ","; 047 private final FileChannel[] fileChannels; 048 private final RandomAccessFile[] rafs; 049 private final ReentrantLock[] channelLocks; 050 051 private final long sizePerFile; 052 private final long capacity; 053 054 private FileReadAccessor readAccessor = new FileReadAccessor(); 055 private FileWriteAccessor writeAccessor = new FileWriteAccessor(); 056 057 public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths) 058 throws IOException { 059 super(filePaths); 060 this.sizePerFile = capacity / filePaths.length; 061 this.capacity = this.sizePerFile * filePaths.length; 062 this.fileChannels = new FileChannel[filePaths.length]; 063 if (!maintainPersistence) { 064 for (String filePath : filePaths) { 065 File file = new File(filePath); 066 if (file.exists()) { 067 if (LOG.isDebugEnabled()) { 068 LOG.debug("File " + filePath + " already exists. Deleting!!"); 069 } 070 file.delete(); 071 // If deletion fails still we can manage with the writes 072 } 073 } 074 } 075 this.rafs = new RandomAccessFile[filePaths.length]; 076 this.channelLocks = new ReentrantLock[filePaths.length]; 077 for (int i = 0; i < filePaths.length; i++) { 078 String filePath = filePaths[i]; 079 try { 080 rafs[i] = new RandomAccessFile(filePath, "rw"); 081 long totalSpace = new File(filePath).getTotalSpace(); 082 if (totalSpace < sizePerFile) { 083 // The next setting length will throw exception,logging this message 084 // is just used for the detail reason of exception, 085 String msg = "Only " + StringUtils.byteDesc(totalSpace) 086 + " total space under " + filePath + ", not enough for requested " 087 + StringUtils.byteDesc(sizePerFile); 088 LOG.warn(msg); 089 } 090 File file = new File(filePath); 091 // setLength() method will change file's last modified time. So if don't do 092 // this check, wrong time will be used when calculating checksum. 093 if (file.length() != sizePerFile) { 094 rafs[i].setLength(sizePerFile); 095 } 096 fileChannels[i] = rafs[i].getChannel(); 097 channelLocks[i] = new ReentrantLock(); 098 LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) 099 + ", on the path:" + filePath); 100 } catch (IOException fex) { 101 LOG.error("Failed allocating cache on " + filePath, fex); 102 shutdown(); 103 throw fex; 104 } 105 } 106 } 107 108 @Override 109 public String toString() { 110 return "ioengine=" + this.getClass().getSimpleName() + ", paths=" 111 + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity); 112 } 113 114 /** 115 * File IO engine is always able to support persistent storage for the cache 116 * @return true 117 */ 118 @Override 119 public boolean isPersistent() { 120 return true; 121 } 122 123 /** 124 * Transfers data from file to the given byte buffer 125 * @param be an {@link BucketEntry} which maintains an (offset, len, refCnt) 126 * @return the {@link Cacheable} with block data inside. 127 * @throws IOException if any IO error happen. 128 */ 129 @Override 130 public Cacheable read(BucketEntry be) throws IOException { 131 long offset = be.offset(); 132 int length = be.getLength(); 133 Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); 134 ByteBuff dstBuff = be.allocator.allocate(length); 135 if (length != 0) { 136 try { 137 accessFile(readAccessor, dstBuff, offset); 138 // The buffer created out of the fileChannel is formed by copying the data from the file 139 // Hence in this case there is no shared memory that we point to. Even if the BucketCache 140 // evicts this buffer from the file the data is already copied and there is no need to 141 // ensure that the results are not corrupted before consuming them. 142 if (dstBuff.limit() != length) { 143 throw new IllegalArgumentIOException( 144 "Only " + dstBuff.limit() + " bytes read, " + length + " expected"); 145 } 146 } catch (IOException ioe) { 147 dstBuff.release(); 148 throw ioe; 149 } 150 } 151 dstBuff.rewind(); 152 return be.wrapAsCacheable(dstBuff); 153 } 154 155 void closeFileChannels() { 156 for (FileChannel fileChannel: fileChannels) { 157 try { 158 fileChannel.close(); 159 } catch (IOException e) { 160 LOG.warn("Failed to close FileChannel", e); 161 } 162 } 163 } 164 165 /** 166 * Transfers data from the given byte buffer to file 167 * @param srcBuffer the given byte buffer from which bytes are to be read 168 * @param offset The offset in the file where the first byte to be written 169 * @throws IOException 170 */ 171 @Override 172 public void write(ByteBuffer srcBuffer, long offset) throws IOException { 173 write(ByteBuff.wrap(srcBuffer), offset); 174 } 175 176 /** 177 * Sync the data to file after writing 178 * @throws IOException 179 */ 180 @Override 181 public void sync() throws IOException { 182 for (int i = 0; i < fileChannels.length; i++) { 183 try { 184 if (fileChannels[i] != null) { 185 fileChannels[i].force(true); 186 } 187 } catch (IOException ie) { 188 LOG.warn("Failed syncing data to " + this.filePaths[i]); 189 throw ie; 190 } 191 } 192 } 193 194 /** 195 * Close the file 196 */ 197 @Override 198 public void shutdown() { 199 for (int i = 0; i < filePaths.length; i++) { 200 try { 201 if (fileChannels[i] != null) { 202 fileChannels[i].close(); 203 } 204 if (rafs[i] != null) { 205 rafs[i].close(); 206 } 207 } catch (IOException ex) { 208 LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex); 209 } 210 } 211 } 212 213 @Override 214 public void write(ByteBuff srcBuff, long offset) throws IOException { 215 if (!srcBuff.hasRemaining()) { 216 return; 217 } 218 accessFile(writeAccessor, srcBuff, offset); 219 } 220 221 private void accessFile(FileAccessor accessor, ByteBuff buff, 222 long globalOffset) throws IOException { 223 int startFileNum = getFileNum(globalOffset); 224 int remainingAccessDataLen = buff.remaining(); 225 int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1); 226 int accessFileNum = startFileNum; 227 long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset); 228 int bufLimit = buff.limit(); 229 while (true) { 230 FileChannel fileChannel = fileChannels[accessFileNum]; 231 int accessLen = 0; 232 if (endFileNum > accessFileNum) { 233 // short the limit; 234 buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); 235 } 236 try { 237 accessLen = accessor.access(fileChannel, buff, accessOffset); 238 } catch (ClosedByInterruptException e) { 239 throw e; 240 } catch (ClosedChannelException e) { 241 refreshFileConnection(accessFileNum, e); 242 continue; 243 } 244 // recover the limit 245 buff.limit(bufLimit); 246 if (accessLen < remainingAccessDataLen) { 247 remainingAccessDataLen -= accessLen; 248 accessFileNum++; 249 accessOffset = 0; 250 } else { 251 break; 252 } 253 if (accessFileNum >= fileChannels.length) { 254 throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining()) 255 + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" 256 + globalOffset); 257 } 258 } 259 } 260 261 /** 262 * Get the absolute offset in given file with the relative global offset. 263 * @param fileNum 264 * @param globalOffset 265 * @return the absolute offset 266 */ 267 private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) { 268 return globalOffset - fileNum * sizePerFile; 269 } 270 271 private int getFileNum(long offset) { 272 if (offset < 0) { 273 throw new IllegalArgumentException("Unexpected offset " + offset); 274 } 275 int fileNum = (int) (offset / sizePerFile); 276 if (fileNum >= fileChannels.length) { 277 throw new RuntimeException("Not expected offset " + offset 278 + " where capacity=" + capacity); 279 } 280 return fileNum; 281 } 282 283 FileChannel[] getFileChannels() { 284 return fileChannels; 285 } 286 287 void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException { 288 ReentrantLock channelLock = channelLocks[accessFileNum]; 289 channelLock.lock(); 290 try { 291 FileChannel fileChannel = fileChannels[accessFileNum]; 292 if (fileChannel != null) { 293 // Don't re-open a channel if we were waiting on another 294 // thread to re-open the channel and it is now open. 295 if (fileChannel.isOpen()) { 296 return; 297 } 298 fileChannel.close(); 299 } 300 LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: " 301 + filePaths[accessFileNum], ioe); 302 rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); 303 fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); 304 } finally{ 305 channelLock.unlock(); 306 } 307 } 308 309 private interface FileAccessor { 310 int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) 311 throws IOException; 312 } 313 314 private static class FileReadAccessor implements FileAccessor { 315 @Override 316 public int access(FileChannel fileChannel, ByteBuff buff, 317 long accessOffset) throws IOException { 318 return buff.read(fileChannel, accessOffset); 319 } 320 } 321 322 private static class FileWriteAccessor implements FileAccessor { 323 @Override 324 public int access(FileChannel fileChannel, ByteBuff buff, 325 long accessOffset) throws IOException { 326 return buff.write(fileChannel, accessOffset); 327 } 328 } 329}