001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.wal;
020
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.locks.Lock;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.client.RegionInfo;
034// imports for classes still in regionserver.wal
035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.KeyLocker;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A WAL Provider that returns a WAL per group of regions.
044 *
045 * This provider follows the decorator pattern and mainly holds the logic for WAL grouping.
046 * WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER}
047 *
048 * Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the
049 * property "hbase.wal.regiongrouping.strategy". Current strategy choices are
050 * <ul>
051 *   <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
052 *                                  "bounded".</li>
053 *   <li><em>identity</em> : each region belongs to its own group.</li>
054 *   <li><em>bounded</em> : bounded number of groups and region evenly assigned to each group.</li>
055 * </ul>
056 * Optionally, a FQCN to a custom implementation may be given.
057 */
058@InterfaceAudience.Private
059public class RegionGroupingProvider implements WALProvider {
060  private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class);
061
062  /**
063   * Map identifiers to a group number.
064   */
065  public static interface RegionGroupingStrategy {
066    String GROUP_NAME_DELIMITER = ".";
067
068    /**
069     * Given an identifier and a namespace, pick a group.
070     */
071    String group(final byte[] identifier, byte[] namespace);
072    void init(Configuration config, String providerId);
073  }
074
075  /**
076   * Maps between configuration names for strategies and implementation classes.
077   */
078  static enum Strategies {
079    defaultStrategy(BoundedGroupingStrategy.class),
080    identity(IdentityGroupingStrategy.class),
081    bounded(BoundedGroupingStrategy.class),
082    namespace(NamespaceGroupingStrategy.class);
083
084    final Class<? extends RegionGroupingStrategy> clazz;
085    Strategies(Class<? extends RegionGroupingStrategy> clazz) {
086      this.clazz = clazz;
087    }
088  }
089
090  /**
091   * instantiate a strategy from a config property.
092   * requires conf to have already been set (as well as anything the provider might need to read).
093   */
094  RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
095      final String defaultValue) throws IOException {
096    Class<? extends RegionGroupingStrategy> clazz;
097    try {
098      clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
099    } catch (IllegalArgumentException exception) {
100      // Fall back to them specifying a class name
101      // Note that the passed default class shouldn't actually be used, since the above only fails
102      // when there is a config value present.
103      clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
104    }
105    LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
106    try {
107      final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance();
108      result.init(conf, providerId);
109      return result;
110    } catch (Exception e) {
111      LOG.error("couldn't set up region grouping strategy, check config key " +
112          REGION_GROUPING_STRATEGY);
113      LOG.debug("Exception details for failure to load region grouping strategy.", e);
114      throw new IOException("couldn't set up region grouping strategy", e);
115    }
116  }
117
118  public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
119  public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
120
121  /** delegate provider for WAL creation/roll/close */
122  public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
123  public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider
124      .name();
125
126  private static final String META_WAL_GROUP_NAME = "meta";
127
128  /** A group-provider mapping, make sure one-one rather than many-one mapping */
129  private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
130
131  private final KeyLocker<String> createLock = new KeyLocker<>();
132
133  private RegionGroupingStrategy strategy;
134  private WALFactory factory;
135  private List<WALActionsListener> listeners = new ArrayList<>();
136  private String providerId;
137  private Class<? extends WALProvider> providerClass;
138
139  @Override
140  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
141    if (null != strategy) {
142      throw new IllegalStateException("WALProvider.init should only be called once.");
143    }
144    this.factory = factory;
145
146    if (META_WAL_PROVIDER_ID.equals(providerId)) {
147      // do not change the provider id if it is for meta
148      this.providerId = providerId;
149    } else {
150      StringBuilder sb = new StringBuilder().append(factory.factoryId);
151      if (providerId != null) {
152        if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
153          sb.append(providerId);
154        } else {
155          sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
156        }
157      }
158      this.providerId = sb.toString();
159    }
160    this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
161    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
162  }
163
164  private WALProvider createProvider(String group) throws IOException {
165    if (META_WAL_PROVIDER_ID.equals(providerId)) {
166      return factory.createProvider(providerClass, META_WAL_PROVIDER_ID);
167    } else {
168      return factory.createProvider(providerClass, group);
169    }
170  }
171
172  @Override
173  public List<WAL> getWALs() {
174    return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList());
175  }
176
177  private WAL getWAL(String group) throws IOException {
178    WALProvider provider = cached.get(group);
179    if (provider == null) {
180      Lock lock = createLock.acquireLock(group);
181      try {
182        provider = cached.get(group);
183        if (provider == null) {
184          provider = createProvider(group);
185          listeners.forEach(provider::addWALActionsListener);
186          cached.put(group, provider);
187        }
188      } finally {
189        lock.unlock();
190      }
191    }
192    return provider.getWAL(null);
193  }
194
195  @Override
196  public WAL getWAL(RegionInfo region) throws IOException {
197    String group;
198    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
199      group = META_WAL_GROUP_NAME;
200    } else {
201      byte[] id;
202      byte[] namespace;
203      if (region != null) {
204        id = region.getEncodedNameAsBytes();
205        namespace = region.getTable().getNamespace();
206      } else {
207        id = HConstants.EMPTY_BYTE_ARRAY;
208        namespace = null;
209      }
210      group = strategy.group(id, namespace);
211    }
212    return getWAL(group);
213  }
214
215  @Override
216  public void shutdown() throws IOException {
217    // save the last exception and rethrow
218    IOException failure = null;
219    for (WALProvider provider: cached.values()) {
220      try {
221        provider.shutdown();
222      } catch (IOException e) {
223        LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
224        if (LOG.isDebugEnabled()) {
225          LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
226        }
227        failure = e;
228      }
229    }
230    if (failure != null) {
231      throw failure;
232    }
233  }
234
235  @Override
236  public void close() throws IOException {
237    // save the last exception and rethrow
238    IOException failure = null;
239    for (WALProvider provider : cached.values()) {
240      try {
241        provider.close();
242      } catch (IOException e) {
243        LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
244        if (LOG.isDebugEnabled()) {
245          LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
246        }
247        failure = e;
248      }
249    }
250    if (failure != null) {
251      throw failure;
252    }
253  }
254
255  static class IdentityGroupingStrategy implements RegionGroupingStrategy {
256    @Override
257    public void init(Configuration config, String providerId) {}
258    @Override
259    public String group(final byte[] identifier, final byte[] namespace) {
260      return Bytes.toString(identifier);
261    }
262  }
263
264  @Override
265  public long getNumLogFiles() {
266    long numLogFiles = 0;
267    for (WALProvider provider : cached.values()) {
268      numLogFiles += provider.getNumLogFiles();
269    }
270    return numLogFiles;
271  }
272
273  @Override
274  public long getLogFileSize() {
275    long logFileSize = 0;
276    for (WALProvider provider : cached.values()) {
277      logFileSize += provider.getLogFileSize();
278    }
279    return logFileSize;
280  }
281
282  @Override
283  public void addWALActionsListener(WALActionsListener listener) {
284    // Notice that there is an assumption that this method must be called before the getWAL above,
285    // so we can make sure there is no sub WALProvider yet, so we only add the listener to our
286    // listeners list without calling addWALActionListener for each WALProvider. Although it is no
287    // hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the
288    // extra code actually works, then we will have other big problems. So leave it as is.
289    listeners.add(listener);
290  }
291}