001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile.bucket;
020
021import java.io.File;
022import java.io.IOException;
023import java.io.RandomAccessFile;
024import java.nio.ByteBuffer;
025import java.nio.channels.ClosedByInterruptException;
026import java.nio.channels.ClosedChannelException;
027import java.nio.channels.FileChannel;
028import java.util.Arrays;
029import java.util.concurrent.locks.ReentrantLock;
030import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
031import org.apache.hadoop.hbase.io.hfile.Cacheable;
032import org.apache.hadoop.hbase.nio.ByteBuff;
033import org.apache.hadoop.util.StringUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
039
040/**
041 * IO engine that stores data to a file on the local file system.
042 */
043@InterfaceAudience.Private
044public class FileIOEngine extends PersistentIOEngine {
045  private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
046  public static final String FILE_DELIMITER = ",";
047  private final FileChannel[] fileChannels;
048  private final RandomAccessFile[] rafs;
049  private final ReentrantLock[] channelLocks;
050
051  private final long sizePerFile;
052  private final long capacity;
053
054  private FileReadAccessor readAccessor = new FileReadAccessor();
055  private FileWriteAccessor writeAccessor = new FileWriteAccessor();
056
057  public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
058      throws IOException {
059    super(filePaths);
060    this.sizePerFile = capacity / filePaths.length;
061    this.capacity = this.sizePerFile * filePaths.length;
062    this.fileChannels = new FileChannel[filePaths.length];
063    if (!maintainPersistence) {
064      for (String filePath : filePaths) {
065        File file = new File(filePath);
066        if (file.exists()) {
067          if (LOG.isDebugEnabled()) {
068            LOG.debug("File " + filePath + " already exists. Deleting!!");
069          }
070          file.delete();
071          // If deletion fails still we can manage with the writes
072        }
073      }
074    }
075    this.rafs = new RandomAccessFile[filePaths.length];
076    this.channelLocks = new ReentrantLock[filePaths.length];
077    for (int i = 0; i < filePaths.length; i++) {
078      String filePath = filePaths[i];
079      try {
080        rafs[i] = new RandomAccessFile(filePath, "rw");
081        long totalSpace = new File(filePath).getTotalSpace();
082        if (totalSpace < sizePerFile) {
083          // The next setting length will throw exception,logging this message
084          // is just used for the detail reason of exception,
085          String msg = "Only " + StringUtils.byteDesc(totalSpace)
086              + " total space under " + filePath + ", not enough for requested "
087              + StringUtils.byteDesc(sizePerFile);
088          LOG.warn(msg);
089        }
090        File file = new File(filePath);
091        // setLength() method will change file's last modified time. So if don't do
092        // this check, wrong time will be used when calculating checksum.
093        if (file.length() != sizePerFile) {
094          rafs[i].setLength(sizePerFile);
095        }
096        fileChannels[i] = rafs[i].getChannel();
097        channelLocks[i] = new ReentrantLock();
098        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
099            + ", on the path:" + filePath);
100      } catch (IOException fex) {
101        LOG.error("Failed allocating cache on " + filePath, fex);
102        shutdown();
103        throw fex;
104      }
105    }
106  }
107
108  @Override
109  public String toString() {
110    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
111        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
112  }
113
114  /**
115   * File IO engine is always able to support persistent storage for the cache
116   * @return true
117   */
118  @Override
119  public boolean isPersistent() {
120    return true;
121  }
122
123  /**
124   * Transfers data from file to the given byte buffer
125   * @param be an {@link BucketEntry} which maintains an (offset, len, refCnt)
126   * @return the {@link Cacheable} with block data inside.
127   * @throws IOException if any IO error happen.
128   */
129  @Override
130  public Cacheable read(BucketEntry be) throws IOException {
131    long offset = be.offset();
132    int length = be.getLength();
133    Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
134    ByteBuff dstBuff = be.allocator.allocate(length);
135    if (length != 0) {
136      try {
137        accessFile(readAccessor, dstBuff, offset);
138        // The buffer created out of the fileChannel is formed by copying the data from the file
139        // Hence in this case there is no shared memory that we point to. Even if the BucketCache
140        // evicts this buffer from the file the data is already copied and there is no need to
141        // ensure that the results are not corrupted before consuming them.
142        if (dstBuff.limit() != length) {
143          throw new IllegalArgumentIOException(
144              "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
145        }
146      } catch (IOException ioe) {
147        dstBuff.release();
148        throw ioe;
149      }
150    }
151    dstBuff.rewind();
152    return be.wrapAsCacheable(dstBuff);
153  }
154
155  void closeFileChannels() {
156    for (FileChannel fileChannel: fileChannels) {
157      try {
158        fileChannel.close();
159      } catch (IOException e) {
160        LOG.warn("Failed to close FileChannel", e);
161      }
162    }
163  }
164
165  /**
166   * Transfers data from the given byte buffer to file
167   * @param srcBuffer the given byte buffer from which bytes are to be read
168   * @param offset The offset in the file where the first byte to be written
169   * @throws IOException
170   */
171  @Override
172  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
173    write(ByteBuff.wrap(srcBuffer), offset);
174  }
175
176  /**
177   * Sync the data to file after writing
178   * @throws IOException
179   */
180  @Override
181  public void sync() throws IOException {
182    for (int i = 0; i < fileChannels.length; i++) {
183      try {
184        if (fileChannels[i] != null) {
185          fileChannels[i].force(true);
186        }
187      } catch (IOException ie) {
188        LOG.warn("Failed syncing data to " + this.filePaths[i]);
189        throw ie;
190      }
191    }
192  }
193
194  /**
195   * Close the file
196   */
197  @Override
198  public void shutdown() {
199    for (int i = 0; i < filePaths.length; i++) {
200      try {
201        if (fileChannels[i] != null) {
202          fileChannels[i].close();
203        }
204        if (rafs[i] != null) {
205          rafs[i].close();
206        }
207      } catch (IOException ex) {
208        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
209      }
210    }
211  }
212
213  @Override
214  public void write(ByteBuff srcBuff, long offset) throws IOException {
215    if (!srcBuff.hasRemaining()) {
216      return;
217    }
218    accessFile(writeAccessor, srcBuff, offset);
219  }
220
221  private void accessFile(FileAccessor accessor, ByteBuff buff,
222      long globalOffset) throws IOException {
223    int startFileNum = getFileNum(globalOffset);
224    int remainingAccessDataLen = buff.remaining();
225    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
226    int accessFileNum = startFileNum;
227    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
228    int bufLimit = buff.limit();
229    while (true) {
230      FileChannel fileChannel = fileChannels[accessFileNum];
231      int accessLen = 0;
232      if (endFileNum > accessFileNum) {
233        // short the limit;
234        buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
235      }
236      try {
237        accessLen = accessor.access(fileChannel, buff, accessOffset);
238      } catch (ClosedByInterruptException e) {
239        throw e;
240      } catch (ClosedChannelException e) {
241        refreshFileConnection(accessFileNum, e);
242        continue;
243      }
244      // recover the limit
245      buff.limit(bufLimit);
246      if (accessLen < remainingAccessDataLen) {
247        remainingAccessDataLen -= accessLen;
248        accessFileNum++;
249        accessOffset = 0;
250      } else {
251        break;
252      }
253      if (accessFileNum >= fileChannels.length) {
254        throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
255            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
256            + globalOffset);
257      }
258    }
259  }
260
261  /**
262   * Get the absolute offset in given file with the relative global offset.
263   * @param fileNum
264   * @param globalOffset
265   * @return the absolute offset
266   */
267  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
268    return globalOffset - fileNum * sizePerFile;
269  }
270
271  private int getFileNum(long offset) {
272    if (offset < 0) {
273      throw new IllegalArgumentException("Unexpected offset " + offset);
274    }
275    int fileNum = (int) (offset / sizePerFile);
276    if (fileNum >= fileChannels.length) {
277      throw new RuntimeException("Not expected offset " + offset
278          + " where capacity=" + capacity);
279    }
280    return fileNum;
281  }
282
283  FileChannel[] getFileChannels() {
284    return fileChannels;
285  }
286
287  void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
288    ReentrantLock channelLock = channelLocks[accessFileNum];
289    channelLock.lock();
290    try {
291      FileChannel fileChannel = fileChannels[accessFileNum];
292      if (fileChannel != null) {
293        // Don't re-open a channel if we were waiting on another
294        // thread to re-open the channel and it is now open.
295        if (fileChannel.isOpen()) {
296          return;
297        }
298        fileChannel.close();
299      }
300      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
301          + filePaths[accessFileNum], ioe);
302      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
303      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
304    } finally{
305      channelLock.unlock();
306    }
307  }
308
309  private interface FileAccessor {
310    int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
311        throws IOException;
312  }
313
314  private static class FileReadAccessor implements FileAccessor {
315    @Override
316    public int access(FileChannel fileChannel, ByteBuff buff,
317        long accessOffset) throws IOException {
318      return buff.read(fileChannel, accessOffset);
319    }
320  }
321
322  private static class FileWriteAccessor implements FileAccessor {
323    @Override
324    public int access(FileChannel fileChannel, ByteBuff buff,
325        long accessOffset) throws IOException {
326      return buff.write(fileChannel, accessOffset);
327    }
328  }
329}