001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.monitoring.MonitoredTask;
031import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
032import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
039 * into separate striped files, avoiding L0.
040 */
041@InterfaceAudience.Private
042public class StripeStoreFlusher extends StoreFlusher {
043  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFlusher.class);
044  private final Object flushLock = new Object();
045  private final StripeCompactionPolicy policy;
046  private final StripeCompactionPolicy.StripeInformationProvider stripes;
047
048  public StripeStoreFlusher(Configuration conf, HStore store,
049      StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
050    super(conf, store);
051    this.policy = policy;
052    this.stripes = stripes;
053  }
054
055  @Override
056  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
057      MonitoredTask status, ThroughputController throughputController,
058      FlushLifeCycleTracker tracker) throws IOException {
059    List<Path> result = new ArrayList<>();
060    int cellsCount = snapshot.getCellsCount();
061    if (cellsCount == 0) return result; // don't flush if there are no entries
062
063    InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
064
065    // Let policy select flush method.
066    StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
067      cellsCount);
068
069    boolean success = false;
070    StripeMultiFileWriter mw = null;
071    try {
072      mw = req.createWriter(); // Writer according to the policy.
073      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
074      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
075      mw.init(storeScanner, factory);
076
077      synchronized (flushLock) {
078        performFlush(scanner, mw, throughputController);
079        result = mw.commitWriters(cacheFlushSeqNum, false);
080        success = true;
081      }
082    } finally {
083      if (!success && (mw != null)) {
084        for (Path leftoverFile : mw.abortWriters()) {
085          try {
086            store.getFileSystem().delete(leftoverFile, false);
087          } catch (Exception e) {
088            LOG.error("Failed to delete a file after failed flush: " + e);
089          }
090        }
091      }
092      try {
093        scanner.close();
094      } catch (IOException ex) {
095        LOG.warn("Failed to close flush scanner, ignoring", ex);
096      }
097    }
098    return result;
099  }
100
101  private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
102    return new StripeMultiFileWriter.WriterFactory() {
103      @Override
104      public StoreFileWriter createWriter() throws IOException {
105        StoreFileWriter writer = store.createWriterInTmp(kvCount,
106            store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
107        return writer;
108      }
109    };
110  }
111
112  /** Stripe flush request wrapper that writes a non-striped file. */
113  public static class StripeFlushRequest {
114
115    protected final CellComparator comparator;
116
117    public StripeFlushRequest(CellComparator comparator) {
118      this.comparator = comparator;
119    }
120
121    public StripeMultiFileWriter createWriter() throws IOException {
122      StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
123          Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
124      writer.setNoStripeMetadata();
125      return writer;
126    }
127  }
128
129  /** Stripe flush request wrapper based on boundaries. */
130  public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
131    private final List<byte[]> targetBoundaries;
132
133    /** @param targetBoundaries New files should be written with these boundaries. */
134    public BoundaryStripeFlushRequest(CellComparator comparator, List<byte[]> targetBoundaries) {
135      super(comparator);
136      this.targetBoundaries = targetBoundaries;
137    }
138
139    @Override
140    public StripeMultiFileWriter createWriter() throws IOException {
141      return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
142          null);
143    }
144  }
145
146  /** Stripe flush request wrapper based on size. */
147  public static class SizeStripeFlushRequest extends StripeFlushRequest {
148    private final int targetCount;
149    private final long targetKvs;
150
151    /**
152     * @param targetCount The maximum number of stripes to flush into.
153     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
154     *                  total number of kvs, all the overflow data goes into the last stripe.
155     */
156    public SizeStripeFlushRequest(CellComparator comparator, int targetCount, long targetKvs) {
157      super(comparator);
158      this.targetCount = targetCount;
159      this.targetKvs = targetKvs;
160    }
161
162    @Override
163    public StripeMultiFileWriter createWriter() throws IOException {
164      return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
165          OPEN_KEY, OPEN_KEY);
166    }
167  }
168}