001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile.bucket;
020
021import java.io.File;
022import java.io.IOException;
023import java.io.RandomAccessFile;
024import java.nio.ByteBuffer;
025import java.nio.channels.ClosedByInterruptException;
026import java.nio.channels.ClosedChannelException;
027import java.nio.channels.FileChannel;
028import java.util.Arrays;
029import org.apache.hadoop.hbase.io.hfile.Cacheable;
030import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
031import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
032import org.apache.hadoop.hbase.nio.ByteBuff;
033import org.apache.hadoop.hbase.nio.SingleByteBuff;
034import org.apache.hadoop.util.StringUtils;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041
042/**
043 * IO engine that stores data to a file on the local file system.
044 */
045@InterfaceAudience.Private
046public class FileIOEngine implements IOEngine {
047  private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
048  public static final String FILE_DELIMITER = ",";
049  private final String[] filePaths;
050  private final FileChannel[] fileChannels;
051  private final RandomAccessFile[] rafs;
052
053  private final long sizePerFile;
054  private final long capacity;
055
056  private FileReadAccessor readAccessor = new FileReadAccessor();
057  private FileWriteAccessor writeAccessor = new FileWriteAccessor();
058
059  public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
060      throws IOException {
061    this.sizePerFile = capacity / filePaths.length;
062    this.capacity = this.sizePerFile * filePaths.length;
063    this.filePaths = filePaths;
064    this.fileChannels = new FileChannel[filePaths.length];
065    if (!maintainPersistence) {
066      for (String filePath : filePaths) {
067        File file = new File(filePath);
068        if (file.exists()) {
069          if (LOG.isDebugEnabled()) {
070            LOG.debug("File " + filePath + " already exists. Deleting!!");
071          }
072          file.delete();
073          // If deletion fails still we can manage with the writes
074        }
075      }
076    }
077    this.rafs = new RandomAccessFile[filePaths.length];
078    for (int i = 0; i < filePaths.length; i++) {
079      String filePath = filePaths[i];
080      try {
081        rafs[i] = new RandomAccessFile(filePath, "rw");
082        long totalSpace = new File(filePath).getTotalSpace();
083        if (totalSpace < sizePerFile) {
084          // The next setting length will throw exception,logging this message
085          // is just used for the detail reason of exception,
086          String msg = "Only " + StringUtils.byteDesc(totalSpace)
087              + " total space under " + filePath + ", not enough for requested "
088              + StringUtils.byteDesc(sizePerFile);
089          LOG.warn(msg);
090        }
091        rafs[i].setLength(sizePerFile);
092        fileChannels[i] = rafs[i].getChannel();
093        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
094            + ", on the path:" + filePath);
095      } catch (IOException fex) {
096        LOG.error("Failed allocating cache on " + filePath, fex);
097        shutdown();
098        throw fex;
099      }
100    }
101  }
102
103  @Override
104  public String toString() {
105    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
106        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
107  }
108
109  /**
110   * File IO engine is always able to support persistent storage for the cache
111   * @return true
112   */
113  @Override
114  public boolean isPersistent() {
115    return true;
116  }
117
118  /**
119   * Transfers data from file to the given byte buffer
120   * @param offset The offset in the file where the first byte to be read
121   * @param length The length of buffer that should be allocated for reading
122   *               from the file channel
123   * @return number of bytes read
124   * @throws IOException
125   */
126  @Override
127  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
128      throws IOException {
129    Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
130    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
131    if (length != 0) {
132      accessFile(readAccessor, dstBuffer, offset);
133      // The buffer created out of the fileChannel is formed by copying the data from the file
134      // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
135      // this buffer from the file the data is already copied and there is no need to ensure that
136      // the results are not corrupted before consuming them.
137      if (dstBuffer.limit() != length) {
138        throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
139            + " expected");
140      }
141    }
142    return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
143  }
144
145  @VisibleForTesting
146  void closeFileChannels() {
147    for (FileChannel fileChannel: fileChannels) {
148      try {
149        fileChannel.close();
150      } catch (IOException e) {
151        LOG.warn("Failed to close FileChannel", e);
152      }
153    }
154  }
155
156  /**
157   * Transfers data from the given byte buffer to file
158   * @param srcBuffer the given byte buffer from which bytes are to be read
159   * @param offset The offset in the file where the first byte to be written
160   * @throws IOException
161   */
162  @Override
163  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
164    if (!srcBuffer.hasRemaining()) {
165      return;
166    }
167    accessFile(writeAccessor, srcBuffer, offset);
168  }
169
170  /**
171   * Sync the data to file after writing
172   * @throws IOException
173   */
174  @Override
175  public void sync() throws IOException {
176    for (int i = 0; i < fileChannels.length; i++) {
177      try {
178        if (fileChannels[i] != null) {
179          fileChannels[i].force(true);
180        }
181      } catch (IOException ie) {
182        LOG.warn("Failed syncing data to " + this.filePaths[i]);
183        throw ie;
184      }
185    }
186  }
187
188  /**
189   * Close the file
190   */
191  @Override
192  public void shutdown() {
193    for (int i = 0; i < filePaths.length; i++) {
194      try {
195        if (fileChannels[i] != null) {
196          fileChannels[i].close();
197        }
198        if (rafs[i] != null) {
199          rafs[i].close();
200        }
201      } catch (IOException ex) {
202        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
203      }
204    }
205  }
206
207  @Override
208  public void write(ByteBuff srcBuffer, long offset) throws IOException {
209    // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
210    assert srcBuffer.hasArray();
211    write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
212            srcBuffer.remaining()), offset);
213  }
214
215  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
216      long globalOffset) throws IOException {
217    int startFileNum = getFileNum(globalOffset);
218    int remainingAccessDataLen = buffer.remaining();
219    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
220    int accessFileNum = startFileNum;
221    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
222    int bufLimit = buffer.limit();
223    while (true) {
224      FileChannel fileChannel = fileChannels[accessFileNum];
225      int accessLen = 0;
226      if (endFileNum > accessFileNum) {
227        // short the limit;
228        buffer.limit((int) (buffer.limit() - remainingAccessDataLen
229            + sizePerFile - accessOffset));
230      }
231      try {
232        accessLen = accessor.access(fileChannel, buffer, accessOffset);
233      } catch (ClosedByInterruptException e) {
234        throw e;
235      } catch (ClosedChannelException e) {
236        LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e);
237        refreshFileConnection(accessFileNum);
238        continue;
239      }
240      // recover the limit
241      buffer.limit(bufLimit);
242      if (accessLen < remainingAccessDataLen) {
243        remainingAccessDataLen -= accessLen;
244        accessFileNum++;
245        accessOffset = 0;
246      } else {
247        break;
248      }
249      if (accessFileNum >= fileChannels.length) {
250        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
251            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
252            + globalOffset);
253      }
254    }
255  }
256
257  /**
258   * Get the absolute offset in given file with the relative global offset.
259   * @param fileNum
260   * @param globalOffset
261   * @return the absolute offset
262   */
263  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
264    return globalOffset - fileNum * sizePerFile;
265  }
266
267  private int getFileNum(long offset) {
268    if (offset < 0) {
269      throw new IllegalArgumentException("Unexpected offset " + offset);
270    }
271    int fileNum = (int) (offset / sizePerFile);
272    if (fileNum >= fileChannels.length) {
273      throw new RuntimeException("Not expected offset " + offset
274          + " where capacity=" + capacity);
275    }
276    return fileNum;
277  }
278
279  @VisibleForTesting
280  FileChannel[] getFileChannels() {
281    return fileChannels;
282  }
283
284  @VisibleForTesting
285  void refreshFileConnection(int accessFileNum) throws IOException {
286    FileChannel fileChannel = fileChannels[accessFileNum];
287    if (fileChannel != null) {
288      fileChannel.close();
289    }
290    rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
291    fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
292  }
293
294  private static interface FileAccessor {
295    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
296        throws IOException;
297  }
298
299  private static class FileReadAccessor implements FileAccessor {
300    @Override
301    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
302        long accessOffset) throws IOException {
303      return fileChannel.read(byteBuffer, accessOffset);
304    }
305  }
306
307  private static class FileWriteAccessor implements FileAccessor {
308    @Override
309    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
310        long accessOffset) throws IOException {
311      return fileChannel.write(byteBuffer, accessOffset);
312    }
313  }
314}