001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
022import static org.hamcrest.Matchers.allOf;
023import static org.hamcrest.Matchers.hasItem;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026
027import io.opentelemetry.api.trace.SpanKind;
028import io.opentelemetry.api.trace.StatusCode;
029import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
030import io.opentelemetry.sdk.trace.data.SpanData;
031import java.util.Objects;
032import java.util.Optional;
033import java.util.concurrent.CompletableFuture;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.MatcherPredicate;
039import org.apache.hadoop.hbase.RegionLocations;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
044import org.hamcrest.Matcher;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050public class TestTracingBase {
051  private static final Logger LOG = LoggerFactory.getLogger(TestTracingBase.class);
052
053  protected static final ServerName MASTER_HOST = ServerName.valueOf("localhost", 16010, 12345);
054  protected static final RegionLocations META_REGION_LOCATION =
055    new RegionLocations(new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, MASTER_HOST));
056
057  protected Configuration conf;
058
059  @ClassRule
060  public static OpenTelemetryRule TRACE_RULE = OpenTelemetryRule.create();
061
062  @Before
063  public void setUp() throws Exception {
064    conf = HBaseConfiguration.create();
065    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
066      RegistryForTracingTest.class.getName());
067    TRACE_RULE.clearSpans();
068  }
069
070  protected void assertTrace(String className, String methodName, ServerName serverName,
071    TableName tableName) {
072    String expectedSpanName = String.format("%s.%s", className, methodName);
073    Waiter.waitFor(conf, 1000,
074      () -> TRACE_RULE.getSpans().stream().anyMatch(span -> span.getName().equals(expectedSpanName)
075        && span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
076    SpanData data = TRACE_RULE.getSpans().stream().filter(s -> s.getName().equals(expectedSpanName))
077      .findFirst().get();
078    assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
079
080    if (serverName != null) {
081      Optional<SpanData> foundServerName =
082        TRACE_RULE.getSpans().stream().filter(s -> s.getName().equals(expectedSpanName))
083          .filter(s -> Objects.equals(serverName.getServerName(),
084            s.getAttributes().get(HBaseSemanticAttributes.SERVER_NAME_KEY)))
085          .findAny();
086      assertTrue(foundServerName.isPresent());
087    }
088
089    if (tableName != null) {
090      assertEquals(tableName.getNamespaceAsString(),
091        data.getAttributes().get(HBaseSemanticAttributes.DB_NAME));
092      assertEquals(tableName.getNameAsString(),
093        data.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
094    }
095  }
096
097  protected SpanData waitSpan(String name) {
098    return waitSpan(hasName(name));
099  }
100
101  protected SpanData waitSpan(Matcher<SpanData> matcher) {
102    Matcher<SpanData> spanLocator = allOf(matcher, hasEnded());
103    try {
104      Waiter.waitFor(conf, 1000, new MatcherPredicate<>("waiting for span",
105        () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
106    } catch (AssertionError e) {
107      LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}",
108        TRACE_RULE.getSpans());
109      throw e;
110    }
111    return TRACE_RULE.getSpans().stream().filter(spanLocator::matches).findFirst()
112      .orElseThrow(AssertionError::new);
113  }
114
115  static class RegistryForTracingTest implements ConnectionRegistry {
116
117    public RegistryForTracingTest(Configuration conf) {
118    }
119
120    @Override
121    public CompletableFuture<RegionLocations> getMetaRegionLocations() {
122      return CompletableFuture.completedFuture(META_REGION_LOCATION);
123    }
124
125    @Override
126    public CompletableFuture<String> getClusterId() {
127      return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
128    }
129
130    @Override
131    public CompletableFuture<ServerName> getActiveMaster() {
132      return CompletableFuture.completedFuture(MASTER_HOST);
133    }
134
135    @Override
136    public String getConnectionString() {
137      return "nothing";
138    }
139
140    @Override
141    public void close() {
142
143    }
144  }
145
146}