001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.function.Consumer;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.CellComparator;
029import org.apache.hadoop.hbase.monitoring.MonitoredTask;
030import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or into
038 * separate striped files, avoiding L0.
039 */
040@InterfaceAudience.Private
041public class StripeStoreFlusher extends StoreFlusher {
042  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFlusher.class);
043  private final Object flushLock = new Object();
044  private final StripeCompactionPolicy policy;
045  private final StripeCompactionPolicy.StripeInformationProvider stripes;
046
047  public StripeStoreFlusher(Configuration conf, HStore store, StripeCompactionPolicy policy,
048    StripeStoreFileManager stripes) {
049    super(conf, store);
050    this.policy = policy;
051    this.stripes = stripes;
052  }
053
054  @Override
055  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
056    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
057    Consumer<Path> writerCreationTracker) throws IOException {
058    List<Path> result = new ArrayList<>();
059    int cellsCount = snapshot.getCellsCount();
060    if (cellsCount == 0) {
061      // don't flush if there are no entries
062      return result;
063    }
064
065    InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
066
067    // Let policy select flush method.
068    StripeFlushRequest req =
069      this.policy.selectFlush(store.getComparator(), this.stripes, cellsCount);
070
071    boolean success = false;
072    StripeMultiFileWriter mw = null;
073    try {
074      mw = req.createWriter(); // Writer according to the policy.
075      StripeMultiFileWriter.WriterFactory factory =
076        createWriterFactory(snapshot, writerCreationTracker);
077      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
078      mw.init(storeScanner, factory);
079
080      synchronized (flushLock) {
081        performFlush(scanner, mw, throughputController);
082        result = mw.commitWriters(cacheFlushSeqNum, false);
083        success = true;
084      }
085    } finally {
086      if (!success && (mw != null)) {
087        for (Path leftoverFile : mw.abortWriters()) {
088          try {
089            store.getFileSystem().delete(leftoverFile, false);
090          } catch (Exception e) {
091            LOG.error("Failed to delete a file after failed flush: " + e);
092          }
093        }
094      }
095      try {
096        scanner.close();
097      } catch (IOException ex) {
098        LOG.warn("Failed to close flush scanner, ignoring", ex);
099      }
100    }
101    return result;
102  }
103
104  private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot,
105    Consumer<Path> writerCreationTracker) {
106    return new StripeMultiFileWriter.WriterFactory() {
107      @Override
108      public StoreFileWriter createWriter() throws IOException {
109        // XXX: it used to always pass true for includesTag, re-consider?
110        return StripeStoreFlusher.this.createWriter(snapshot, true, writerCreationTracker);
111      }
112    };
113  }
114
115  /** Stripe flush request wrapper that writes a non-striped file. */
116  public static class StripeFlushRequest {
117
118    protected final CellComparator comparator;
119
120    public StripeFlushRequest(CellComparator comparator) {
121      this.comparator = comparator;
122    }
123
124    public StripeMultiFileWriter createWriter() throws IOException {
125      StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
126        Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
127      writer.setNoStripeMetadata();
128      return writer;
129    }
130  }
131
132  /** Stripe flush request wrapper based on boundaries. */
133  public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
134    private final List<byte[]> targetBoundaries;
135
136    /** @param targetBoundaries New files should be written with these boundaries. */
137    public BoundaryStripeFlushRequest(CellComparator comparator, List<byte[]> targetBoundaries) {
138      super(comparator);
139      this.targetBoundaries = targetBoundaries;
140    }
141
142    @Override
143    public StripeMultiFileWriter createWriter() throws IOException {
144      return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
145        null);
146    }
147  }
148
149  /** Stripe flush request wrapper based on size. */
150  public static class SizeStripeFlushRequest extends StripeFlushRequest {
151    private final int targetCount;
152    private final long targetKvs;
153
154    /**
155     * @param targetCount The maximum number of stripes to flush into.
156     * @param targetKvs   The KV count of each segment. If targetKvs*targetCount is less than total
157     *                    number of kvs, all the overflow data goes into the last stripe.
158     */
159    public SizeStripeFlushRequest(CellComparator comparator, int targetCount, long targetKvs) {
160      super(comparator);
161      this.targetCount = targetCount;
162      this.targetKvs = targetKvs;
163    }
164
165    @Override
166    public StripeMultiFileWriter createWriter() throws IOException {
167      return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
168        OPEN_KEY, OPEN_KEY);
169    }
170  }
171}