001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.locks.Lock;
027import java.util.stream.Collectors;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.KeyLocker;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
/**
 * A WAL Provider that returns a WAL per group of regions. This provider follows the decorator
 * pattern and mainly holds the logic for WAL grouping. WAL creation/roll/close is delegated to the
 * provider configured via {@link #DELEGATE_PROVIDER}. Region grouping is handled via
 * {@link RegionGroupingStrategy} and can be configured via the property
 * "hbase.wal.regiongrouping.strategy". Current strategy choices are
 * <ul>
 * <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. Currently
 * "bounded".</li>
 * <li><em>identity</em> : each region belongs to its own group.</li>
 * <li><em>bounded</em> : bounded number of groups and regions evenly assigned to each group.</li>
 * <li><em>namespace</em> : see {@link NamespaceGroupingStrategy}.</li>
 * </ul>
 * Optionally, a FQCN to a custom implementation may be given.
 */
053@InterfaceAudience.Private
054public class RegionGroupingProvider extends AbstractWALProvider {
055  private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class);
056
057  /**
058   * Map identifiers to a group number.
059   */
060  public static interface RegionGroupingStrategy {
061    String GROUP_NAME_DELIMITER = ".";
062
063    /**
064     * Given an identifier and a namespace, pick a group.
065     */
066    String group(final byte[] identifier, byte[] namespace);
067
068    void init(Configuration config, String providerId);
069  }
070
071  /**
072   * Maps between configuration names for strategies and implementation classes.
073   */
074  static enum Strategies {
075    defaultStrategy(BoundedGroupingStrategy.class),
076    identity(IdentityGroupingStrategy.class),
077    bounded(BoundedGroupingStrategy.class),
078    namespace(NamespaceGroupingStrategy.class);
079
080    final Class<? extends RegionGroupingStrategy> clazz;
081
082    Strategies(Class<? extends RegionGroupingStrategy> clazz) {
083      this.clazz = clazz;
084    }
085  }
086
087  /**
088   * instantiate a strategy from a config property. requires conf to have already been set (as well
089   * as anything the provider might need to read).
090   */
091  RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
092    final String defaultValue) throws IOException {
093    Class<? extends RegionGroupingStrategy> clazz;
094    try {
095      clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
096    } catch (IllegalArgumentException exception) {
097      // Fall back to them specifying a class name
098      // Note that the passed default class shouldn't actually be used, since the above only fails
099      // when there is a config value present.
100      clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
101    }
102    LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
103    try {
104      final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance();
105      result.init(conf, providerId);
106      return result;
107    } catch (Exception e) {
108      LOG.error(
109        "couldn't set up region grouping strategy, check config key " + REGION_GROUPING_STRATEGY);
110      LOG.debug("Exception details for failure to load region grouping strategy.", e);
111      throw new IOException("couldn't set up region grouping strategy", e);
112    }
113  }
114
  /**
   * Configuration key selecting the grouping strategy; the value is either the name of a
   * {@link Strategies} constant or the name of a {@link RegionGroupingStrategy} class.
   */
  public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
  /** Strategy name used as the default when {@link #REGION_GROUPING_STRATEGY} has no value. */
  public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();

  /**
   * Configuration key selecting the delegate provider used for WAL creation/roll/close. The
   * delegate may not be this multiwal provider itself; when it resolves to this class, doInit
   * falls back to the factory's default provider.
   */
  public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
  /** Default passed to the factory together with {@link #DELEGATE_PROVIDER} when resolving it. */
  public static final String DEFAULT_DELEGATE_PROVIDER =
    WALFactory.Providers.defaultProvider.name();

  /** Group name used for every region when this provider's id is the meta WAL provider id. */
  private static final String META_WAL_GROUP_NAME = "meta";
124
  /**
   * Maps each group name to its delegate provider. Entries are only added while holding the
   * group's lock from {@link #createLock}, to make sure the mapping is one-one rather than
   * many-one.
   */
  private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();

  /** Per-group locks held while a group's delegate provider is being created and cached. */
  private final KeyLocker<String> createLock = new KeyLocker<>();

  /** Grouping strategy, set by doInit; a non-null value there signals repeated initialization. */
  private RegionGroupingStrategy strategy;

  /** Class of the delegate provider that is instantiated for each group; set by doInit. */
  private Class<? extends WALProvider> providerClass;
