001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure.flush;
019
020import java.util.Arrays;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.Callable;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028import org.apache.hadoop.hbase.errorhandling.ForeignException;
029import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
030import org.apache.hadoop.hbase.procedure.ProcedureMember;
031import org.apache.hadoop.hbase.procedure.Subprocedure;
032import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
033import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
034import org.apache.hadoop.hbase.regionserver.HRegion;
035import org.apache.hadoop.hbase.util.Bytes;
036
037/**
038 * This flush region implementation uses the distributed procedure framework to flush
039 * table regions.
040 * Its acquireBarrier stage does nothing.  Its insideBarrier stage flushes the regions.
041 */
042@InterfaceAudience.Private
043public class FlushTableSubprocedure extends Subprocedure {
044  private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);
045
046  private final String table;
047  private final String family;
048  private final List<HRegion> regions;
049  private final FlushTableSubprocedurePool taskManager;
050
051  public FlushTableSubprocedure(ProcedureMember member,
052      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
053      List<HRegion> regions, String table, String family,
054      FlushTableSubprocedurePool taskManager) {
055    super(member, table, errorListener, wakeFrequency, timeout);
056    this.table = table;
057    this.family = family;
058    this.regions = regions;
059    this.taskManager = taskManager;
060  }
061
062  private static class RegionFlushTask implements Callable<Void> {
063    HRegion region;
064    List<byte[]> families;
065    RegionFlushTask(HRegion region, List<byte[]> families) {
066      this.region = region;
067      this.families = families;
068    }
069
070    @Override
071    public Void call() throws Exception {
072      LOG.debug("Starting region operation on " + region);
073      region.startRegionOperation();
074      try {
075        LOG.debug("Flush region " + region.toString() + " started...");
076        if (families == null) {
077          region.flush(true);
078        } else {
079          region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
080        }
081        // TODO: flush result is not checked?
082      } finally {
083        LOG.debug("Closing region operation on " + region);
084        region.closeRegionOperation();
085      }
086      return null;
087    }
088  }
089
090  private void flushRegions() throws ForeignException {
091    if (regions.isEmpty()) {
092      // No regions on this RS, we are basically done.
093      return;
094    }
095
096    monitor.rethrowException();
097
098    // assert that the taskManager is empty.
099    if (taskManager.hasTasks()) {
100      throw new IllegalStateException("Attempting to flush "
101          + table + " but we currently have outstanding tasks");
102    }
103    List<byte[]> families = null;
104    if (family != null) {
105      LOG.debug("About to flush family {} on all regions for table {}", family, table);
106      families = Collections.singletonList(Bytes.toBytes(family));
107    }
108    // Add all hfiles already existing in region.
109    for (HRegion region : regions) {
110      // submit one task per region for parallelize by region.
111      taskManager.submitTask(new RegionFlushTask(region, families));
112      monitor.rethrowException();
113    }
114
115    // wait for everything to complete.
116    LOG.debug("Flush region tasks submitted for " + regions.size() + " regions");
117    try {
118      taskManager.waitForOutstandingTasks();
119    } catch (InterruptedException e) {
120      throw new ForeignException(getMemberName(), e);
121    }
122  }
123
124  /**
125   * Flush the online regions on this rs for the target table.
126   */
127  @Override
128  public void acquireBarrier() throws ForeignException {
129    flushRegions();
130  }
131
132  @Override
133  public byte[] insideBarrier() throws ForeignException {
134    // No-Op
135    return new byte[0];
136  }
137
138  /**
139   * Cancel threads if they haven't finished.
140   */
141  @Override
142  public void cleanup(Exception e) {
143    LOG.info("Aborting all flush region subprocedure task threads for '"
144        + table + "' due to error", e);
145    try {
146      taskManager.cancelTasks();
147    } catch (InterruptedException e1) {
148      Thread.currentThread().interrupt();
149    }
150  }
151
152  public void releaseBarrier() {
153    // NO OP
154  }
155
156}