001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.File; 021import java.io.IOException; 022import java.io.RandomAccessFile; 023import java.nio.ByteBuffer; 024import java.nio.channels.FileChannel; 025import java.util.concurrent.atomic.AtomicInteger; 026import org.apache.hadoop.hbase.io.hfile.Cacheable; 027import org.apache.hadoop.hbase.nio.ByteBuff; 028import org.apache.hadoop.hbase.util.ByteBufferAllocator; 029import org.apache.hadoop.hbase.util.ByteBufferArray; 030import org.apache.hadoop.util.StringUtils; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * IO engine that stores data to a file on the specified file system using memory mapping mechanism 037 */ 038@InterfaceAudience.Private 039public abstract class FileMmapIOEngine extends PersistentIOEngine { 040 static final Logger LOG = LoggerFactory.getLogger(FileMmapIOEngine.class); 041 042 protected final String path; 043 protected long size; 044 protected ByteBufferArray bufferArray; 045 private final FileChannel fileChannel; 046 private RandomAccessFile raf = null; 047 048 public FileMmapIOEngine(String filePath, long capacity) throws IOException { 049 super(filePath); 050 this.path = filePath; 051 this.size = capacity; 052 long fileSize = 0; 053 try { 054 raf = new RandomAccessFile(filePath, "rw"); 055 fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE); 056 File file = new File(filePath); 057 // setLength() method will change file's last modified time. So if don't do 058 // this check, wrong time will be used when calculating checksum. 059 if (file.length() != fileSize) { 060 raf.setLength(fileSize); 061 } 062 fileChannel = raf.getChannel(); 063 LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath); 064 } catch (java.io.FileNotFoundException fex) { 065 LOG.error("Can't create bucket cache file " + filePath, fex); 066 throw fex; 067 } catch (IOException ioex) { 068 LOG.error( 069 "Can't extend bucket cache file; insufficient space for " + StringUtils.byteDesc(fileSize), 070 ioex); 071 shutdown(); 072 throw ioex; 073 } 074 ByteBufferAllocator allocator = new ByteBufferAllocator() { 075 AtomicInteger pos = new AtomicInteger(0); 076 077 @Override 078 public ByteBuffer allocate(long size) throws IOException { 079 ByteBuffer buffer = fileChannel.map(java.nio.channels.FileChannel.MapMode.READ_WRITE, 080 pos.getAndIncrement() * size, size); 081 return buffer; 082 } 083 }; 084 bufferArray = new ByteBufferArray(fileSize, allocator); 085 } 086 087 private long roundUp(long n, long to) { 088 return ((n + to - 1) / to) * to; 089 } 090 091 @Override 092 public String toString() { 093 return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path + ", size=" 094 + String.format("%,d", this.size); 095 } 096 097 /** 098 * File IO engine is always able to support persistent storage for the cache n 099 */ 100 @Override 101 public boolean isPersistent() { 102 // TODO : HBASE-21981 needed for persistence to really work 103 return true; 104 } 105 106 @Override 107 public abstract Cacheable read(BucketEntry be) throws IOException; 108 109 /** 110 * Transfers data from the given byte buffer to file 111 * @param srcBuffer the given byte buffer from which bytes are to be read 112 * @param offset The offset in the file where the first byte to be written n 113 */ 114 @Override 115 public void write(ByteBuffer srcBuffer, long offset) throws IOException { 116 bufferArray.write(offset, ByteBuff.wrap(srcBuffer)); 117 } 118 119 @Override 120 public void write(ByteBuff srcBuffer, long offset) throws IOException { 121 bufferArray.write(offset, srcBuffer); 122 } 123 124 /** 125 * Sync the data to file after writing n 126 */ 127 @Override 128 public void sync() throws IOException { 129 if (fileChannel != null) { 130 fileChannel.force(true); 131 } 132 } 133 134 /** 135 * Close the file 136 */ 137 @Override 138 public void shutdown() { 139 try { 140 fileChannel.close(); 141 } catch (IOException ex) { 142 LOG.error("Can't shutdown cleanly", ex); 143 } 144 try { 145 raf.close(); 146 } catch (IOException ex) { 147 LOG.error("Can't shutdown cleanly", ex); 148 } 149 } 150}