/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.util.StringUtils;
/**
 * IO engine that stores data to a file on the local file system.
 */
@InterfaceAudience.Private
public class FileIOEngine implements IOEngine {
  private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
  private final RandomAccessFile raf;
  private final FileChannel fileChannel;
  private final String path;
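  /** Total capacity of the cache file, in bytes. */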
  private long size;

  public FileIOEngine(String filePath, long fileSize) throws IOException {
    this.path = filePath;
    this.size = fileSize;
    try {
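      // RandomAccessFile in "rw" mode creates the file if it does not already exist.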
      raf = new RandomAccessFile(filePath, "rw");
    } catch (java.io.FileNotFoundException fex) {
      LOG.error("Can't create bucket cache file " + filePath, fex);
      throw fex;
    }

    try {
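      // Extend the file to the full cache size up front. On some filesystems
      // this can fail with an IOException when there is insufficient space,
      // which is handled by the catch block below.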
      raf.setLength(fileSize);
    } catch (IOException ioex) {
      LOG.error("Can't extend bucket cache file; insufficient space for "
          + StringUtils.byteDesc(fileSize), ioex);
      raf.close();
      throw ioex;
    }

    fileChannel = raf.getChannel();
    LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + " at path " + filePath);
  }

  @Override
  public String toString() {
    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
      ", size=" + String.format("%,d", this.size);
  }

  /**
   * The file IO engine always supports persistent storage for the cache.
   * @return true
   */
  @Override
  public boolean isPersistent() {
    return true;
  }

  /**
   * Reads data from the file and deserializes it into a Cacheable.
   * @param offset the offset in the file at which the first byte is read
   * @param length the number of bytes to read from the file channel
   * @param deserializer the deserializer used to build a Cacheable from the bytes read
   * @return the deserialized Cacheable
   * @throws IOException if the read fails or returns fewer than {@code length} bytes
   */
  @Override
  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
      throws IOException {
    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
    // A single positional read may return fewer bytes than requested, so keep
    // reading until the buffer is full or end-of-file is reached.
    while (dstBuffer.hasRemaining()) {
      int bytesRead = fileChannel.read(dstBuffer, offset + dstBuffer.position());
      if (bytesRead < 0) {
        break;
      }
    }
    if (dstBuffer.position() != length) {
      throw new IOException("Only " + dstBuffer.position() + " bytes read, " + length
          + " expected");
    }
    // The buffer is filled by copying data out of the file, so there is no shared memory
    // to guard: even if the BucketCache evicts this block from the file, the copy cannot
    // be corrupted before it is consumed. Hence MemoryType.EXCLUSIVE.
    dstBuffer.rewind();
    return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
  }

  /**
   * Transfers data from the given byte buffer to the file.
   * @param srcBuffer the buffer from which bytes are to be written
   * @param offset the offset in the file at which the first byte is written
   * @throws IOException
   */
  @Override
  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
    // A positional write may consume only part of the buffer, so keep writing
    // until every remaining byte has been transferred.
    while (srcBuffer.hasRemaining()) {
      offset += fileChannel.write(srcBuffer, offset);
    }
  }

  /**
   * Sync data to the file after writing.
   * @throws IOException
   */
  @Override
  public void sync() throws IOException {
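    // force(true) flushes both the file content and the file metadata to the
    // storage device, analogous to fsync(2).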
    fileChannel.force(true);
  }

  /**
   * Close the file.
   */
  @Override
  public void shutdown() {
    try {
      fileChannel.close();
    } catch (IOException ex) {
      LOG.error("Can't shutdown cleanly", ex);
    }
    try {
      raf.close();
    } catch (IOException ex) {
      LOG.error("Can't shutdown cleanly", ex);
    }
  }

  @Override
  public void write(ByteBuff srcBuffer, long offset) throws IOException {
    // When caching a block into the BucketCache there is a single buffer backing the
    // HFileBlock, so the ByteBuff here is expected to be array-backed.
    assert srcBuffer.hasArray();
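    // Wrapping the backing array shares it without copying; the wrapped buffer
    // covers exactly the bytes remaining in srcBuffer.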
    write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()),
        offset);
  }
}
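
/*
 * A minimal usage sketch, assuming a CacheableDeserializer<Cacheable> instance
 * named "deserializer" and a serialized block in "blockBytes" (both hypothetical):
 *
 *   FileIOEngine engine = new FileIOEngine("/tmp/bucket.cache", 64L * 1024 * 1024);
 *   ByteBuffer src = ByteBuffer.wrap(blockBytes);
 *   engine.write(src, 0L);                                  // write at file offset 0
 *   engine.sync();                                          // force the bytes to disk
 *   Cacheable block = engine.read(0L, blockBytes.length, deserializer);
 *   engine.shutdown();
 */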