001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile.bucket; 020 021import java.io.File; 022import java.io.IOException; 023import java.io.RandomAccessFile; 024import java.nio.ByteBuffer; 025import java.nio.channels.ClosedByInterruptException; 026import java.nio.channels.ClosedChannelException; 027import java.nio.channels.FileChannel; 028import java.util.Arrays; 029import java.util.concurrent.locks.ReentrantLock; 030 031import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 032import org.apache.hadoop.hbase.io.hfile.Cacheable; 033import org.apache.hadoop.hbase.nio.ByteBuff; 034import org.apache.hadoop.util.StringUtils; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 041 042/** 043 * IO engine that stores data to a file on the local file system. 044 */ 045@InterfaceAudience.Private 046public class FileIOEngine extends PersistentIOEngine { 047 private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class); 048 public static final String FILE_DELIMITER = ","; 049 private final FileChannel[] fileChannels; 050 private final RandomAccessFile[] rafs; 051 private final ReentrantLock[] channelLocks; 052 053 private final long sizePerFile; 054 private final long capacity; 055 056 private FileReadAccessor readAccessor = new FileReadAccessor(); 057 private FileWriteAccessor writeAccessor = new FileWriteAccessor(); 058 059 public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths) 060 throws IOException { 061 super(filePaths); 062 this.sizePerFile = capacity / filePaths.length; 063 this.capacity = this.sizePerFile * filePaths.length; 064 this.fileChannels = new FileChannel[filePaths.length]; 065 if (!maintainPersistence) { 066 for (String filePath : filePaths) { 067 File file = new File(filePath); 068 if (file.exists()) { 069 if (LOG.isDebugEnabled()) { 070 LOG.debug("File " + filePath + " already exists. Deleting!!"); 071 } 072 file.delete(); 073 // If deletion fails still we can manage with the writes 074 } 075 } 076 } 077 this.rafs = new RandomAccessFile[filePaths.length]; 078 this.channelLocks = new ReentrantLock[filePaths.length]; 079 for (int i = 0; i < filePaths.length; i++) { 080 String filePath = filePaths[i]; 081 try { 082 rafs[i] = new RandomAccessFile(filePath, "rw"); 083 long totalSpace = new File(filePath).getTotalSpace(); 084 if (totalSpace < sizePerFile) { 085 // The next setting length will throw exception,logging this message 086 // is just used for the detail reason of exception, 087 String msg = "Only " + StringUtils.byteDesc(totalSpace) 088 + " total space under " + filePath + ", not enough for requested " 089 + StringUtils.byteDesc(sizePerFile); 090 LOG.warn(msg); 091 } 092 File file = new File(filePath); 093 // setLength() method will change file's last modified time. So if don't do 094 // this check, wrong time will be used when calculating checksum. 095 if (file.length() != sizePerFile) { 096 rafs[i].setLength(sizePerFile); 097 } 098 fileChannels[i] = rafs[i].getChannel(); 099 channelLocks[i] = new ReentrantLock(); 100 LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) 101 + ", on the path:" + filePath); 102 } catch (IOException fex) { 103 LOG.error("Failed allocating cache on " + filePath, fex); 104 shutdown(); 105 throw fex; 106 } 107 } 108 } 109 110 @Override 111 public String toString() { 112 return "ioengine=" + this.getClass().getSimpleName() + ", paths=" 113 + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity); 114 } 115 116 /** 117 * File IO engine is always able to support persistent storage for the cache 118 * @return true 119 */ 120 @Override 121 public boolean isPersistent() { 122 return true; 123 } 124 125 /** 126 * Transfers data from file to the given byte buffer 127 * @param be an {@link BucketEntry} which maintains an (offset, len, refCnt) 128 * @return the {@link Cacheable} with block data inside. 129 * @throws IOException if any IO error happen. 130 */ 131 @Override 132 public Cacheable read(BucketEntry be) throws IOException { 133 long offset = be.offset(); 134 int length = be.getLength(); 135 Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); 136 ByteBuff dstBuff = be.allocator.allocate(length); 137 if (length != 0) { 138 try { 139 accessFile(readAccessor, dstBuff, offset); 140 // The buffer created out of the fileChannel is formed by copying the data from the file 141 // Hence in this case there is no shared memory that we point to. Even if the BucketCache 142 // evicts this buffer from the file the data is already copied and there is no need to 143 // ensure that the results are not corrupted before consuming them. 144 if (dstBuff.limit() != length) { 145 throw new IllegalArgumentIOException( 146 "Only " + dstBuff.limit() + " bytes read, " + length + " expected"); 147 } 148 } catch (IOException ioe) { 149 dstBuff.release(); 150 throw ioe; 151 } 152 } 153 dstBuff.rewind(); 154 return be.wrapAsCacheable(dstBuff); 155 } 156 157 @VisibleForTesting 158 void closeFileChannels() { 159 for (FileChannel fileChannel: fileChannels) { 160 try { 161 fileChannel.close(); 162 } catch (IOException e) { 163 LOG.warn("Failed to close FileChannel", e); 164 } 165 } 166 } 167 168 /** 169 * Transfers data from the given byte buffer to file 170 * @param srcBuffer the given byte buffer from which bytes are to be read 171 * @param offset The offset in the file where the first byte to be written 172 * @throws IOException 173 */ 174 @Override 175 public void write(ByteBuffer srcBuffer, long offset) throws IOException { 176 write(ByteBuff.wrap(srcBuffer), offset); 177 } 178 179 /** 180 * Sync the data to file after writing 181 * @throws IOException 182 */ 183 @Override 184 public void sync() throws IOException { 185 for (int i = 0; i < fileChannels.length; i++) { 186 try { 187 if (fileChannels[i] != null) { 188 fileChannels[i].force(true); 189 } 190 } catch (IOException ie) { 191 LOG.warn("Failed syncing data to " + this.filePaths[i]); 192 throw ie; 193 } 194 } 195 } 196 197 /** 198 * Close the file 199 */ 200 @Override 201 public void shutdown() { 202 for (int i = 0; i < filePaths.length; i++) { 203 try { 204 if (fileChannels[i] != null) { 205 fileChannels[i].close(); 206 } 207 if (rafs[i] != null) { 208 rafs[i].close(); 209 } 210 } catch (IOException ex) { 211 LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex); 212 } 213 } 214 } 215 216 @Override 217 public void write(ByteBuff srcBuff, long offset) throws IOException { 218 if (!srcBuff.hasRemaining()) { 219 return; 220 } 221 accessFile(writeAccessor, srcBuff, offset); 222 } 223 224 private void accessFile(FileAccessor accessor, ByteBuff buff, 225 long globalOffset) throws IOException { 226 int startFileNum = getFileNum(globalOffset); 227 int remainingAccessDataLen = buff.remaining(); 228 int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1); 229 int accessFileNum = startFileNum; 230 long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset); 231 int bufLimit = buff.limit(); 232 while (true) { 233 FileChannel fileChannel = fileChannels[accessFileNum]; 234 int accessLen = 0; 235 if (endFileNum > accessFileNum) { 236 // short the limit; 237 buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); 238 } 239 try { 240 accessLen = accessor.access(fileChannel, buff, accessOffset); 241 } catch (ClosedByInterruptException e) { 242 throw e; 243 } catch (ClosedChannelException e) { 244 refreshFileConnection(accessFileNum, e); 245 continue; 246 } 247 // recover the limit 248 buff.limit(bufLimit); 249 if (accessLen < remainingAccessDataLen) { 250 remainingAccessDataLen -= accessLen; 251 accessFileNum++; 252 accessOffset = 0; 253 } else { 254 break; 255 } 256 if (accessFileNum >= fileChannels.length) { 257 throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining()) 258 + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" 259 + globalOffset); 260 } 261 } 262 } 263 264 /** 265 * Get the absolute offset in given file with the relative global offset. 266 * @param fileNum 267 * @param globalOffset 268 * @return the absolute offset 269 */ 270 private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) { 271 return globalOffset - fileNum * sizePerFile; 272 } 273 274 private int getFileNum(long offset) { 275 if (offset < 0) { 276 throw new IllegalArgumentException("Unexpected offset " + offset); 277 } 278 int fileNum = (int) (offset / sizePerFile); 279 if (fileNum >= fileChannels.length) { 280 throw new RuntimeException("Not expected offset " + offset 281 + " where capacity=" + capacity); 282 } 283 return fileNum; 284 } 285 286 @VisibleForTesting 287 FileChannel[] getFileChannels() { 288 return fileChannels; 289 } 290 291 @VisibleForTesting 292 void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException { 293 ReentrantLock channelLock = channelLocks[accessFileNum]; 294 channelLock.lock(); 295 try { 296 FileChannel fileChannel = fileChannels[accessFileNum]; 297 if (fileChannel != null) { 298 // Don't re-open a channel if we were waiting on another 299 // thread to re-open the channel and it is now open. 300 if (fileChannel.isOpen()) { 301 return; 302 } 303 fileChannel.close(); 304 } 305 LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: " 306 + filePaths[accessFileNum], ioe); 307 rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); 308 fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); 309 } finally{ 310 channelLock.unlock(); 311 } 312 } 313 314 private interface FileAccessor { 315 int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) 316 throws IOException; 317 } 318 319 private static class FileReadAccessor implements FileAccessor { 320 @Override 321 public int access(FileChannel fileChannel, ByteBuff buff, 322 long accessOffset) throws IOException { 323 return buff.read(fileChannel, accessOffset); 324 } 325 } 326 327 private static class FileWriteAccessor implements FileAccessor { 328 @Override 329 public int access(FileChannel fileChannel, ByteBuff buff, 330 long accessOffset) throws IOException { 331 return buff.write(fileChannel, accessOffset); 332 } 333 } 334}