001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.util.Locale;
024import java.util.ServiceLoader;
025import org.apache.commons.lang3.StringUtils;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.security.User;
029import org.apache.hadoop.hbase.util.ReflectionUtils;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
035
036/**
037 * The entry point for creating a {@link ConnectionRegistry}.
038 */
039@InterfaceAudience.Private
040public final class ConnectionRegistryFactory {
041
042  private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class);
043
044  private static final ImmutableMap<String, ConnectionRegistryURIFactory> FACTORIES;
045  static {
046    ImmutableMap.Builder<String, ConnectionRegistryURIFactory> builder = ImmutableMap.builder();
047    for (ConnectionRegistryURIFactory factory : ServiceLoader
048      .load(ConnectionRegistryURIFactory.class)) {
049      builder.put(factory.getScheme().toLowerCase(), factory);
050    }
051    // throw IllegalArgumentException if there are duplicated keys
052    FACTORIES = builder.buildOrThrow();
053  }
054
055  private ConnectionRegistryFactory() {
056  }
057
058  /**
059   * Returns the connection registry implementation to use, for the given connection url
060   * {@code uri}.
061   * <p/>
062   * We use {@link ServiceLoader} to load different implementations, and use the scheme of the given
063   * {@code uri} to select. And if there is no protocol specified, or we can not find a
064   * {@link ConnectionRegistryURIFactory} implementation for the given scheme, we will fallback to
065   * use the old way to create the {@link ConnectionRegistry}. Notice that, if fallback happens, the
066   * specified connection url {@code uri} will not take effect, we will load all the related
067   * configurations from the given Configuration instance {@code conf}
068   */
069  static ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
070    if (StringUtils.isBlank(uri.getScheme())) {
071      LOG.warn("No scheme specified for {}, fallback to use old way", uri);
072      return create(conf, user);
073    }
074    ConnectionRegistryURIFactory factory = FACTORIES.get(uri.getScheme().toLowerCase());
075    if (factory == null) {
076      LOG.warn("No factory registered for {}, fallback to use old way", uri);
077      return create(conf, user);
078    }
079    return factory.create(uri, conf, user);
080  }
081
082  /**
083   * Returns the connection registry implementation to use.
084   * <p/>
085   * This is used when we do not have a connection url, we will use the old way to load the
086   * connection registry, by checking the
087   * {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} configuration.
088   */
089  static ConnectionRegistry create(Configuration conf, User user) {
090    Class<? extends ConnectionRegistry> clazz =
091      conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
092        RpcConnectionRegistry.class, ConnectionRegistry.class);
093    return ReflectionUtils.newInstance(clazz, conf, user);
094  }
095
096  /**
097   * Check whether the given {@code uri} is valid.
098   * <p/>
099   * Notice that there is no fallback logic for this method, so passing an URI with null scheme can
100   * not pass.
101   * @throws IOException if this is not a valid connection registry URI
102   */
103  public static void validate(URI uri) throws IOException {
104    if (StringUtils.isBlank(uri.getScheme())) {
105      throw new IOException("No schema for uri: " + uri);
106    }
107    ConnectionRegistryURIFactory factory = FACTORIES.get(uri.getScheme().toLowerCase(Locale.ROOT));
108    if (factory == null) {
109      throw new IOException(
110        "No factory registered for scheme " + uri.getScheme() + ", uri: " + uri);
111    }
112    factory.validate(uri);
113  }
114
115  /**
116   * If the given {@code clusterKey} can be parsed to a {@link URI}, and the scheme of the
117   * {@link URI} is supported by us, return the {@link URI}, otherwise return {@code null}.
118   * @param clusterKey the cluster key, typically from replication peer config
119   * @return a {@link URI} or {@code null}.
120   */
121  public static URI tryParseAsConnectionURI(String clusterKey) {
122    // The old cluster key format may not be parsed as URI if we use ip address as the zookeeper
123    // address, so here we need to catch the URISyntaxException and return false
124    URI uri;
125    try {
126      uri = new URI(clusterKey);
127    } catch (URISyntaxException e) {
128      LOG.debug("failed to parse cluster key to URI: {}", clusterKey, e);
129      return null;
130    }
131    if (FACTORIES.containsKey(uri.getScheme())) {
132      return uri;
133    } else {
134      return null;
135    }
136  }
137}