/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;

/**
 * IO engine that stores data to a file on the local file system.
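 *
 * <p>Illustrative usage, as a sketch only (the path, size, offset and payload below are
 * assumptions made up for this example, not values taken from this class):
 * <pre>{@code
 *   FileIOEngine engine = new FileIOEngine("/tmp/bucket.cache", 64L * 1024 * 1024);
 *   engine.write(ByteBuffer.wrap(payload), offset); // persist a block's bytes
 *   engine.sync();                                  // force them to disk
 *   Pair<ByteBuff, MemoryType> block = engine.read(offset, payload.length);
 *   engine.shutdown();                              // close the channel and file
 * }</pre>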
 */
@InterfaceAudience.Private
public class FileIOEngine implements IOEngine {
  private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
  private final RandomAccessFile raf;
  private final FileChannel fileChannel;
  private final String path;
  private long size;

  public FileIOEngine(String filePath, long fileSize) throws IOException {
    this.path = filePath;
    this.size = fileSize;
    try {
      raf = new RandomAccessFile(filePath, "rw");
    } catch (java.io.FileNotFoundException fex) {
      LOG.error("Can't create bucket cache file " + filePath, fex);
      throw fex;
    }

    try {
      raf.setLength(fileSize);
    } catch (IOException ioex) {
      LOG.error("Can't extend bucket cache file; insufficient space for "
          + StringUtils.byteDesc(fileSize), ioex);
      raf.close();
      throw ioex;
    }

    fileChannel = raf.getChannel();
    LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + " on path: " + filePath);
  }

  @Override
  public String toString() {
    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
      ", size=" + String.format("%,d", this.size);
  }

  /**
   * File IO engine is always able to support persistent storage for the cache.
   * @return true
   */
  @Override
  public boolean isPersistent() {
    return true;
  }

  /**
   * Transfers data from file to the given byte buffer.
   * @param offset The offset in the file of the first byte to be read
   * @param length The number of bytes to read; a buffer of this size is
   *               allocated for reading from the file channel
   * @return the buffer holding the bytes read, paired with the memory type of that buffer
   * @throws IOException
   */
  @Override
  public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException {
    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
    // A positional FileChannel.read() may return fewer bytes than requested, so keep
    // reading until the buffer is full or EOF is hit.
    long readOffset = offset;
    while (dstBuffer.hasRemaining()) {
      int bytesRead = fileChannel.read(dstBuffer, readOffset);
      if (bytesRead < 0) {
        break;
      }
      readOffset += bytesRead;
    }
    // The buffer created out of the fileChannel is formed by copying the data from the file.
    // Hence there is no shared memory that we point to: even if the BucketCache evicts this
    // block from the file, the data has already been copied, and there is no need to ensure
    // that the results are not corrupted before consuming them.
    if (dstBuffer.position() != length) {
      throw new RuntimeException("Only " + dstBuffer.position() + " bytes read, " + length
          + " expected");
    }
    return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.EXCLUSIVE);
  }

  /**
   * Transfers data from the given byte buffer to the file.
   * @param srcBuffer the given byte buffer from which bytes are to be read
   * @param offset The offset in the file where the first byte is to be written
   * @throws IOException
   */
  @Override
  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
    fileChannel.write(srcBuffer, offset);
  }

  /**
   * Sync the data to file after writing.
   * @throws IOException
   */
  @Override
  public void sync() throws IOException {
    // force(true) flushes both the file content and its metadata to the storage device.
    fileChannel.force(true);
  }

  /**
   * Close the file.
   */
  @Override
  public void shutdown() {
    try {
      fileChannel.close();
    } catch (IOException ex) {
      LOG.error("Can't shutdown cleanly", ex);
    }
    try {
      raf.close();
    } catch (IOException ex) {
      LOG.error("Can't shutdown cleanly", ex);
    }
  }

  @Override
  public void write(ByteBuff srcBuffer, long offset) throws IOException {
    // When caching a block into the BucketCache there will be a single buffer backing this
    // HFileBlock, so it is safe to write straight from the backing array.
    assert srcBuffer.hasArray();
    fileChannel.write(
        ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset);
  }
}