View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.snapshot;
19  
20  import java.util.List;
21  import java.util.concurrent.Callable;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.hbase.errorhandling.ForeignException;
28  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
29  import org.apache.hadoop.hbase.procedure.ProcedureMember;
30  import org.apache.hadoop.hbase.procedure.Subprocedure;
31  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
32  import org.apache.hadoop.hbase.regionserver.HRegion;
33  import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
34  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
35  
36  /**
37   * This online snapshot implementation uses the distributed procedure framework to force a
38   * store flush and then records the hfiles.  Its enter stage does nothing.  Its leave stage then
39   * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
40   * copies .regioninfos into the snapshot working directory.  At the master side, there is an atomic
41   * rename of the working dir into the proper snapshot directory.
42   */
43  @InterfaceAudience.Private
44  @InterfaceStability.Unstable
45  public class FlushSnapshotSubprocedure extends Subprocedure {
46    private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
47  
48    private final List<HRegion> regions;
49    private final SnapshotDescription snapshot;
50    private final SnapshotSubprocedurePool taskManager;
51    private boolean snapshotSkipFlush = false;
52  
53    public FlushSnapshotSubprocedure(ProcedureMember member,
54        ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
55        List<HRegion> regions, SnapshotDescription snapshot,
56        SnapshotSubprocedurePool taskManager) {
57      super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
58      this.snapshot = snapshot;
59  
60      if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
61        snapshotSkipFlush = true;
62      }
63      this.regions = regions;
64      this.taskManager = taskManager;
65    }
66  
67    /**
68     * Callable for adding files to snapshot manifest working dir.  Ready for multithreading.
69     */
70    private class RegionSnapshotTask implements Callable<Void> {
71      HRegion region;
72      RegionSnapshotTask(HRegion region) {
73        this.region = region;
74      }
75  
76      @Override
77      public Void call() throws Exception {
78        // Taking the region read lock prevents the individual region from being closed while a
79        // snapshot is in progress.  This is helpful but not sufficient for preventing races with
80        // snapshots that involve multiple regions and regionservers.  It is still possible to have
81        // an interleaving such that globally regions are missing, so we still need the verification
82        // step.
83        LOG.debug("Starting region operation on " + region);
84        region.startRegionOperation();
85        try {
86          if (snapshotSkipFlush) {
87          /*
88           * This is to take an online-snapshot without force a coordinated flush to prevent pause
89           * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
90           * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
91           * turned on/off based on the flush type.
92           * To minimized the code change, class name is not changed.
93           */
94            LOG.debug("take snapshot without flush memstore first");
95          } else {
96            LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
97            region.flushcache();
98          }
99          region.addRegionToSnapshot(snapshot, monitor);
100         if (snapshotSkipFlush) {
101           LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
102         } else {
103           LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
104         }
105       } finally {
106         LOG.debug("Closing region operation on " + region);
107         region.closeRegionOperation();
108       }
109       return null;
110     }
111   }
112 
113   private void flushSnapshot() throws ForeignException {
114     if (regions.isEmpty()) {
115       // No regions on this RS, we are basically done.
116       return;
117     }
118 
119     monitor.rethrowException();
120 
121     // assert that the taskManager is empty.
122     if (taskManager.hasTasks()) {
123       throw new IllegalStateException("Attempting to take snapshot "
124           + ClientSnapshotDescriptionUtils.toString(snapshot)
125           + " but we currently have outstanding tasks");
126     }
127 
128     // Add all hfiles already existing in region.
129     for (HRegion region : regions) {
130       // submit one task per region for parallelize by region.
131       taskManager.submitTask(new RegionSnapshotTask(region));
132       monitor.rethrowException();
133     }
134 
135     // wait for everything to complete.
136     LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
137     try {
138       taskManager.waitForOutstandingTasks();
139     } catch (InterruptedException e) {
140       throw new ForeignException(getMemberName(), e);
141     }
142   }
143 
144   /**
145    * do nothing, core of snapshot is executed in {@link #insideBarrier} step.
146    */
147   @Override
148   public void acquireBarrier() throws ForeignException {
149     // NO OP
150   }
151 
152   /**
153    * do a flush snapshot of every region on this rs from the target table.
154    */
155   @Override
156   public byte[] insideBarrier() throws ForeignException {
157     flushSnapshot();
158     return new byte[0];
159   }
160 
161   /**
162    * Cancel threads if they haven't finished.
163    */
164   @Override
165   public void cleanup(Exception e) {
166     LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
167         + snapshot.getName() + "' due to error", e);
168     try {
169       taskManager.cancelTasks();
170     } catch (InterruptedException e1) {
171       Thread.currentThread().interrupt();
172     }
173   }
174 
175   /**
176    * Hooray!
177    */
178   public void releaseBarrier() {
179     // NO OP
180   }
181 
182 }