/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
import org.apache.hadoop.util.StringUtils;

/**
 * IO engine that stores data to a file on the local file system using memory mapping
 * mechanism
 */
@InterfaceAudience.Private
public class FileMmapEngine implements IOEngine {
  static final Logger LOG = LoggerFactory.getLogger(FileMmapEngine.class);

  private final String path;
  private long size;
  private ByteBufferArray bufferArray;
  private final FileChannel fileChannel;
  private RandomAccessFile raf = null;

  /**
   * Creates the engine, sizing the backing file to {@code capacity} rounded up to a whole
   * number of {@link ByteBufferArray#DEFAULT_BUFFER_SIZE} segments and memory-mapping each
   * segment read-write.
   * @param filePath path of the backing cache file
   * @param capacity requested cache capacity in bytes
   * @throws IOException if the file cannot be created or extended to the required size
   */
  public FileMmapEngine(String filePath, long capacity) throws IOException {
    this.path = filePath;
    this.size = capacity;
    long fileSize = 0;
    try {
      raf = new RandomAccessFile(filePath, "rw");
      // Round up so the file splits into a whole number of equally-sized mapped segments.
      fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE);
      raf.setLength(fileSize);
      fileChannel = raf.getChannel();
      LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
    } catch (java.io.FileNotFoundException fex) {
      LOG.error("Can't create bucket cache file " + filePath, fex);
      throw fex;
    } catch (IOException ioex) {
      LOG.error("Can't extend bucket cache file; insufficient space for "
          + StringUtils.byteDesc(fileSize), ioex);
      // shutdown() must tolerate partially-initialized state here (see null checks there);
      // otherwise an NPE would mask the original IOException.
      shutdown();
      throw ioex;
    }
    ByteBufferAllocator allocator = new ByteBufferAllocator() {
      // Index of the next segment to map; each allocate() maps the next size-byte file slice.
      AtomicInteger pos = new AtomicInteger(0);
      @Override
      public ByteBuffer allocate(long size) throws IOException {
        ByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,
            pos.getAndIncrement() * size, size);
        return buffer;
      }
    };
    bufferArray = new ByteBufferArray(fileSize, allocator);
  }

  /** Rounds {@code n} up to the nearest multiple of {@code to}. */
  private long roundUp(long n, long to) {
    return ((n + to - 1) / to) * to;
  }

  @Override
  public String toString() {
    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
      ", size=" + String.format("%,d", this.size);
  }

  /**
   * File IO engine is always able to support persistent storage for the cache
   * @return true
   */
  @Override
  public boolean isPersistent() {
    return true;
  }

  /**
   * Reads {@code length} bytes starting at {@code offset} into a fresh heap array and
   * deserializes a block from it. The copy is private to the caller, hence
   * {@link MemoryType#EXCLUSIVE}.
   * @param offset offset in the mapped file where the block starts
   * @param length number of bytes to read
   * @param deserializer converts the raw bytes into a {@link Cacheable}
   * @throws IOException if deserialization fails
   */
  @Override
  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
      throws IOException {
    byte[] dst = new byte[length];
    bufferArray.getMultiple(offset, length, dst);
    return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
      MemoryType.EXCLUSIVE);
  }

  /**
   * Transfers data from the given byte buffer to file
   * @param srcBuffer the given byte buffer from which bytes are to be read
   * @param offset The offset in the file where the first byte to be written
   * @throws IOException
   */
  @Override
  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
    assert srcBuffer.hasArray();
    // The readable bytes start at arrayOffset() + position(); using arrayOffset() alone
    // would write the wrong bytes for a buffer whose position is non-zero.
    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
      srcBuffer.arrayOffset() + srcBuffer.position());
  }

  /**
   * Transfers data from the given {@link ByteBuff} to file.
   * @param srcBuffer source buffer; must be array-backed (asserted)
   * @param offset the offset in the file where the first byte is to be written
   * @throws IOException on write failure
   */
  @Override
  public void write(ByteBuff srcBuffer, long offset) throws IOException {
    // This singleByteBuff can be considered to be array backed
    assert srcBuffer.hasArray();
    // Same position handling as the ByteBuffer overload above.
    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
      srcBuffer.arrayOffset() + srcBuffer.position());
  }

  /**
   * Sync the data to file after writing
   * @throws IOException
   */
  @Override
  public void sync() throws IOException {
    if (fileChannel != null) {
      fileChannel.force(true);
    }
  }

  /**
   * Close the file
   */
  @Override
  public void shutdown() {
    // Null checks: the constructor may call shutdown() before fileChannel (or even raf)
    // has been assigned, e.g. when extending the file fails.
    if (fileChannel != null) {
      try {
        fileChannel.close();
      } catch (IOException ex) {
        LOG.error("Can't shutdown cleanly", ex);
      }
    }
    if (raf != null) {
      try {
        raf.close();
      } catch (IOException ex) {
        LOG.error("Can't shutdown cleanly", ex);
      }
    }
  }
}