/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

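/**
 * Tests for {@link ReplicationSink}: each test builds a batch of WAL entries (puts, deletes or a
 * bulk load event), pushes it through {@code replicateEntries}, and verifies the resulting table
 * contents on the mini cluster.
 */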
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSink.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
  private static final int BATCH_SIZE = 10;

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

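  // A single ReplicationSink instance, created once in setUpBeforeClass(), is shared by all
  // tests; it applies replicated WAL entries to the local (sink-side) mini cluster.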
  protected static ReplicationSink SINK;

  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");

  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");

  protected static Table table1;
  protected static Stoppable STOPPABLE = new Stoppable() {
    final AtomicBoolean stop = new AtomicBoolean(false);

    @Override
    public boolean isStopped() {
      return this.stop.get();
    }

    @Override
    public void stop(String why) {
      LOG.info("STOPPING BECAUSE: " + why);
      this.stop.set(true);
    }

  };

  protected static Table table2;
  protected static String baseNamespaceDir;
  protected static String hfileArchiveDir;
  protected static String replicationClusterId;

  /**
   * Starts the mini cluster and creates the test tables and the shared {@link ReplicationSink}.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    TEST_UTIL.startMiniCluster(3);
    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
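    // An arbitrary id standing in for the source cluster; the sink uses it (via the
    // fs.conf.provider configured above) to look up the source cluster's filesystem
    // configuration when replicating bulk loaded hfiles.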
    replicationClusterId = "12345";
  }

  /**
   * Stops the shared sink and shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    STOPPABLE.stop("Shutting down");
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Clears both test tables before each test.
   */
  @Before
  public void setUp() throws Exception {
    table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
    table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
  }

  /**
   * Insert a whole batch of entries.
   */
  @Test
  public void testBatchSink() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a mix of puts and deletes.
   */
  @Test
  public void testMixedPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE / 2);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE / 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

    entries = new ArrayList<>(BATCH_SIZE);
    cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
        i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length);
  }

  @Test
  public void testLargeEditsPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 5510; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

    ResultScanner resultScanner = table1.getScanner(new Scan());
    int totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5510, totalRows);

    entries = new ArrayList<>();
    cells = new ArrayList<>();
    for (int i = 0; i < 11000; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
        i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);
    resultScanner = table1.getScanner(new Scan());
    totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5500, totalRows);
  }

  /**
   * Insert to 2 different tables.
   */
  @Test
  public void testMixedPutTables() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table2.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
    }
    scanRes = table1.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
    }
  }

  /**
   * Insert then do different types of deletes.
   */
  @Test
  public void testMixedDeletes() throws Exception {
    List<WALEntry> entries = new ArrayList<>(3);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    entries = new ArrayList<>(3);
    cells = new ArrayList<>();
    entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(0, scanRes.next(3).length);
  }

  /**
   * Puts are buffered while deletes are not, so this tests the case where a delete reaches the
   * table before the buffered Put that creates the row it targets.
   */
  @Test
  public void testApplyDeleteBeforePut() throws Exception {
    List<WALEntry> entries = new ArrayList<>(5);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    for (int i = 3; i < 5; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
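    // createEntry() sleeps 1 ms per entry, so the DeleteFamily for row 1 carries a later
    // timestamp than row 1's Put and masks it even if the buffered Put is applied afterwards.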
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Get get = new Get(Bytes.toBytes(1));
    Result res = table1.get(get);
    assertEquals(0, res.size());
  }

  @Test
  public void testRethrowRetriesExhaustedWithDetailsException() throws Exception {
    TableName notExistTable = TableName.valueOf("notExistTable");
    List<WALEntry> entries = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
    }
    try {
      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw TableNotFoundException.");
    } catch (TableNotFoundException e) {
      // expected
    }
    entries.clear();
    cells.clear();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
      try (Admin admin = conn.getAdmin()) {
        admin.disableTable(TABLE_NAME1);
        try {
          SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
            replicationClusterId, baseNamespaceDir, hfileArchiveDir);
          Assert.fail("Should re-throw RetriesExhaustedException.");
        } catch (RetriesExhaustedException e) {
          // expected
        } finally {
          admin.enableTable(TABLE_NAME1);
        }
      }
    }
  }

  /**
   * Test replicateEntries with a bulk load entry for 25 HFiles.
   */
  @Test
  public void testReplicateEntriesForHFiles() throws Exception {
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
    int numRows = 10;
    List<Path> p = new ArrayList<>(1);
    final String hfilePrefix = "hfile-";

    // 1. Generate 25 hfile key ranges (50 distinct boundary values)
    Random rand = ThreadLocalRandom.current();
    Set<Integer> numbers = new HashSet<>();
    while (numbers.size() < 50) {
      numbers.add(rand.nextInt(1000));
    }
    List<Integer> numberList = new ArrayList<>(numbers);
    Collections.sort(numberList);
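    // The sorted boundaries are consumed in consecutive pairs below, one (start, end) range per
    // hfile, so the 25 key ranges do not overlap.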
    Map<String, Long> storeFilesSize = new HashMap<>(1);

    // 2. Create 25 hfiles
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = dir.getFileSystem(conf);
    Iterator<Integer> numbersItr = numberList.iterator();
    for (int i = 0; i < 25; i++) {
      Path hfilePath = new Path(familyDir, hfilePrefix + i);
      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
      p.add(hfilePath);
      storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
    }

    // 3. Create a BulkLoadDescriptor and a WALEdit
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    storeFiles.put(FAM_NAME1, p);
    org.apache.hadoop.hbase.wal.WALEdit edit = null;
    WALProtos.BulkLoadDescriptor loadDescriptor = null;

    try (Connection c = ConnectionFactory.createConnection(conf);
      RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
      loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
        UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()), storeFiles,
        storeFilesSize, 1);
      edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
    }
    List<WALEntry> entries = new ArrayList<>(1);

    // 4. Create a WALEntryBuilder
    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);

    // 5. Copy each hfile to the path it would occupy on the source cluster
    for (int i = 0; i < 25; i++) {
      String pathToHfileFromNS = new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString())
        .append(Path.SEPARATOR).append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
        .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
        .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
        .append(hfilePrefix + i).toString();
      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
      Path dstPath = new Path(dst);
      // Copy hfile-i (not always hfile-0) so each destination matches its recorded size
      FileUtil.copy(fs, p.get(i), fs, dstPath, false, conf);
    }

405
406    entries.add(builder.build());
407    try (ResultScanner scanner = table1.getScanner(new Scan())) {
408      // 6. Assert no existing data in table
409      assertEquals(0, scanner.next(numRows).length);
410    }
411    // 7. Replicate the bulk loaded entry
412    SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
413      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
414    try (ResultScanner scanner = table1.getScanner(new Scan())) {
415      // 8. Assert data is replicated
416      assertEquals(numRows, scanner.next(numRows).length);
417    }
418    // Clean up the created hfiles or it will mess up subsequent tests
419  }
420
  /**
   * Test failure metrics produced for failed replication edits.
   */
  @Test
  public void testFailedReplicationSinkMetrics() throws IOException {
    long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
    long errorCount = 0L;
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    // Empty the cell list while each entry still claims an associated cell, so the scanner
    // underflows and throws ArrayIndexOutOfBoundsException
    cells.clear();
    try {
      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
    } catch (ArrayIndexOutOfBoundsException e) {
      errorCount++;
      assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
    }

    entries.clear();
    cells.clear();
    TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
    }
    try {
      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw TableNotFoundException.");
    } catch (TableNotFoundException e) {
      errorCount++;
      assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
    }

    entries.clear();
    cells.clear();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    // cause IOException in batch()
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
      try (Admin admin = conn.getAdmin()) {
        admin.disableTable(TABLE_NAME1);
        try {
          SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
            replicationClusterId, baseNamespaceDir, hfileArchiveDir);
          Assert.fail("Should re-throw IOException.");
        } catch (IOException e) {
          errorCount++;
          assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
        } finally {
          admin.enableTable(TABLE_NAME1);
        }
      }
    }
  }

  private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
    byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
    byte[] rowBytes = Bytes.toBytes(row);
    // Sleep briefly so two consecutive entries for the same row never share a timestamp
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      LOG.info("Was interrupted while sleeping", e);
    }
    final long now = EnvironmentEdgeManager.currentTime();
    KeyValue kv = null;
    if (type.getCode() == KeyValue.Type.Put.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row));
    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn);
    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
      kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily);
    }
    WALEntry.Builder builder = createWALEntryBuilder(table);
    cells.add(kv);

    return builder.build();
  }

  private WALEntry.Builder createWALEntryBuilder(TableName table) {
    WALEntry.Builder builder = WALEntry.newBuilder();
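    // Each entry claims exactly one cell from the accompanying CellScanner; the cells themselves
    // travel outside the protobuf, in the List<Cell> built up by createEntry()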
    builder.setAssociatedCellCount(1);
    WALKey.Builder keyBuilder = WALKey.newBuilder();
    UUID.Builder uuidBuilder = UUID.newBuilder();
    uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
    keyBuilder.setClusterId(uuidBuilder.build());
    keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName()));
    keyBuilder.setWriteTime(EnvironmentEdgeManager.currentTime());
    keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY));
    keyBuilder.setLogSequenceNumber(-1);
    builder.setKey(keyBuilder.build());
    return builder;
  }
}