/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * IO engine that stores data in one or more files on the local file system. The cache capacity is
 * striped evenly across the given files.
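 * <p>
 * A minimal usage sketch (the paths below are illustrative; any writable local paths work), e.g. a
 * 2 GB cache striped across two files:
 *
 * <pre>
 * IOEngine engine = new FileIOEngine(2L * 1024 * 1024 * 1024, false, "/mnt/ssd1/bucket.cache",
 *   "/mnt/ssd2/bucket.cache");
 * </pre>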
 */
@InterfaceAudience.Private
public class FileIOEngine extends PersistentIOEngine {
  private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
  public static final String FILE_DELIMITER = ",";
  private final FileChannel[] fileChannels;
  private final RandomAccessFile[] rafs;
  private final ReentrantLock[] channelLocks;

  private final long sizePerFile;
  private final long capacity;

  private FileReadAccessor readAccessor = new FileReadAccessor();
  private FileWriteAccessor writeAccessor = new FileWriteAccessor();

  public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
    throws IOException {
    super(filePaths);
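    // Stripe the requested capacity evenly across the files. Integer division rounds down, so the
    // effective capacity is the per-file share times the number of files.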
    this.sizePerFile = capacity / filePaths.length;
    this.capacity = this.sizePerFile * filePaths.length;
    this.fileChannels = new FileChannel[filePaths.length];
    if (!maintainPersistence) {
      for (String filePath : filePaths) {
        File file = new File(filePath);
        if (file.exists()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("File " + filePath + " already exists. Deleting!");
          }
          file.delete();
          // Even if deletion fails we can still proceed: subsequent writes will overwrite any
          // stale data.
        }
      }
    }
    this.rafs = new RandomAccessFile[filePaths.length];
    this.channelLocks = new ReentrantLock[filePaths.length];
    for (int i = 0; i < filePaths.length; i++) {
      String filePath = filePaths[i];
      try {
        rafs[i] = new RandomAccessFile(filePath, "rw");
        long totalSpace = new File(filePath).getTotalSpace();
        if (totalSpace < sizePerFile) {
          // The setLength() call below will throw an exception; log this message so the
          // underlying reason for the failure is recorded.
          String msg = "Only " + StringUtils.byteDesc(totalSpace) + " total space under " + filePath
            + ", not enough for requested " + StringUtils.byteDesc(sizePerFile);
          LOG.warn(msg);
        }
        File file = new File(filePath);
        // setLength() changes the file's last modified time, so without this check the wrong
        // time would be used when calculating the checksum.
        if (file.length() != sizePerFile) {
          rafs[i].setLength(sizePerFile);
        }
        fileChannels[i] = rafs[i].getChannel();
        channelLocks[i] = new ReentrantLock();
        LOG.info(
          "Allocating cache " + StringUtils.byteDesc(sizePerFile) + ", on the path: " + filePath);
      } catch (IOException fex) {
        LOG.error("Failed allocating cache on " + filePath, fex);
        shutdown();
        throw fex;
      }
    }
  }

  @Override
  public String toString() {
    return "ioengine=" + this.getClass().getSimpleName() + ", paths=" + Arrays.asList(filePaths)
      + ", capacity=" + String.format("%,d", this.capacity);
  }

  /**
   * File IO engine is always able to support persistent storage for the cache.
   */
  @Override
  public boolean isPersistent() {
    return true;
  }

  /**
   * Transfers data from file to the given byte buffer.
   * @param be a {@link BucketEntry} which maintains an (offset, len, refCnt)
   * @return the {@link Cacheable} with block data inside.
   * @throws IOException if any IO error happens.
   */
  @Override
  public Cacheable read(BucketEntry be) throws IOException {
    long offset = be.offset();
    int length = be.getLength();
    Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
    ByteBuff dstBuff = be.allocator.allocate(length);
    if (length != 0) {
      try {
        accessFile(readAccessor, dstBuff, offset);
        // The buffer created out of the fileChannel is formed by copying the data from the file.
        // Hence in this case there is no shared memory that we point to. Even if the BucketCache
        // evicts this buffer from the file the data is already copied and there is no need to
        // ensure that the results are not corrupted before consuming them.
        if (dstBuff.limit() != length) {
          throw new IllegalArgumentIOException(
            "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
        }
      } catch (IOException ioe) {
        dstBuff.release();
        throw ioe;
      }
    }
    dstBuff.rewind();
    return be.wrapAsCacheable(dstBuff);
  }

  void closeFileChannels() {
    for (FileChannel fileChannel : fileChannels) {
      try {
        fileChannel.close();
      } catch (IOException e) {
        LOG.warn("Failed to close FileChannel", e);
      }
    }
  }

  /**
   * Transfers data from the given byte buffer to file.
   * @param srcBuffer the given byte buffer from which bytes are to be read
   * @param offset    the offset in the file where the first byte is to be written
   */
  @Override
  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
    write(ByteBuff.wrap(srcBuffer), offset);
  }

  /**
   * Sync the data to file after writing.
   */
  @Override
  public void sync() throws IOException {
    for (int i = 0; i < fileChannels.length; i++) {
      try {
        if (fileChannels[i] != null) {
          fileChannels[i].force(true);
        }
      } catch (IOException ie) {
        LOG.warn("Failed syncing data to " + this.filePaths[i]);
        throw ie;
      }
    }
  }

  /**
   * Close the file channels and the underlying files.
   */
  @Override
  public void shutdown() {
    for (int i = 0; i < filePaths.length; i++) {
      try {
        if (fileChannels[i] != null) {
          fileChannels[i].close();
        }
        if (rafs[i] != null) {
          rafs[i].close();
        }
      } catch (IOException ex) {
        LOG.error("Failed closing " + filePaths[i] + " when shutting down the IOEngine", ex);
      }
    }
  }

  @Override
  public void write(ByteBuff srcBuff, long offset) throws IOException {
    if (!srcBuff.hasRemaining()) {
      return;
    }
    accessFile(writeAccessor, srcBuff, offset);
  }

  private void accessFile(FileAccessor accessor, ByteBuff buff, long globalOffset)
    throws IOException {
    int startFileNum = getFileNum(globalOffset);
    int remainingAccessDataLen = buff.remaining();
    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
    int accessFileNum = startFileNum;
    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
    int bufLimit = buff.limit();
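    // A single block may straddle a file boundary, so walk the span file by file, temporarily
    // shrinking the buffer's limit so that each access stays within the current file.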
    while (true) {
      FileChannel fileChannel = fileChannels[accessFileNum];
      int accessLen = 0;
      if (endFileNum > accessFileNum) {
        // Shorten the limit so this access stops at the end of the current file.
        buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
      }
      try {
        accessLen = accessor.access(fileChannel, buff, accessOffset);
      } catch (ClosedByInterruptException e) {
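        // The channel was closed because this thread was interrupted; propagate instead of
        // reopening so the interrupt is not swallowed.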
        throw e;
      } catch (ClosedChannelException e) {
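        // The channel was closed by another thread or a failure; reopen it and retry this access.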
        refreshFileConnection(accessFileNum, e);
        continue;
      }
      // restore the original limit
      buff.limit(bufLimit);
      if (accessLen < remainingAccessDataLen) {
        remainingAccessDataLen -= accessLen;
        accessFileNum++;
        accessOffset = 0;
      } else {
        break;
      }
      if (accessFileNum >= fileChannels.length) {
        throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
          + " exceeds the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
          + globalOffset);
      }
    }
  }

  /**
   * Get the absolute offset within the given file for a global offset.
   * @param fileNum      index of the file holding the offset
   * @param globalOffset the offset across the whole engine
   * @return the absolute offset within the file
   */
  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
    return globalOffset - fileNum * sizePerFile;
  }

  private int getFileNum(long offset) {
    if (offset < 0) {
      throw new IllegalArgumentException("Unexpected offset " + offset);
    }
    int fileNum = (int) (offset / sizePerFile);
    if (fileNum >= fileChannels.length) {
      throw new RuntimeException("Unexpected offset " + offset + " where capacity=" + capacity);
    }
    return fileNum;
  }

  FileChannel[] getFileChannels() {
    return fileChannels;
  }

  void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
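    // One lock per file, so reopening one channel does not block I/O on the other files.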
    ReentrantLock channelLock = channelLocks[accessFileNum];
    channelLock.lock();
    try {
      FileChannel fileChannel = fileChannels[accessFileNum];
      if (fileChannel != null) {
        // Don't re-open a channel if we were waiting on another
        // thread to re-open the channel and it is now open.
        if (fileChannel.isOpen()) {
          return;
        }
        fileChannel.close();
      }
      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
        + filePaths[accessFileNum], ioe);
      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
    } finally {
      channelLock.unlock();
    }
  }

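  /** Uniform positional read/write callback so {@link #accessFile} can share the striping loop. */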
  private interface FileAccessor {
    int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException;
  }

  private static class FileReadAccessor implements FileAccessor {
    @Override
    public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
      throws IOException {
      return buff.read(fileChannel, accessOffset);
    }
  }

  private static class FileWriteAccessor implements FileAccessor {
    @Override
    public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
      throws IOException {
      return buff.write(fileChannel, accessOffset);
    }
  }
}