133
134  @Override
135  protected void doInit(WALFactory factory, Configuration conf, String providerId)
136    throws IOException {
137    if (null != strategy) {
138      throw new IllegalStateException("WALProvider.init should only be called once.");
139    }
140
141    if (META_WAL_PROVIDER_ID.equals(providerId)) {
142      // do not change the provider id if it is for meta
143      this.providerId = providerId;
144    } else {
145      StringBuilder sb = new StringBuilder().append(factory.factoryId);
146      if (providerId != null) {
147        if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
148          sb.append(providerId);
149        } else {
150          sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
151        }
152      }
153      this.providerId = sb.toString();
154    }
155    this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
156    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
157    if (providerClass.equals(this.getClass())) {
158      LOG.warn("delegate provider not support multiwal, falling back to defaultProvider.");
159      providerClass = factory.getDefaultProvider().clazz;
160    }
161  }
162
163  private WALProvider createProvider(String group) throws IOException {
164    WALProvider provider = WALFactory.createProvider(providerClass);
165    provider.init(factory, conf,
166      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group, this.abortable);
167    provider.addWALActionsListener(new MetricsWAL());
168    return provider;
169  }
170
171  private WALProvider getWALProvider(RegionInfo region) throws IOException {
172    String group;
173    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
174      group = META_WAL_GROUP_NAME;
175    } else {
176      byte[] id;
177      byte[] namespace;
178      if (region != null) {
179        id = region.getEncodedNameAsBytes();
180        namespace = region.getTable().getNamespace();
181      } else {
182        id = HConstants.EMPTY_BYTE_ARRAY;
183        namespace = null;
184      }
185      group = strategy.group(id, namespace);
186    }
187    WALProvider provider = cached.get(group);
188    if (provider == null) {
189      Lock lock = createLock.acquireLock(group);
190      try {
191        provider = cached.get(group);
192        if (provider == null) {
193          provider = createProvider(group);
194          // Notice that there is an assumption that the addWALActionsListener method must be called
195          // before the getWAL method, so we can make sure there is no sub WALProvider yet, so we
196          // only add the listener to our listeners list without calling addWALActionListener for
197          // each WALProvider. Although it is no hurt to execute an extra loop to call
198          // addWALActionListener for each WALProvider, but if the extra code actually works, then
199          // we will have other big problems. So leave it as is.
200          listeners.forEach(provider::addWALActionsListener);
201          cached.put(group, provider);
202        }
203      } finally {
204        lock.unlock();
205      }
206    }
207    return provider;
208  }
209
210  @Override
211  protected WAL getWAL0(RegionInfo region) throws IOException {
212    String group;
213    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
214      group = META_WAL_GROUP_NAME;
215    } else {
216      byte[] id;
217      byte[] namespace;
218      if (region != null) {
219        id = region.getEncodedNameAsBytes();
220        namespace = region.getTable().getNamespace();
221      } else {
222        id = HConstants.EMPTY_BYTE_ARRAY;
223        namespace = null;
224      }
225      group = strategy.group(id, namespace);
226    }
227    return getWAL(group);
228  }
229
230  @Override
231  protected List<WAL> getWALs0() {
232    return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList());
233  }
234
235  private WAL getWAL(String group) throws IOException {
236    WALProvider provider = cached.get(group);
237    if (provider == null) {
238      Lock lock = createLock.acquireLock(group);
239      try {
240        provider = cached.get(group);
241        if (provider == null) {
242          provider = createProvider(group);
243          listeners.forEach(provider::addWALActionsListener);
244          cached.put(group, provider);
245        }
246      } finally {
247        lock.unlock();
248      }
249    }
250    return provider.getWAL(null);
251  }
252
253  static class IdentityGroupingStrategy implements RegionGroupingStrategy {
254    @Override
255    public void init(Configuration config, String providerId) {
256    }
257
258    @Override
259    public String group(final byte[] identifier, final byte[] namespace) {
260      return Bytes.toString(identifier);
261    }
262  }
263
264  @Override
265  protected WAL createRemoteWAL(RegionInfo region, FileSystem remoteFs, Path remoteWALDir,
266    String prefix, String suffix) throws IOException {
267    WALProvider provider = getWALProvider(region);
268    if (provider instanceof AbstractWALProvider) {
269      return ((AbstractWALProvider) provider).createRemoteWAL(region, remoteFs, remoteWALDir,
270        prefix, suffix);
271    }
272    throw new IOException(
273      provider.getClass().getSimpleName() + " does not support creating remote WAL");
274  }
275
276  @Override
277  protected void shutdown0() throws IOException {
278    // save the last exception and rethrow
279    IOException failure = null;
280    for (WALProvider provider : cached.values()) {
281      try {
282        provider.shutdown();
283      } catch (IOException e) {
284        LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
285        if (LOG.isDebugEnabled()) {
286          LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
287        }
288        failure = e;
289      }
290    }
291    if (failure != null) {
292      throw failure;
293    }
294  }
295
296  @Override
297  protected void close0() throws IOException {
298    // save the last exception and rethrow
299    IOException failure = null;
300    for (WALProvider provider : cached.values()) {
301      try {
302        provider.close();
303      } catch (IOException e) {
304        LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
305        if (LOG.isDebugEnabled()) {
306          LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
307        }
308        failure = e;
309      }
310    }
311    if (failure != null) {
312      throw failure;
313    }
314  }
315
316  @Override
317  protected long getNumLogFiles0() {
318    long numLogFiles = 0;
319    for (WALProvider provider : cached.values()) {
320      numLogFiles += provider.getNumLogFiles();
321    }
322    return numLogFiles;
323  }
324
325  @Override
326  protected long getLogFileSize0() {
327    long logFileSize = 0;
328    for (WALProvider provider : cached.values()) {
329      logFileSize += provider.getLogFileSize();
330    }
331    return logFileSize;
332  }
333
334}