001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.RandomAccessFile;
023import java.nio.ByteBuffer;
024import java.nio.channels.ClosedByInterruptException;
025import java.nio.channels.ClosedChannelException;
026import java.nio.channels.FileChannel;
027import java.util.Arrays;
028import java.util.concurrent.locks.ReentrantLock;
029import org.apache.hadoop.hbase.HBaseIOException;
030import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
031import org.apache.hadoop.hbase.io.hfile.Cacheable;
032import org.apache.hadoop.hbase.nio.ByteBuff;
033import org.apache.hadoop.util.StringUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
039
040/**
041 * IO engine that stores data to a file on the local file system.
042 */
043@InterfaceAudience.Private
044public class FileIOEngine extends PersistentIOEngine {
045  private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
046  public static final String FILE_DELIMITER = ",";
047  private final FileChannel[] fileChannels;
048  private final RandomAccessFile[] rafs;
049  private final ReentrantLock[] channelLocks;
050
051  private final long sizePerFile;
052  private final long capacity;
053  private boolean maintainPersistence;
054
055  private FileReadAccessor readAccessor = new FileReadAccessor();
056  private FileWriteAccessor writeAccessor = new FileWriteAccessor();
057
058  public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
059    throws IOException {
060    super(filePaths);
061    this.sizePerFile = capacity / filePaths.length;
062    this.capacity = this.sizePerFile * filePaths.length;
063    this.fileChannels = new FileChannel[filePaths.length];
064    this.maintainPersistence = maintainPersistence;
065    if (!maintainPersistence) {
066      for (String filePath : filePaths) {
067        File file = new File(filePath);
068        if (file.exists()) {
069          if (LOG.isDebugEnabled()) {
070            LOG.debug("File " + filePath + " already exists. Deleting!!");
071          }
072          file.delete();
073          // If deletion fails still we can manage with the writes
074        }
075      }
076    }
077    this.rafs = new RandomAccessFile[filePaths.length];
078    this.channelLocks = new ReentrantLock[filePaths.length];
079    for (int i = 0; i < filePaths.length; i++) {
080      String filePath = filePaths[i];
081      try {
082        rafs[i] = new RandomAccessFile(filePath, "rw");
083        long totalSpace = new File(filePath).getTotalSpace();
084        if (totalSpace < sizePerFile) {
085          // The next setting length will throw exception,logging this message
086          // is just used for the detail reason of exception,
087          String msg = "Only " + StringUtils.byteDesc(totalSpace) + " total space under " + filePath
088            + ", not enough for requested " + StringUtils.byteDesc(sizePerFile);
089          LOG.warn(msg);
090        }
091        File file = new File(filePath);
092        // setLength() method will change file's last modified time. So if don't do
093        // this check, wrong time will be used when calculating checksum.
094        if (file.length() != sizePerFile) {
095          rafs[i].setLength(sizePerFile);
096        }
097        fileChannels[i] = rafs[i].getChannel();
098        channelLocks[i] = new ReentrantLock();
099        LOG.info(
100          "Allocating cache " + StringUtils.byteDesc(sizePerFile) + ", on the path:" + filePath);
101      } catch (IOException fex) {
102        LOG.error("Failed allocating cache on " + filePath, fex);
103        shutdown();
104        throw fex;
105      }
106    }
107  }
108
109  @Override
110  public String toString() {
111    return "ioengine=" + this.getClass().getSimpleName() + ", paths=" + Arrays.asList(filePaths)
112      + ", capacity=" + String.format("%,d", this.capacity);
113  }
114
115  /**
116   * File IO engine is always able to support persistent storage for the cache
117   */
118  @Override
119  public boolean isPersistent() {
120    return true;
121  }
122
123  /**
124   * Transfers data from file to the given byte buffer
125   * @param be an {@link BucketEntry} which maintains an (offset, len, refCnt)
126   * @return the {@link Cacheable} with block data inside.
127   * @throws IOException if any IO error happen.
128   */
129  @Override
130  public Cacheable read(BucketEntry be) throws IOException {
131    long offset = be.offset();
132    int length = be.getLength();
133    Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
134    ByteBuff dstBuff = be.allocator.allocate(length);
135    if (length != 0) {
136      try {
137        accessFile(readAccessor, dstBuff, offset);
138        // The buffer created out of the fileChannel is formed by copying the data from the file
139        // Hence in this case there is no shared memory that we point to. Even if the BucketCache
140        // evicts this buffer from the file the data is already copied and there is no need to
141        // ensure that the results are not corrupted before consuming them.
142        if (dstBuff.limit() != length) {
143          throw new IllegalArgumentIOException(
144            "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
145        }
146      } catch (IOException ioe) {
147        dstBuff.release();
148        throw ioe;
149      }
150    }
151    if (maintainPersistence) {
152      dstBuff.rewind();
153      long cachedNanoTime = dstBuff.getLong();
154      if (be.getCachedTime() != cachedNanoTime) {
155        dstBuff.release();
156        throw new HBaseIOException("The cached time recorded within the cached block: "
157          + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime());
158      }
159      dstBuff.limit(length);
160      dstBuff = dstBuff.slice();
161    } else {
162      dstBuff.rewind();
163    }
164    return be.wrapAsCacheable(dstBuff);
165  }
166
167  void checkCacheTime(BucketEntry be) throws IOException {
168    long offset = be.offset();
169    ByteBuff dstBuff = be.allocator.allocate(Long.BYTES);
170    try {
171      accessFile(readAccessor, dstBuff, offset);
172    } catch (IOException ioe) {
173      dstBuff.release();
174      throw ioe;
175    }
176    dstBuff.rewind();
177    long cachedNanoTime = dstBuff.getLong();
178    if (be.getCachedTime() != cachedNanoTime) {
179      dstBuff.release();
180      throw new HBaseIOException("The cached time recorded within the cached block: "
181        + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime());
182    }
183  }
184
185  void closeFileChannels() {
186    for (FileChannel fileChannel : fileChannels) {
187      try {
188        fileChannel.close();
189      } catch (IOException e) {
190        LOG.warn("Failed to close FileChannel", e);
191      }
192    }
193  }
194
195  /**
196   * Transfers data from the given byte buffer to file
197   * @param srcBuffer the given byte buffer from which bytes are to be read
198   * @param offset    The offset in the file where the first byte to be written
199   */
200  @Override
201  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
202    write(ByteBuff.wrap(srcBuffer), offset);
203  }
204
205  /**
206   * Sync the data to file after writing
207   */
208  @Override
209  public void sync() throws IOException {
210    for (int i = 0; i < fileChannels.length; i++) {
211      try {
212        if (fileChannels[i] != null) {
213          fileChannels[i].force(true);
214        }
215      } catch (IOException ie) {
216        LOG.warn("Failed syncing data to " + this.filePaths[i]);
217        throw ie;
218      }
219    }
220  }
221
222  /**
223   * Close the file
224   */
225  @Override
226  public void shutdown() {
227    for (int i = 0; i < filePaths.length; i++) {
228      try {
229        if (fileChannels[i] != null) {
230          fileChannels[i].close();
231        }
232        if (rafs[i] != null) {
233          rafs[i].close();
234        }
235      } catch (IOException ex) {
236        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
237      }
238    }
239  }
240
241  @Override
242  public void write(ByteBuff srcBuff, long offset) throws IOException {
243    if (!srcBuff.hasRemaining()) {
244      return;
245    }
246    accessFile(writeAccessor, srcBuff, offset);
247  }
248
249  private void accessFile(FileAccessor accessor, ByteBuff buff, long globalOffset)
250    throws IOException {
251    int startFileNum = getFileNum(globalOffset);
252    int remainingAccessDataLen = buff.remaining();
253    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
254    int accessFileNum = startFileNum;
255    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
256    int bufLimit = buff.limit();
257    while (true) {
258      FileChannel fileChannel = fileChannels[accessFileNum];
259      int accessLen = 0;
260      if (endFileNum > accessFileNum) {
261        // short the limit;
262        buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
263      }
264      try {
265        accessLen = accessor.access(fileChannel, buff, accessOffset);
266      } catch (ClosedByInterruptException e) {
267        throw e;
268      } catch (ClosedChannelException e) {
269        refreshFileConnection(accessFileNum, e);
270        continue;
271      }
272      // recover the limit
273      buff.limit(bufLimit);
274      if (accessLen < remainingAccessDataLen) {
275        remainingAccessDataLen -= accessLen;
276        accessFileNum++;
277        accessOffset = 0;
278      } else {
279        break;
280      }
281      if (accessFileNum >= fileChannels.length) {
282        throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
283          + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
284          + globalOffset);
285      }
286    }
287  }
288
289  /**
290   * Get the absolute offset in given file with the relative global offset.
291   * @return the absolute offset
292   */
293  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
294    return globalOffset - fileNum * sizePerFile;
295  }
296
297  private int getFileNum(long offset) {
298    if (offset < 0) {
299      throw new IllegalArgumentException("Unexpected offset " + offset);
300    }
301    int fileNum = (int) (offset / sizePerFile);
302    if (fileNum >= fileChannels.length) {
303      throw new RuntimeException("Not expected offset " + offset + " where capacity=" + capacity);
304    }
305    return fileNum;
306  }
307
308  FileChannel[] getFileChannels() {
309    return fileChannels;
310  }
311
312  void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
313    ReentrantLock channelLock = channelLocks[accessFileNum];
314    channelLock.lock();
315    try {
316      FileChannel fileChannel = fileChannels[accessFileNum];
317      if (fileChannel != null) {
318        // Don't re-open a channel if we were waiting on another
319        // thread to re-open the channel and it is now open.
320        if (fileChannel.isOpen()) {
321          return;
322        }
323        fileChannel.close();
324      }
325      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
326        + filePaths[accessFileNum], ioe);
327      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
328      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
329    } finally {
330      channelLock.unlock();
331    }
332  }
333
334  private interface FileAccessor {
335    int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException;
336  }
337
338  private static class FileReadAccessor implements FileAccessor {
339    @Override
340    public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
341      throws IOException {
342      return buff.read(fileChannel, accessOffset);
343    }
344  }
345
346  private static class FileWriteAccessor implements FileAccessor {
347    @Override
348    public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
349      throws IOException {
350      return buff.write(fileChannel, accessOffset);
351    }
352  }
353}