001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.RandomAccessFile;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import org.apache.hadoop.hbase.io.hfile.Cacheable;
028import org.apache.hadoop.hbase.nio.ByteBuff;
029import org.apache.hadoop.hbase.util.ByteBufferAllocator;
030import org.apache.hadoop.hbase.util.ByteBufferArray;
031import org.apache.hadoop.util.StringUtils;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * IO engine that stores data to a file on the specified file system using memory mapping
038 * mechanism
039 */
040@InterfaceAudience.Private
041public abstract class FileMmapIOEngine extends PersistentIOEngine {
042  static final Logger LOG = LoggerFactory.getLogger(FileMmapIOEngine.class);
043
044  protected final String path;
045  protected long size;
046  protected ByteBufferArray bufferArray;
047  private final FileChannel fileChannel;
048  private RandomAccessFile raf = null;
049
050  public FileMmapIOEngine(String filePath, long capacity) throws IOException {
051    super(filePath);
052    this.path = filePath;
053    this.size = capacity;
054    long fileSize = 0;
055    try {
056      raf = new RandomAccessFile(filePath, "rw");
057      fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE);
058      File file = new File(filePath);
059      // setLength() method will change file's last modified time. So if don't do
060      // this check, wrong time will be used when calculating checksum.
061      if (file.length() != fileSize) {
062        raf.setLength(fileSize);
063      }
064      fileChannel = raf.getChannel();
065      LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
066    } catch (java.io.FileNotFoundException fex) {
067      LOG.error("Can't create bucket cache file " + filePath, fex);
068      throw fex;
069    } catch (IOException ioex) {
070      LOG.error(
071        "Can't extend bucket cache file; insufficient space for " + StringUtils.byteDesc(fileSize),
072        ioex);
073      shutdown();
074      throw ioex;
075    }
076    ByteBufferAllocator allocator = new ByteBufferAllocator() {
077      AtomicInteger pos = new AtomicInteger(0);
078
079      @Override
080      public ByteBuffer allocate(long size) throws IOException {
081        ByteBuffer buffer = fileChannel.map(java.nio.channels.FileChannel.MapMode.READ_WRITE,
082          pos.getAndIncrement() * size, size);
083        return buffer;
084      }
085    };
086    bufferArray = new ByteBufferArray(fileSize, allocator);
087  }
088
089  private long roundUp(long n, long to) {
090    return ((n + to - 1) / to) * to;
091  }
092
093  @Override
094  public String toString() {
095    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path + ", size="
096        + String.format("%,d", this.size);
097  }
098
099  /**
100   * File IO engine is always able to support persistent storage for the cache
101   * @return true
102   */
103  @Override
104  public boolean isPersistent() {
105    // TODO : HBASE-21981 needed for persistence to really work
106    return true;
107  }
108
109  @Override
110  public abstract Cacheable read(BucketEntry be) throws IOException;
111
112  /**
113   * Transfers data from the given byte buffer to file
114   * @param srcBuffer the given byte buffer from which bytes are to be read
115   * @param offset The offset in the file where the first byte to be written
116   * @throws IOException
117   */
118  @Override
119  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
120    bufferArray.write(offset, ByteBuff.wrap(srcBuffer));
121  }
122
123  @Override
124  public void write(ByteBuff srcBuffer, long offset) throws IOException {
125    bufferArray.write(offset, srcBuffer);
126  }
127
128  /**
129   * Sync the data to file after writing
130   * @throws IOException
131   */
132  @Override
133  public void sync() throws IOException {
134    if (fileChannel != null) {
135      fileChannel.force(true);
136    }
137  }
138
139  /**
140   * Close the file
141   */
142  @Override
143  public void shutdown() {
144    try {
145      fileChannel.close();
146    } catch (IOException ex) {
147      LOG.error("Can't shutdown cleanly", ex);
148    }
149    try {
150      raf.close();
151    } catch (IOException ex) {
152      LOG.error("Can't shutdown cleanly", ex);
153    }
154  }
155}