/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * IO engine that stores data to a file on the local file system.
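 * <p>
 * A minimal, illustrative usage sketch; the capacity and file paths below are made-up values,
 * not defaults taken from any configuration:
 * <pre>{@code
 * // Stripe a 2 GB bucket cache across two local files.
 * long capacity = 2L * 1024 * 1024 * 1024;
 * FileIOEngine engine = new FileIOEngine(capacity, true,
 *     "/mnt/ssd1/bucket.cache", "/mnt/ssd2/bucket.cache");
 * byte[] block = new byte[4096];
 * engine.write(ByteBuffer.wrap(block), 0L); // write 4 KB at global offset 0
 * engine.sync();                            // force the written data to disk
 * engine.shutdown();                        // close the underlying files and channels
 * }</pre>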
 */
@InterfaceAudience.Private
public class FileIOEngine extends PersistentIOEngine {
  private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
  public static final String FILE_DELIMITER = ",";
  private final FileChannel[] fileChannels;
  private final RandomAccessFile[] rafs;
  private final ReentrantLock[] channelLocks;

  private final long sizePerFile;
  private final long capacity;

  private FileReadAccessor readAccessor = new FileReadAccessor();
  private FileWriteAccessor writeAccessor = new FileWriteAccessor();

  public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
      throws IOException {
    super(filePaths);
    this.sizePerFile = capacity / filePaths.length;
    this.capacity = this.sizePerFile * filePaths.length;
    this.fileChannels = new FileChannel[filePaths.length];
    if (!maintainPersistence) {
      for (String filePath : filePaths) {
        File file = new File(filePath);
        if (file.exists()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("File " + filePath + " already exists. Deleting!!");
          }
          file.delete();
          // Even if the deletion fails we can still proceed; writes will overwrite the old data.
        }
      }
    }
    this.rafs = new RandomAccessFile[filePaths.length];
    this.channelLocks = new ReentrantLock[filePaths.length];
    for (int i = 0; i < filePaths.length; i++) {
      String filePath = filePaths[i];
      try {
        rafs[i] = new RandomAccessFile(filePath, "rw");
        long totalSpace = new File(filePath).getTotalSpace();
        if (totalSpace < sizePerFile) {
          // The setLength() call below will throw an exception; this message is logged
          // to make the underlying reason for that failure clear.
          String msg = "Only " + StringUtils.byteDesc(totalSpace)
              + " total space under " + filePath + ", not enough for requested "
              + StringUtils.byteDesc(sizePerFile);
          LOG.warn(msg);
        }
        File file = new File(filePath);
        // The setLength() method changes the file's last modified time, so without this
        // check the wrong time would be used when calculating the checksum.
        if (file.length() != sizePerFile) {
          rafs[i].setLength(sizePerFile);
        }
        fileChannels[i] = rafs[i].getChannel();
        channelLocks[i] = new ReentrantLock();
        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
            + ", on the path: " + filePath);
      } catch (IOException fex) {
        LOG.error("Failed allocating cache on " + filePath, fex);
        shutdown();
        throw fex;
      }
    }
  }

  @Override
  public String toString() {
    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
  }

  /**
   * File IO engine is always able to support persistent storage for the cache
   * @return true
   */
  @Override
  public boolean isPersistent() {
    return true;
  }

  /**
   * Transfers data from the file to the given byte buffer.
   * @param be a {@link BucketEntry} which maintains the (offset, length, refCnt) of the block
   * @return the {@link Cacheable} with the block data inside.
   * @throws IOException if any IO error happens.
   */
  @Override
  public Cacheable read(BucketEntry be) throws IOException {
    long offset = be.offset();
    int length = be.getLength();
    Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
    ByteBuff dstBuff = be.allocator.allocate(length);
    if (length != 0) {
      try {
        accessFile(readAccessor, dstBuff, offset);
        // The buffer created from the fileChannel is formed by copying the data from the file,
        // so there is no shared memory that we point to. Even if the BucketCache evicts this
        // block from the file, the data has already been copied and there is no need to ensure
        // that the results are not corrupted before consuming them.
        if (dstBuff.limit() != length) {
          throw new IllegalArgumentIOException(
              "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
        }
      } catch (IOException ioe) {
        dstBuff.release();
        throw ioe;
      }
    }
    dstBuff.rewind();
    return be.wrapAsCacheable(dstBuff);
  }

  @VisibleForTesting
  void closeFileChannels() {
    for (FileChannel fileChannel: fileChannels) {
      try {
        fileChannel.close();
      } catch (IOException e) {
        LOG.warn("Failed to close FileChannel", e);
      }
    }
  }

  /**
   * Transfers data from the given byte buffer to the file.
   * @param srcBuffer the byte buffer from which bytes are to be read
   * @param offset the offset in the file at which the first byte is to be written
   * @throws IOException if any IO error happens during the write
   */
  @Override
  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
    write(ByteBuff.wrap(srcBuffer), offset);
  }

  /**
   * Sync the data to the files after writing.
   * @throws IOException if forcing the data to disk fails
   */
  @Override
  public void sync() throws IOException {
    for (int i = 0; i < fileChannels.length; i++) {
      try {
        if (fileChannels[i] != null) {
          fileChannels[i].force(true);
        }
      } catch (IOException ie) {
        LOG.warn("Failed syncing data to " + this.filePaths[i]);
        throw ie;
      }
    }
  }

  /**
   * Close the files and release the underlying channels.
   */
  @Override
  public void shutdown() {
    for (int i = 0; i < filePaths.length; i++) {
      try {
        if (fileChannels[i] != null) {
          fileChannels[i].close();
        }
        if (rafs[i] != null) {
          rafs[i].close();
        }
      } catch (IOException ex) {
        LOG.error("Failed closing " + filePaths[i] + " when shutting down the IOEngine", ex);
      }
    }
  }

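  /**
   * Transfers data from the given {@link ByteBuff} to the file, starting at the given global
   * offset. This is a no-op if the buffer has no bytes remaining.
   */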
  @Override
  public void write(ByteBuff srcBuff, long offset) throws IOException {
    if (!srcBuff.hasRemaining()) {
      return;
    }
    accessFile(writeAccessor, srcBuff, offset);
  }

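  /**
   * Reads from or writes to the backing files through the given accessor. The global offset is
   * translated into a starting file index and an in-file offset; when the requested range crosses
   * a file boundary, the buffer's limit is temporarily shortened so that each iteration touches a
   * single file, and the remaining bytes carry over into the next file starting at offset 0.
   */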
  private void accessFile(FileAccessor accessor, ByteBuff buff,
      long globalOffset) throws IOException {
    int startFileNum = getFileNum(globalOffset);
    int remainingAccessDataLen = buff.remaining();
    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
    int accessFileNum = startFileNum;
    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
    int bufLimit = buff.limit();
    while (true) {
      FileChannel fileChannel = fileChannels[accessFileNum];
      int accessLen = 0;
      if (endFileNum > accessFileNum) {
        // shorten the limit so this access stops at the boundary of the current file
        buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
      }
      try {
        accessLen = accessor.access(fileChannel, buff, accessOffset);
      } catch (ClosedByInterruptException e) {
        throw e;
      } catch (ClosedChannelException e) {
        refreshFileConnection(accessFileNum, e);
        continue;
      }
      // recover the limit
      buff.limit(bufLimit);
      if (accessLen < remainingAccessDataLen) {
        remainingAccessDataLen -= accessLen;
        accessFileNum++;
        accessOffset = 0;
      } else {
        break;
      }
      if (accessFileNum >= fileChannels.length) {
        throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
            + " exceeds the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
            + globalOffset);
      }
    }
  }

  /**
   * Get the absolute offset within the given file for the given global offset.
   * @param fileNum the index of the backing file
   * @param globalOffset the offset across the whole engine space
   * @return the absolute offset within the file
   */
  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
    return globalOffset - fileNum * sizePerFile;
  }

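  /**
   * Maps a global offset to the index of the backing file that contains it. As a purely
   * illustrative example: with two backing files and a sizePerFile of 1 GB, a global offset of
   * 0.5 GB maps to file 0 and a global offset of 1.5 GB maps to file 1.
   */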
  private int getFileNum(long offset) {
    if (offset < 0) {
      throw new IllegalArgumentException("Unexpected offset " + offset);
    }
    int fileNum = (int) (offset / sizePerFile);
    if (fileNum >= fileChannels.length) {
      throw new RuntimeException("Unexpected offset " + offset
          + " where capacity=" + capacity);
    }
    return fileNum;
  }

  @VisibleForTesting
  FileChannel[] getFileChannels() {
    return fileChannels;
  }

  @VisibleForTesting
  void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
    ReentrantLock channelLock = channelLocks[accessFileNum];
    channelLock.lock();
    try {
      FileChannel fileChannel = fileChannels[accessFileNum];
      if (fileChannel != null) {
        // Don't re-open a channel if we were waiting on another
        // thread to re-open the channel and it is now open.
        if (fileChannel.isOpen()) {
          return;
        }
        fileChannel.close();
      }
      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
          + filePaths[accessFileNum], ioe);
      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
    } finally {
      channelLock.unlock();
    }
  }

  private interface FileAccessor {
    int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
        throws IOException;
  }

  private static class FileReadAccessor implements FileAccessor {
    @Override
    public int access(FileChannel fileChannel, ByteBuff buff,
        long accessOffset) throws IOException {
      return buff.read(fileChannel, accessOffset);
    }
  }

  private static class FileWriteAccessor implements FileAccessor {
    @Override
    public int access(FileChannel fileChannel, ByteBuff buff,
        long accessOffset) throws IOException {
      return buff.write(fileChannel, accessOffset);
    }
  }
}