001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.File; 021import java.io.IOException; 022import java.io.RandomAccessFile; 023import java.nio.ByteBuffer; 024import java.nio.channels.ClosedByInterruptException; 025import java.nio.channels.ClosedChannelException; 026import java.nio.channels.FileChannel; 027import java.util.Arrays; 028import java.util.concurrent.locks.ReentrantLock; 029import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 030import org.apache.hadoop.hbase.io.hfile.Cacheable; 031import org.apache.hadoop.hbase.nio.ByteBuff; 032import org.apache.hadoop.util.StringUtils; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 038 039/** 040 * IO engine that stores data to a file on the local file system. 041 */ 042@InterfaceAudience.Private 043public class FileIOEngine extends PersistentIOEngine { 044 private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class); 045 public static final String FILE_DELIMITER = ","; 046 private final FileChannel[] fileChannels; 047 private final RandomAccessFile[] rafs; 048 private final ReentrantLock[] channelLocks; 049 050 private final long sizePerFile; 051 private final long capacity; 052 053 private FileReadAccessor readAccessor = new FileReadAccessor(); 054 private FileWriteAccessor writeAccessor = new FileWriteAccessor(); 055 056 public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths) 057 throws IOException { 058 super(filePaths); 059 this.sizePerFile = capacity / filePaths.length; 060 this.capacity = this.sizePerFile * filePaths.length; 061 this.fileChannels = new FileChannel[filePaths.length]; 062 if (!maintainPersistence) { 063 for (String filePath : filePaths) { 064 File file = new File(filePath); 065 if (file.exists()) { 066 if (LOG.isDebugEnabled()) { 067 LOG.debug("File " + filePath + " already exists. Deleting!!"); 068 } 069 file.delete(); 070 // If deletion fails still we can manage with the writes 071 } 072 } 073 } 074 this.rafs = new RandomAccessFile[filePaths.length]; 075 this.channelLocks = new ReentrantLock[filePaths.length]; 076 for (int i = 0; i < filePaths.length; i++) { 077 String filePath = filePaths[i]; 078 try { 079 rafs[i] = new RandomAccessFile(filePath, "rw"); 080 long totalSpace = new File(filePath).getTotalSpace(); 081 if (totalSpace < sizePerFile) { 082 // The next setting length will throw exception,logging this message 083 // is just used for the detail reason of exception, 084 String msg = "Only " + StringUtils.byteDesc(totalSpace) + " total space under " + filePath 085 + ", not enough for requested " + StringUtils.byteDesc(sizePerFile); 086 LOG.warn(msg); 087 } 088 File file = new File(filePath); 089 // setLength() method will change file's last modified time. So if don't do 090 // this check, wrong time will be used when calculating checksum. 091 if (file.length() != sizePerFile) { 092 rafs[i].setLength(sizePerFile); 093 } 094 fileChannels[i] = rafs[i].getChannel(); 095 channelLocks[i] = new ReentrantLock(); 096 LOG.info( 097 "Allocating cache " + StringUtils.byteDesc(sizePerFile) + ", on the path:" + filePath); 098 } catch (IOException fex) { 099 LOG.error("Failed allocating cache on " + filePath, fex); 100 shutdown(); 101 throw fex; 102 } 103 } 104 } 105 106 @Override 107 public String toString() { 108 return "ioengine=" + this.getClass().getSimpleName() + ", paths=" + Arrays.asList(filePaths) 109 + ", capacity=" + String.format("%,d", this.capacity); 110 } 111 112 /** 113 * File IO engine is always able to support persistent storage for the cache n 114 */ 115 @Override 116 public boolean isPersistent() { 117 return true; 118 } 119 120 /** 121 * Transfers data from file to the given byte buffer 122 * @param be an {@link BucketEntry} which maintains an (offset, len, refCnt) 123 * @return the {@link Cacheable} with block data inside. 124 * @throws IOException if any IO error happen. 125 */ 126 @Override 127 public Cacheable read(BucketEntry be) throws IOException { 128 long offset = be.offset(); 129 int length = be.getLength(); 130 Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); 131 ByteBuff dstBuff = be.allocator.allocate(length); 132 if (length != 0) { 133 try { 134 accessFile(readAccessor, dstBuff, offset); 135 // The buffer created out of the fileChannel is formed by copying the data from the file 136 // Hence in this case there is no shared memory that we point to. Even if the BucketCache 137 // evicts this buffer from the file the data is already copied and there is no need to 138 // ensure that the results are not corrupted before consuming them. 139 if (dstBuff.limit() != length) { 140 throw new IllegalArgumentIOException( 141 "Only " + dstBuff.limit() + " bytes read, " + length + " expected"); 142 } 143 } catch (IOException ioe) { 144 dstBuff.release(); 145 throw ioe; 146 } 147 } 148 dstBuff.rewind(); 149 return be.wrapAsCacheable(dstBuff); 150 } 151 152 void closeFileChannels() { 153 for (FileChannel fileChannel : fileChannels) { 154 try { 155 fileChannel.close(); 156 } catch (IOException e) { 157 LOG.warn("Failed to close FileChannel", e); 158 } 159 } 160 } 161 162 /** 163 * Transfers data from the given byte buffer to file 164 * @param srcBuffer the given byte buffer from which bytes are to be read 165 * @param offset The offset in the file where the first byte to be written n 166 */ 167 @Override 168 public void write(ByteBuffer srcBuffer, long offset) throws IOException { 169 write(ByteBuff.wrap(srcBuffer), offset); 170 } 171 172 /** 173 * Sync the data to file after writing n 174 */ 175 @Override 176 public void sync() throws IOException { 177 for (int i = 0; i < fileChannels.length; i++) { 178 try { 179 if (fileChannels[i] != null) { 180 fileChannels[i].force(true); 181 } 182 } catch (IOException ie) { 183 LOG.warn("Failed syncing data to " + this.filePaths[i]); 184 throw ie; 185 } 186 } 187 } 188 189 /** 190 * Close the file 191 */ 192 @Override 193 public void shutdown() { 194 for (int i = 0; i < filePaths.length; i++) { 195 try { 196 if (fileChannels[i] != null) { 197 fileChannels[i].close(); 198 } 199 if (rafs[i] != null) { 200 rafs[i].close(); 201 } 202 } catch (IOException ex) { 203 LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex); 204 } 205 } 206 } 207 208 @Override 209 public void write(ByteBuff srcBuff, long offset) throws IOException { 210 if (!srcBuff.hasRemaining()) { 211 return; 212 } 213 accessFile(writeAccessor, srcBuff, offset); 214 } 215 216 private void accessFile(FileAccessor accessor, ByteBuff buff, long globalOffset) 217 throws IOException { 218 int startFileNum = getFileNum(globalOffset); 219 int remainingAccessDataLen = buff.remaining(); 220 int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1); 221 int accessFileNum = startFileNum; 222 long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset); 223 int bufLimit = buff.limit(); 224 while (true) { 225 FileChannel fileChannel = fileChannels[accessFileNum]; 226 int accessLen = 0; 227 if (endFileNum > accessFileNum) { 228 // short the limit; 229 buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); 230 } 231 try { 232 accessLen = accessor.access(fileChannel, buff, accessOffset); 233 } catch (ClosedByInterruptException e) { 234 throw e; 235 } catch (ClosedChannelException e) { 236 refreshFileConnection(accessFileNum, e); 237 continue; 238 } 239 // recover the limit 240 buff.limit(bufLimit); 241 if (accessLen < remainingAccessDataLen) { 242 remainingAccessDataLen -= accessLen; 243 accessFileNum++; 244 accessOffset = 0; 245 } else { 246 break; 247 } 248 if (accessFileNum >= fileChannels.length) { 249 throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining()) 250 + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" 251 + globalOffset); 252 } 253 } 254 } 255 256 /** 257 * Get the absolute offset in given file with the relative global offset. nn * @return the 258 * absolute offset 259 */ 260 private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) { 261 return globalOffset - fileNum * sizePerFile; 262 } 263 264 private int getFileNum(long offset) { 265 if (offset < 0) { 266 throw new IllegalArgumentException("Unexpected offset " + offset); 267 } 268 int fileNum = (int) (offset / sizePerFile); 269 if (fileNum >= fileChannels.length) { 270 throw new RuntimeException("Not expected offset " + offset + " where capacity=" + capacity); 271 } 272 return fileNum; 273 } 274 275 FileChannel[] getFileChannels() { 276 return fileChannels; 277 } 278 279 void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException { 280 ReentrantLock channelLock = channelLocks[accessFileNum]; 281 channelLock.lock(); 282 try { 283 FileChannel fileChannel = fileChannels[accessFileNum]; 284 if (fileChannel != null) { 285 // Don't re-open a channel if we were waiting on another 286 // thread to re-open the channel and it is now open. 287 if (fileChannel.isOpen()) { 288 return; 289 } 290 fileChannel.close(); 291 } 292 LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: " 293 + filePaths[accessFileNum], ioe); 294 rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); 295 fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); 296 } finally { 297 channelLock.unlock(); 298 } 299 } 300 301 private interface FileAccessor { 302 int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException; 303 } 304 305 private static class FileReadAccessor implements FileAccessor { 306 @Override 307 public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) 308 throws IOException { 309 return buff.read(fileChannel, accessOffset); 310 } 311 } 312 313 private static class FileWriteAccessor implements FileAccessor { 314 @Override 315 public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) 316 throws IOException { 317 return buff.write(fileChannel, accessOffset); 318 } 319 } 320}