001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.File; 021import java.io.IOException; 022import java.io.RandomAccessFile; 023import java.nio.ByteBuffer; 024import java.nio.channels.ClosedByInterruptException; 025import java.nio.channels.ClosedChannelException; 026import java.nio.channels.FileChannel; 027import java.util.Arrays; 028import java.util.concurrent.locks.ReentrantLock; 029import org.apache.hadoop.hbase.HBaseIOException; 030import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 031import org.apache.hadoop.hbase.io.hfile.Cacheable; 032import org.apache.hadoop.hbase.nio.ByteBuff; 033import org.apache.hadoop.util.StringUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 039 040/** 041 * IO engine that stores data to a file on the local file system. 042 */ 043@InterfaceAudience.Private 044public class FileIOEngine extends PersistentIOEngine { 045 private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class); 046 public static final String FILE_DELIMITER = ","; 047 private final FileChannel[] fileChannels; 048 private final RandomAccessFile[] rafs; 049 private final ReentrantLock[] channelLocks; 050 051 private final long sizePerFile; 052 private final long capacity; 053 private boolean maintainPersistence; 054 055 private FileReadAccessor readAccessor = new FileReadAccessor(); 056 private FileWriteAccessor writeAccessor = new FileWriteAccessor(); 057 058 public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths) 059 throws IOException { 060 super(filePaths); 061 this.sizePerFile = capacity / filePaths.length; 062 this.capacity = this.sizePerFile * filePaths.length; 063 this.fileChannels = new FileChannel[filePaths.length]; 064 this.maintainPersistence = maintainPersistence; 065 if (!maintainPersistence) { 066 for (String filePath : filePaths) { 067 File file = new File(filePath); 068 if (file.exists()) { 069 if (LOG.isDebugEnabled()) { 070 LOG.debug("File " + filePath + " already exists. Deleting!!"); 071 } 072 file.delete(); 073 // If deletion fails still we can manage with the writes 074 } 075 } 076 } 077 this.rafs = new RandomAccessFile[filePaths.length]; 078 this.channelLocks = new ReentrantLock[filePaths.length]; 079 for (int i = 0; i < filePaths.length; i++) { 080 String filePath = filePaths[i]; 081 try { 082 rafs[i] = new RandomAccessFile(filePath, "rw"); 083 long totalSpace = new File(filePath).getTotalSpace(); 084 if (totalSpace < sizePerFile) { 085 // The next setting length will throw exception,logging this message 086 // is just used for the detail reason of exception, 087 String msg = "Only " + StringUtils.byteDesc(totalSpace) + " total space under " + filePath 088 + ", not enough for requested " + StringUtils.byteDesc(sizePerFile); 089 LOG.warn(msg); 090 } 091 File file = new File(filePath); 092 // setLength() method will change file's last modified time. So if don't do 093 // this check, wrong time will be used when calculating checksum. 094 if (file.length() != sizePerFile) { 095 rafs[i].setLength(sizePerFile); 096 } 097 fileChannels[i] = rafs[i].getChannel(); 098 channelLocks[i] = new ReentrantLock(); 099 LOG.info( 100 "Allocating cache " + StringUtils.byteDesc(sizePerFile) + ", on the path:" + filePath); 101 } catch (IOException fex) { 102 LOG.error("Failed allocating cache on " + filePath, fex); 103 shutdown(); 104 throw fex; 105 } 106 } 107 } 108 109 @Override 110 public String toString() { 111 return "ioengine=" + this.getClass().getSimpleName() + ", paths=" + Arrays.asList(filePaths) 112 + ", capacity=" + String.format("%,d", this.capacity); 113 } 114 115 /** 116 * File IO engine is always able to support persistent storage for the cache 117 */ 118 @Override 119 public boolean isPersistent() { 120 return true; 121 } 122 123 /** 124 * Transfers data from file to the given byte buffer 125 * @param be an {@link BucketEntry} which maintains an (offset, len, refCnt) 126 * @return the {@link Cacheable} with block data inside. 127 * @throws IOException if any IO error happen. 128 */ 129 @Override 130 public Cacheable read(BucketEntry be) throws IOException { 131 long offset = be.offset(); 132 int length = be.getLength(); 133 Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); 134 ByteBuff dstBuff = be.allocator.allocate(length); 135 if (length != 0) { 136 try { 137 accessFile(readAccessor, dstBuff, offset); 138 // The buffer created out of the fileChannel is formed by copying the data from the file 139 // Hence in this case there is no shared memory that we point to. Even if the BucketCache 140 // evicts this buffer from the file the data is already copied and there is no need to 141 // ensure that the results are not corrupted before consuming them. 142 if (dstBuff.limit() != length) { 143 throw new IllegalArgumentIOException( 144 "Only " + dstBuff.limit() + " bytes read, " + length + " expected"); 145 } 146 } catch (IOException ioe) { 147 dstBuff.release(); 148 throw ioe; 149 } 150 } 151 if (maintainPersistence) { 152 dstBuff.rewind(); 153 long cachedNanoTime = dstBuff.getLong(); 154 if (be.getCachedTime() != cachedNanoTime) { 155 dstBuff.release(); 156 throw new HBaseIOException("The cached time recorded within the cached block: " 157 + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime()); 158 } 159 dstBuff.limit(length); 160 dstBuff = dstBuff.slice(); 161 } else { 162 dstBuff.rewind(); 163 } 164 return be.wrapAsCacheable(dstBuff); 165 } 166 167 void checkCacheTime(BucketEntry be) throws IOException { 168 long offset = be.offset(); 169 ByteBuff dstBuff = be.allocator.allocate(Long.BYTES); 170 try { 171 accessFile(readAccessor, dstBuff, offset); 172 } catch (IOException ioe) { 173 dstBuff.release(); 174 throw ioe; 175 } 176 dstBuff.rewind(); 177 long cachedNanoTime = dstBuff.getLong(); 178 if (be.getCachedTime() != cachedNanoTime) { 179 dstBuff.release(); 180 throw new HBaseIOException("The cached time recorded within the cached block: " 181 + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime()); 182 } 183 } 184 185 void closeFileChannels() { 186 for (FileChannel fileChannel : fileChannels) { 187 try { 188 fileChannel.close(); 189 } catch (IOException e) { 190 LOG.warn("Failed to close FileChannel", e); 191 } 192 } 193 } 194 195 /** 196 * Transfers data from the given byte buffer to file 197 * @param srcBuffer the given byte buffer from which bytes are to be read 198 * @param offset The offset in the file where the first byte to be written 199 */ 200 @Override 201 public void write(ByteBuffer srcBuffer, long offset) throws IOException { 202 write(ByteBuff.wrap(srcBuffer), offset); 203 } 204 205 /** 206 * Sync the data to file after writing 207 */ 208 @Override 209 public void sync() throws IOException { 210 for (int i = 0; i < fileChannels.length; i++) { 211 try { 212 if (fileChannels[i] != null) { 213 fileChannels[i].force(true); 214 } 215 } catch (IOException ie) { 216 LOG.warn("Failed syncing data to " + this.filePaths[i]); 217 throw ie; 218 } 219 } 220 } 221 222 /** 223 * Close the file 224 */ 225 @Override 226 public void shutdown() { 227 for (int i = 0; i < filePaths.length; i++) { 228 try { 229 if (fileChannels[i] != null) { 230 fileChannels[i].close(); 231 } 232 if (rafs[i] != null) { 233 rafs[i].close(); 234 } 235 } catch (IOException ex) { 236 LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex); 237 } 238 } 239 } 240 241 @Override 242 public void write(ByteBuff srcBuff, long offset) throws IOException { 243 if (!srcBuff.hasRemaining()) { 244 return; 245 } 246 accessFile(writeAccessor, srcBuff, offset); 247 } 248 249 private void accessFile(FileAccessor accessor, ByteBuff buff, long globalOffset) 250 throws IOException { 251 int startFileNum = getFileNum(globalOffset); 252 int remainingAccessDataLen = buff.remaining(); 253 int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1); 254 int accessFileNum = startFileNum; 255 long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset); 256 int bufLimit = buff.limit(); 257 while (true) { 258 FileChannel fileChannel = fileChannels[accessFileNum]; 259 int accessLen = 0; 260 if (endFileNum > accessFileNum) { 261 // short the limit; 262 buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); 263 } 264 try { 265 accessLen = accessor.access(fileChannel, buff, accessOffset); 266 } catch (ClosedByInterruptException e) { 267 throw e; 268 } catch (ClosedChannelException e) { 269 refreshFileConnection(accessFileNum, e); 270 continue; 271 } 272 // recover the limit 273 buff.limit(bufLimit); 274 if (accessLen < remainingAccessDataLen) { 275 remainingAccessDataLen -= accessLen; 276 accessFileNum++; 277 accessOffset = 0; 278 } else { 279 break; 280 } 281 if (accessFileNum >= fileChannels.length) { 282 throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining()) 283 + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" 284 + globalOffset); 285 } 286 } 287 } 288 289 /** 290 * Get the absolute offset in given file with the relative global offset. 291 * @return the absolute offset 292 */ 293 private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) { 294 return globalOffset - fileNum * sizePerFile; 295 } 296 297 private int getFileNum(long offset) { 298 if (offset < 0) { 299 throw new IllegalArgumentException("Unexpected offset " + offset); 300 } 301 int fileNum = (int) (offset / sizePerFile); 302 if (fileNum >= fileChannels.length) { 303 throw new RuntimeException("Not expected offset " + offset + " where capacity=" + capacity); 304 } 305 return fileNum; 306 } 307 308 FileChannel[] getFileChannels() { 309 return fileChannels; 310 } 311 312 void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException { 313 ReentrantLock channelLock = channelLocks[accessFileNum]; 314 channelLock.lock(); 315 try { 316 FileChannel fileChannel = fileChannels[accessFileNum]; 317 if (fileChannel != null) { 318 // Don't re-open a channel if we were waiting on another 319 // thread to re-open the channel and it is now open. 320 if (fileChannel.isOpen()) { 321 return; 322 } 323 fileChannel.close(); 324 } 325 LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: " 326 + filePaths[accessFileNum], ioe); 327 rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); 328 fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); 329 } finally { 330 channelLock.unlock(); 331 } 332 } 333 334 private interface FileAccessor { 335 int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException; 336 } 337 338 private static class FileReadAccessor implements FileAccessor { 339 @Override 340 public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) 341 throws IOException { 342 return buff.read(fileChannel, accessOffset); 343 } 344 } 345 346 private static class FileWriteAccessor implements FileAccessor { 347 @Override 348 public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) 349 throws IOException { 350 return buff.write(fileChannel, accessOffset); 351 } 352 } 353}