001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.RandomAccessFile;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.util.concurrent.atomic.AtomicInteger;
026import org.apache.hadoop.hbase.io.hfile.Cacheable;
027import org.apache.hadoop.hbase.nio.ByteBuff;
028import org.apache.hadoop.hbase.util.ByteBufferAllocator;
029import org.apache.hadoop.hbase.util.ByteBufferArray;
030import org.apache.hadoop.util.StringUtils;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * IO engine that stores data to a file on the specified file system using memory mapping mechanism
037 */
038@InterfaceAudience.Private
039public abstract class FileMmapIOEngine extends PersistentIOEngine {
040  static final Logger LOG = LoggerFactory.getLogger(FileMmapIOEngine.class);
041
042  protected final String path;
043  protected long size;
044  protected ByteBufferArray bufferArray;
045  private final FileChannel fileChannel;
046  private RandomAccessFile raf = null;
047
048  public FileMmapIOEngine(String filePath, long capacity) throws IOException {
049    super(filePath);
050    this.path = filePath;
051    this.size = capacity;
052    long fileSize = 0;
053    try {
054      raf = new RandomAccessFile(filePath, "rw");
055      fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE);
056      File file = new File(filePath);
057      // setLength() method will change file's last modified time. So if don't do
058      // this check, wrong time will be used when calculating checksum.
059      if (file.length() != fileSize) {
060        raf.setLength(fileSize);
061      }
062      fileChannel = raf.getChannel();
063      LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
064    } catch (java.io.FileNotFoundException fex) {
065      LOG.error("Can't create bucket cache file " + filePath, fex);
066      throw fex;
067    } catch (IOException ioex) {
068      LOG.error(
069        "Can't extend bucket cache file; insufficient space for " + StringUtils.byteDesc(fileSize),
070        ioex);
071      shutdown();
072      throw ioex;
073    }
074    ByteBufferAllocator allocator = new ByteBufferAllocator() {
075      AtomicInteger pos = new AtomicInteger(0);
076
077      @Override
078      public ByteBuffer allocate(long size) throws IOException {
079        ByteBuffer buffer = fileChannel.map(java.nio.channels.FileChannel.MapMode.READ_WRITE,
080          pos.getAndIncrement() * size, size);
081        return buffer;
082      }
083    };
084    bufferArray = new ByteBufferArray(fileSize, allocator);
085  }
086
087  private long roundUp(long n, long to) {
088    return ((n + to - 1) / to) * to;
089  }
090
091  @Override
092  public String toString() {
093    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path + ", size="
094      + String.format("%,d", this.size);
095  }
096
097  /**
098   * File IO engine is always able to support persistent storage for the cache n
099   */
100  @Override
101  public boolean isPersistent() {
102    // TODO : HBASE-21981 needed for persistence to really work
103    return true;
104  }
105
106  @Override
107  public abstract Cacheable read(BucketEntry be) throws IOException;
108
109  /**
110   * Transfers data from the given byte buffer to file
111   * @param srcBuffer the given byte buffer from which bytes are to be read
112   * @param offset    The offset in the file where the first byte to be written n
113   */
114  @Override
115  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
116    bufferArray.write(offset, ByteBuff.wrap(srcBuffer));
117  }
118
119  @Override
120  public void write(ByteBuff srcBuffer, long offset) throws IOException {
121    bufferArray.write(offset, srcBuffer);
122  }
123
124  /**
125   * Sync the data to file after writing n
126   */
127  @Override
128  public void sync() throws IOException {
129    if (fileChannel != null) {
130      fileChannel.force(true);
131    }
132  }
133
134  /**
135   * Close the file
136   */
137  @Override
138  public void shutdown() {
139    try {
140      fileChannel.close();
141    } catch (IOException ex) {
142      LOG.error("Can't shutdown cleanly", ex);
143    }
144    try {
145      raf.close();
146    } catch (IOException ex) {
147      LOG.error("Can't shutdown cleanly", ex);
148    }
149  }
150}