001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.monitoring.MonitoredTask;
031import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
032import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037
038/**
039 * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
040 * into separate striped files, avoiding L0.
041 */
042@InterfaceAudience.Private
043public class StripeStoreFlusher extends StoreFlusher {
044  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFlusher.class);
045  private final Object flushLock = new Object();
046  private final StripeCompactionPolicy policy;
047  private final StripeCompactionPolicy.StripeInformationProvider stripes;
048
049  public StripeStoreFlusher(Configuration conf, HStore store,
050      StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
051    super(conf, store);
052    this.policy = policy;
053    this.stripes = stripes;
054  }
055
056  @Override
057  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
058      MonitoredTask status, ThroughputController throughputController,
059      FlushLifeCycleTracker tracker) throws IOException {
060    List<Path> result = new ArrayList<>();
061    int cellsCount = snapshot.getCellsCount();
062    if (cellsCount == 0) return result; // don't flush if there are no entries
063
064    InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
065
066    // Let policy select flush method.
067    StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
068      cellsCount);
069
070    boolean success = false;
071    StripeMultiFileWriter mw = null;
072    try {
073      mw = req.createWriter(); // Writer according to the policy.
074      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
075      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
076      mw.init(storeScanner, factory);
077
078      synchronized (flushLock) {
079        performFlush(scanner, mw, throughputController);
080        result = mw.commitWriters(cacheFlushSeqNum, false);
081        success = true;
082      }
083    } finally {
084      if (!success && (mw != null)) {
085        for (Path leftoverFile : mw.abortWriters()) {
086          try {
087            store.getFileSystem().delete(leftoverFile, false);
088          } catch (Exception e) {
089            LOG.error("Failed to delete a file after failed flush: " + e);
090          }
091        }
092      }
093      try {
094        scanner.close();
095      } catch (IOException ex) {
096        LOG.warn("Failed to close flush scanner, ignoring", ex);
097      }
098    }
099    return result;
100  }
101
102  private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
103    return new StripeMultiFileWriter.WriterFactory() {
104      @Override
105      public StoreFileWriter createWriter() throws IOException {
106        StoreFileWriter writer = store.createWriterInTmp(kvCount,
107            store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
108        return writer;
109      }
110    };
111  }
112
113  /** Stripe flush request wrapper that writes a non-striped file. */
114  public static class StripeFlushRequest {
115
116    protected final CellComparator comparator;
117
118    public StripeFlushRequest(CellComparator comparator) {
119      this.comparator = comparator;
120    }
121
122    @VisibleForTesting
123    public StripeMultiFileWriter createWriter() throws IOException {
124      StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
125          Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
126      writer.setNoStripeMetadata();
127      return writer;
128    }
129  }
130
131  /** Stripe flush request wrapper based on boundaries. */
132  public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
133    private final List<byte[]> targetBoundaries;
134
135    /** @param targetBoundaries New files should be written with these boundaries. */
136    public BoundaryStripeFlushRequest(CellComparator comparator, List<byte[]> targetBoundaries) {
137      super(comparator);
138      this.targetBoundaries = targetBoundaries;
139    }
140
141    @Override
142    public StripeMultiFileWriter createWriter() throws IOException {
143      return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
144          null);
145    }
146  }
147
148  /** Stripe flush request wrapper based on size. */
149  public static class SizeStripeFlushRequest extends StripeFlushRequest {
150    private final int targetCount;
151    private final long targetKvs;
152
153    /**
154     * @param targetCount The maximum number of stripes to flush into.
155     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
156     *                  total number of kvs, all the overflow data goes into the last stripe.
157     */
158    public SizeStripeFlushRequest(CellComparator comparator, int targetCount, long targetKvs) {
159      super(comparator);
160      this.targetCount = targetCount;
161      this.targetKvs = targetKvs;
162    }
163
164    @Override
165    public StripeMultiFileWriter createWriter() throws IOException {
166      return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
167          OPEN_KEY, OPEN_KEY);
168    }
169  }
170}