001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile.bucket;
020
021import java.io.File;
022import java.io.IOException;
023import java.io.RandomAccessFile;
024import java.nio.ByteBuffer;
025import java.nio.channels.ClosedByInterruptException;
026import java.nio.channels.ClosedChannelException;
027import java.nio.channels.FileChannel;
028import java.util.Arrays;
029import java.util.concurrent.locks.ReentrantLock;
030import org.apache.hadoop.hbase.io.hfile.Cacheable;
031import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
032import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
033import org.apache.hadoop.hbase.nio.ByteBuff;
034import org.apache.hadoop.hbase.nio.SingleByteBuff;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
041import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
042
043/**
044 * IO engine that stores data to a file on the local file system.
045 */
046@InterfaceAudience.Private
047public class FileIOEngine implements IOEngine {
048  private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
049  public static final String FILE_DELIMITER = ",";
050  private final String[] filePaths;
051  private final FileChannel[] fileChannels;
052  private final RandomAccessFile[] rafs;
053  private final ReentrantLock[] channelLocks;
054
055  private final long sizePerFile;
056  private final long capacity;
057
058  private FileReadAccessor readAccessor = new FileReadAccessor();
059  private FileWriteAccessor writeAccessor = new FileWriteAccessor();
060
061  public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
062      throws IOException {
063    this.sizePerFile = capacity / filePaths.length;
064    this.capacity = this.sizePerFile * filePaths.length;
065    this.filePaths = filePaths;
066    this.fileChannels = new FileChannel[filePaths.length];
067    if (!maintainPersistence) {
068      for (String filePath : filePaths) {
069        File file = new File(filePath);
070        if (file.exists()) {
071          if (LOG.isDebugEnabled()) {
072            LOG.debug("File " + filePath + " already exists. Deleting!!");
073          }
074          file.delete();
075          // If deletion fails still we can manage with the writes
076        }
077      }
078    }
079    this.rafs = new RandomAccessFile[filePaths.length];
080    this.channelLocks = new ReentrantLock[filePaths.length];
081    for (int i = 0; i < filePaths.length; i++) {
082      String filePath = filePaths[i];
083      try {
084        rafs[i] = new RandomAccessFile(filePath, "rw");
085        long totalSpace = new File(filePath).getTotalSpace();
086        if (totalSpace < sizePerFile) {
087          // The next setting length will throw exception,logging this message
088          // is just used for the detail reason of exception,
089          String msg = "Only " + StringUtils.byteDesc(totalSpace)
090              + " total space under " + filePath + ", not enough for requested "
091              + StringUtils.byteDesc(sizePerFile);
092          LOG.warn(msg);
093        }
094        rafs[i].setLength(sizePerFile);
095        fileChannels[i] = rafs[i].getChannel();
096        channelLocks[i] = new ReentrantLock();
097        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
098            + ", on the path:" + filePath);
099      } catch (IOException fex) {
100        LOG.error("Failed allocating cache on " + filePath, fex);
101        shutdown();
102        throw fex;
103      }
104    }
105  }
106
107  @Override
108  public String toString() {
109    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
110        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
111  }
112
113  /**
114   * File IO engine is always able to support persistent storage for the cache
115   * @return true
116   */
117  @Override
118  public boolean isPersistent() {
119    return true;
120  }
121
122  /**
123   * Transfers data from file to the given byte buffer
124   * @param offset The offset in the file where the first byte to be read
125   * @param length The length of buffer that should be allocated for reading
126   *               from the file channel
127   * @return number of bytes read
128   * @throws IOException
129   */
130  @Override
131  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
132      throws IOException {
133    Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
134    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
135    if (length != 0) {
136      accessFile(readAccessor, dstBuffer, offset);
137      // The buffer created out of the fileChannel is formed by copying the data from the file
138      // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
139      // this buffer from the file the data is already copied and there is no need to ensure that
140      // the results are not corrupted before consuming them.
141      if (dstBuffer.limit() != length) {
142        throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
143            + " expected");
144      }
145    }
146    return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
147  }
148
149  @VisibleForTesting
150  void closeFileChannels() {
151    for (FileChannel fileChannel: fileChannels) {
152      try {
153        fileChannel.close();
154      } catch (IOException e) {
155        LOG.warn("Failed to close FileChannel", e);
156      }
157    }
158  }
159
160  /**
161   * Transfers data from the given byte buffer to file
162   * @param srcBuffer the given byte buffer from which bytes are to be read
163   * @param offset The offset in the file where the first byte to be written
164   * @throws IOException
165   */
166  @Override
167  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
168    if (!srcBuffer.hasRemaining()) {
169      return;
170    }
171    accessFile(writeAccessor, srcBuffer, offset);
172  }
173
174  /**
175   * Sync the data to file after writing
176   * @throws IOException
177   */
178  @Override
179  public void sync() throws IOException {
180    for (int i = 0; i < fileChannels.length; i++) {
181      try {
182        if (fileChannels[i] != null) {
183          fileChannels[i].force(true);
184        }
185      } catch (IOException ie) {
186        LOG.warn("Failed syncing data to " + this.filePaths[i]);
187        throw ie;
188      }
189    }
190  }
191
192  /**
193   * Close the file
194   */
195  @Override
196  public void shutdown() {
197    for (int i = 0; i < filePaths.length; i++) {
198      try {
199        if (fileChannels[i] != null) {
200          fileChannels[i].close();
201        }
202        if (rafs[i] != null) {
203          rafs[i].close();
204        }
205      } catch (IOException ex) {
206        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
207      }
208    }
209  }
210
211  @Override
212  public void write(ByteBuff srcBuffer, long offset) throws IOException {
213    // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
214    assert srcBuffer.hasArray();
215    write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
216            srcBuffer.remaining()), offset);
217  }
218
219  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
220      long globalOffset) throws IOException {
221    int startFileNum = getFileNum(globalOffset);
222    int remainingAccessDataLen = buffer.remaining();
223    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
224    int accessFileNum = startFileNum;
225    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
226    int bufLimit = buffer.limit();
227    while (true) {
228      FileChannel fileChannel = fileChannels[accessFileNum];
229      int accessLen = 0;
230      if (endFileNum > accessFileNum) {
231        // short the limit;
232        buffer.limit((int) (buffer.limit() - remainingAccessDataLen
233            + sizePerFile - accessOffset));
234      }
235      try {
236        accessLen = accessor.access(fileChannel, buffer, accessOffset);
237      } catch (ClosedByInterruptException e) {
238        throw e;
239      } catch (ClosedChannelException e) {
240        refreshFileConnection(accessFileNum, e);
241        continue;
242      }
243      // recover the limit
244      buffer.limit(bufLimit);
245      if (accessLen < remainingAccessDataLen) {
246        remainingAccessDataLen -= accessLen;
247        accessFileNum++;
248        accessOffset = 0;
249      } else {
250        break;
251      }
252      if (accessFileNum >= fileChannels.length) {
253        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
254            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
255            + globalOffset);
256      }
257    }
258  }
259
260  /**
261   * Get the absolute offset in given file with the relative global offset.
262   * @param fileNum
263   * @param globalOffset
264   * @return the absolute offset
265   */
266  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
267    return globalOffset - fileNum * sizePerFile;
268  }
269
270  private int getFileNum(long offset) {
271    if (offset < 0) {
272      throw new IllegalArgumentException("Unexpected offset " + offset);
273    }
274    int fileNum = (int) (offset / sizePerFile);
275    if (fileNum >= fileChannels.length) {
276      throw new RuntimeException("Not expected offset " + offset
277          + " where capacity=" + capacity);
278    }
279    return fileNum;
280  }
281
282  @VisibleForTesting
283  FileChannel[] getFileChannels() {
284    return fileChannels;
285  }
286
287  @VisibleForTesting
288  void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
289    ReentrantLock channelLock = channelLocks[accessFileNum];
290    channelLock.lock();
291    try {
292      FileChannel fileChannel = fileChannels[accessFileNum];
293      if (fileChannel != null) {
294        // Don't re-open a channel if we were waiting on another
295        // thread to re-open the channel and it is now open.
296        if (fileChannel.isOpen()) {
297          return;
298        }
299        fileChannel.close();
300      }
301      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
302          + filePaths[accessFileNum], ioe);
303      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
304      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
305    } finally{
306      channelLock.unlock();
307    }
308  }
309
310  private static interface FileAccessor {
311    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
312        throws IOException;
313  }
314
315  private static class FileReadAccessor implements FileAccessor {
316    @Override
317    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
318        long accessOffset) throws IOException {
319      return fileChannel.read(byteBuffer, accessOffset);
320    }
321  }
322
323  private static class FileWriteAccessor implements FileAccessor {
324    @Override
325    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
326        long accessOffset) throws IOException {
327      return fileChannel.write(byteBuffer, accessOffset);
328    }
329  }
330}