UnboundID Server SDK

Ping Identity
UnboundID Server SDK Documentation

ExampleChangeDetector.java

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2018-2024 Ping Identity Corporation
 */
package com.unboundid.directory.sdk.examples;

import com.unboundid.directory.sdk.common.types.LogSeverity;
import com.unboundid.directory.sdk.sync.api.ChangeDetector;
import com.unboundid.directory.sdk.sync.types.ChangeRecord;
import com.unboundid.directory.sdk.sync.types.EndpointException;
import com.unboundid.directory.sdk.sync.types.PostStepResult;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.SyncSourceContext;
import com.unboundid.ldap.sdk.ChangeType;
import com.unboundid.ldap.sdk.DN;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.ldap.sdk.Modification;
import com.unboundid.ldif.LDIFChangeRecord;
import com.unboundid.ldif.LDIFException;
import com.unboundid.ldif.LDIFModifyChangeRecord;
import com.unboundid.ldif.LDIFModifyDNChangeRecord;
import com.unboundid.ldif.LDIFReader;
import com.unboundid.util.ObjectPair;
import com.unboundid.util.StaticUtils;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.DNArgument;
import com.unboundid.util.args.FileArgument;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

/**
 * This class provides a simple example of an LDAP Change Detector which
 * uses an audit log from a PingDirectory instance as the source for
 * changes. More specifically, the extension will look for any files that begin
 * with "audit", and process them using an {@code LDIFReader}. This
 * extension assumes that the log files contain valid LDIF change records.
 * Just as with cn=changelog, the changes found by the Change Detector
 * are only used as hints about what has changed. The full entry is always
 * fetched from the source prior to calculating any changes for the destination.
 * <p>
 * The following arguments are defined:
 * <UL>
 * <LI>log-files-dir -- The filesystem directory to monitor for log files</LI>
 * <LI>processed-files-dir -- The filesystem directory where finished log
 * files should be moved</LI>
 * <LI>base-dn -- The base DN to use when creating LDAP entries from the log
 * file content</LI>
 * </UL>
 */
public final class ExampleChangeDetector extends ChangeDetector {

  // The server context which can be used for obtaining the server state,
  // logging, etc.
  private SyncServerContext serverContext;

  // An LDIF Processor.
  private LDIFProcessor ldifProcessor = null;

  // A set of files that have been completely detected by getNextBatchOfChanges
  // (e.g. all rows have been read and returned). This is a map so that we know
  // how many records were in each file.
  private final Map<File, Long> pendingFiles =
          new ConcurrentHashMap<File, Long>();

  // The directory which we will monitor for new log files.
  private File logFilesDir;

  // The directory where we will move files after they have been processed.
  private File processedFilesDir;

  // The base DN to use for entries in the log files.
  private DN baseDN;

  // The file filter used to match log files in the LogDirectory.
  private FileFilter logFilter;

  // A Comparator to sort the files from oldest to newest.
  private Comparator<File> oldestToNewestComparator;

  private AtomicLong changeNumber = new AtomicLong(0);

  // A bookmark to keep position in the current file.
  private ObjectPair<File, Long> currentState;

  // The attachment key used to place the log filename on the ChangeRecord
  // so that acknowledgeCompletedOps() can reference it.
  private static final String FILE_ATTACHMENT_KEY = "File.Attachment";

  // The attachment key used to place the change number on
  // the ChangeRecord when it is built.
  private static final String LOG_CHANGE_NUMBER_KEY = "Log.ChangeNumber";

  /**
   * Retrieves a human-readable name for this extension.
   *
   * @return A human-readable name for this extension.
   */
  @Override
  public String getExtensionName() {
    return "Example LDAP Change Detector";
  }

  /**
   * Retrieves a human-readable description for this extension.  Each element
   * of the array that is returned will be considered a separate paragraph in
   * generated documentation.
   *
   * @return A human-readable description for this extension, or {@code null}
   * or an empty array if no description should be available.
   */
  @Override
  public String[] getExtensionDescription() {
    return new String[]{
            "This extension implements a Change Detector which " +
                    "can be used to detect changes from a set of audit log " +
                    "files in a given directory. The extension will look for " +
                    "any files starting with 'audit' and interpret each " +
                    "entry as a change to be processed. After a file has " +
                    "been completely processed, it will be moved to a " +
                    "separate directory."
    };
  }

  /**
   * Updates the provided argument parser to define any configuration arguments
   * which may be used by this extension.  The argument parser may also be
   * updated to define relationships between arguments (e.g. to specify
   * required, exclusive, or dependent argument sets).
   *
   * @param parser The argument parser to be updated with the configuration
   *               arguments which may be used by this extension.
   * @throws ArgumentException If a problem is encountered while updating the
   *                           provided argument parser.
   */
  @Override
  public void defineConfigArguments(final ArgumentParser parser)
          throws ArgumentException {
    FileArgument newFilesArg = new FileArgument(
            'd', "log-files-dir", true, 1, "{path}",
            "The file system directory to monitor for log files",
            true, true, false, true);
    parser.addArgument(newFilesArg);

    FileArgument processedFilesArg = new FileArgument(
            'p', "processed-files-dir", true, 1, "{path}",
            "The file system directory where the extension should " +
                    "move log files after it finishes processing them",
            true, true, false, true);
    parser.addArgument(processedFilesArg);

    DNArgument baseDNArg = new DNArgument('b', "base-dn", true, 1, "{base-dn}",
            "The base DN to use for entries in the log files");
    parser.addArgument(baseDNArg);
  }

  /**
   * This hook is called when a Sync Pipe first starts up or when the
   * set-startpoint subcommand is called from the <i>realtime-sync</i> command
   * line tool. Any initialization of this Change Detector should be performed
   * here. This method should generally store both the {@link SyncServerContext}
   * and the {@link SyncSourceContext} in a class member so that it can be used
   * elsewhere in the implementation.
   *
   * @param serverContext     A handle to the server context for the server in
   *                          which this extension is running.
   * @param syncSourceContext An interface for interacting with the Sync
   *                          Source that owns this Change Detector.
   * @param parser            The argument parser which has been initialized
   *                          from the configuration for this Change Detector.
   */
  @Override
  public void initializeChangeDetector(
          final SyncServerContext serverContext,
          final SyncSourceContext syncSourceContext,
          final ArgumentParser parser) {
    this.serverContext = serverContext;

    logFilter = new FileFilter() {
      @Override
      public boolean accept(final File pathname) {
        return pathname.isFile()
                && pathname.canRead()
                && pathname.getName().startsWith("audit");
      }
    };

    oldestToNewestComparator = new Comparator<File>() {
      public int compare(final File f1, final File f2) {
        if (f1.lastModified() - f2.lastModified() > 0) {
          return 1;
        } else if (f1.lastModified() - f2.lastModified() < 0) {
          return -1;
        } else {
          return 0;
        }
      }
    };

    FileArgument newFilesArg =
            (FileArgument) parser.getNamedArgument("log-files-dir");
    logFilesDir = newFilesArg.getValue();

    debugInfo("Watching folder for logs: "
            + logFilesDir.getAbsolutePath());

    FileArgument processedFilesArg =
            (FileArgument) parser.getNamedArgument("processed-files-dir");
    processedFilesDir = processedFilesArg.getValue();

    debugInfo("Moving processed logs to: "
            + processedFilesDir.getAbsolutePath());

    DNArgument baseDNArg = (DNArgument) parser.getNamedArgument("base-dn");
    baseDN = baseDNArg.getValue();

    debugInfo("Ignoring changes outside the base dn: "
            + baseDN.toNormalizedString());
  }

  /**
   * This method should effectively set the starting point for synchronization
   * to the place specified by the <code>options</code> parameter. This should
   * cause all changes previous to the specified start point to be disregarded
   * and only changes after that point to be returned by
   * {@link #getNextBatchOfChanges(int, AtomicLong)}.
   * <p>
   * There are several different startpoint types (see
   * {@link SetStartpointOptions}), and this implementation is not required to
   * support them all. If the specified startpoint type is unsupported, this
   * method should throw an {@link UnsupportedOperationException}.
   *
   * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
   * must be supported by your implementation, because this is used when a Sync
   * Pipe first starts up. The {@link Serializable} in this case is the same
   * type that is returned by {@link #getStartpoint()}; the Sync Server persists
   * it and passes it back in on a restart.
   * <p>
   * This method can be called from two different contexts:
   * <ul>
   * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
   * (the Sync Pipe is required to be stopped in this context)</li>
   * <li>Immediately after a Sync Pipe starts up and a connection is first
   * established to the source server (e.g. before the first call to
   * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
   * </ul>
   *
   * @param options an object which indicates where exactly to start
   *                synchronizing (e.g. the end of the changelog, specific
   *                change number, a certain time ago, etc)
   * @throws EndpointException if there is any error while setting the
   *                           start point
   */
  @Override
  public void setStartpoint(final SetStartpointOptions options)
          throws EndpointException {
    File[] files;

    switch (options.getStartpointType()) {
      case BEGINNING_OF_CHANGELOG:
        debugInfo("Setting startpoint to beginning of the changelog.");
        files = logFilesDir.listFiles(logFilter);
        if (files == null) {
          throw new EndpointException(
                  PostStepResult.ABORT_OPERATION,
                  "Unable to open log file directory "
                          + logFilesDir.getAbsolutePath());
        }

        // Sort the files from oldest to newest
        Arrays.sort(files, oldestToNewestComparator);

        // Set the bookmark to the beginning of the first file
        currentState = new ObjectPair<>(files[0], 0L);

        // Initialize the LDIFReader
        initializeLDIFProcessor(files[0], 0);

        // Set our incrementing change number back to 0
        changeNumber.set(0);
        break;
      case END_OF_CHANGELOG:
        debugInfo("Setting startpoint to end of changelog.");
        // Move all files to the processed files directory
        files = logFilesDir.listFiles(logFilter);
        if (files == null) {
          throw new EndpointException(
                  PostStepResult.ABORT_OPERATION,
                  "Unable to open log file directory " +
                          logFilesDir.getAbsolutePath());
        }
        for (File f : files) {
          moveCompletedFile(f);
        }

        // Wipe out the current state of the LDIF Reader
        if (ldifProcessor != null) {
          try {
            ldifProcessor.close();
          } catch (IOException e) {
            serverContext.logMessage(LogSeverity.MILD_ERROR,
                    e.getLocalizedMessage());
          } finally {
            ldifProcessor = null;

            // Clear the bookmark
            currentState = null;
          }
        }
        break;
      case RESUME_AT_CHANGE_TIME:
        long time = options.getChangeTime().getTime();
        debugInfo("Setting startpoint to resume at time " + time);

        files = logFilesDir.listFiles(logFilter);
        if (files == null) {
          throw new EndpointException(PostStepResult.ABORT_OPERATION,
                  "Unable to open log file directory " +
                          logFilesDir.getAbsolutePath());
        }
        Arrays.sort(files, oldestToNewestComparator);
        for (File f : files) {
          if (f.lastModified() < time) {
            moveCompletedFile(f);
          } else {
            // Update the bookmark to the file nearest the specified time
            currentState = new ObjectPair<>(f, 0L);
            break;
          }
        }

        if (ldifProcessor != null) {
          try {
            ldifProcessor.close();
          } catch (IOException e) {
            ldifProcessor = null;
            serverContext.logMessage(LogSeverity.MILD_ERROR,
                    e.getLocalizedMessage());
          } finally {
            // This will only be an approximation as the previous file
            // may contain changes that occurred after the specified
            // resume time.
            initializeLDIFProcessor(currentState);
          }
        }
        break;
      case RESUME_AT_SERIALIZABLE:
        debugInfo("Setting startpoint to resume at serialized state.");
        if (currentState == null) {
          // If there is no bookmark, start from the beginning
          files = logFilesDir.listFiles(logFilter);
          if (files == null) {
            throw new EndpointException(
                    PostStepResult.ABORT_OPERATION,
                    "Unable to open log file directory "
                            + logFilesDir.getAbsolutePath());
          }

          // Sort the files from oldest to newest
          Arrays.sort(files, oldestToNewestComparator);
          currentState = new ObjectPair<>(files[0], 0L);
          changeNumber.set(0);
        } else {
          // Get the current bookmark from the options
          Serializable token = options.getSerializableValue();
          if (token != null) {
            ChangeDetectorState state =
                    (ChangeDetectorState) token;
            currentState =
                    new ObjectPair<>(state.getFile(), state.getPosition());
          }
        }
        initializeLDIFProcessor(currentState);
        break;
      default:
        throw new IllegalArgumentException(
                "This startpoint type is not supported: " +
                        options.getStartpointType().toString());
    }
  }

  /**
   * Gets the current value of the startpoint for change detection. This is the
   * "bookmark" which indicates which changes have already been processed and
   * which have not. In most cases, a change number is used to detect changes
   * and is managed by the Data Sync Server, in which case this
   * implementation needs only to return the latest acknowledged
   * change number. In other cases, the return value may correspond to a
   * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
   * In any case, this method should return the value that is updated by
   * {@link #acknowledgeCompletedOps(LinkedList)}.
   * <p>
   * This method is called periodically and the return value is saved in the
   * persistent state for the Sync Pipe that uses this extension as a Change
   * Detector.
   *
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * {@link #acknowledgeCompletedOps(LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs.
   *
   * @return a value to store in the persistent state for the Sync Pipe. This is
   * usually a change number, but if a changelog table is not used to
   * detect changes, this value should represent some other token to
   * pass into {@link #setStartpoint(SetStartpointOptions)}
   * when the sync pipe starts up.
   */
  @Override
  public Serializable getStartpoint() {
    // As the bookmark contains both the file and change number it contains
    // enough information to be a useful startpoint.
    return ChangeDetectorState.createState(currentState);
  }

  /**
   * Return the next batch of change records from the source. Change records
   * are usually just hints that a change happened; they do not include
   * the full contents of the target entry. In an effort to never synchronize
   * stale data, the Data Sync Server will go back and fetch the full
   * target entry for each change record.
   * <p>
   * On the first invocation, this should return changes starting from the
   * startpoint that was set by
   * {@link #setStartpoint(SetStartpointOptions)}. This method is also
   * responsible for updating the internal state such that subsequent
   * invocations do not return duplicate changes.
   * <p>
   * The resulting list should be limited by <code>maxChanges</code>. The
   * <code>numStillPending</code> reference should be set to the estimated
   * number of changes that haven't yet been retrieved from the source endpoint
   * when this method returns, or zero if all the current changes have been
   * retrieved.
   *
   * <b>IMPORTANT</b>: While this method needs to keep track of which changes
   * have already been returned so that it does not return them again, it should
   * <b>NOT</b> modify the official startpoint. The internal value for the
   * startpoint should only be updated after a sync operation is acknowledged
   * back to this extension (via
   * {@link #acknowledgeCompletedOps(LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs. The
   * startpoint should not change as a result of this method.
   * <p>
   * This method <b>does not need to be thread-safe</b>. It will be invoked
   * repeatedly by a single thread, based on the polling interval set in the
   * Sync Pipe configuration.
   *
   * @param maxChanges      the maximum number of changes to retrieve
   * @param numStillPending this should be set to the number of unretrieved
   *                        changes that are still pending after this batch
   *                        has been retrieved. This will be passed in as zero,
   *                        and may be left that way if the actual value cannot
   *                        be determined.
   * @return a list of {@link ChangeRecord} instances, each
   * corresponding to a single change at the source endpoint.
   * If there are no new changes to return, this method should return
   * an empty list.
   * @throws EndpointException if there is any error while retrieving the
   *                           next batch of changes
   */
  @Override
  public List<ChangeRecord> getNextBatchOfChanges(
          final int maxChanges,
          final AtomicLong numStillPending)
          throws EndpointException {
    List<ChangeRecord> results = new ArrayList<ChangeRecord>();
    boolean morePending = false;
    File currentFile = currentState == null ? null : currentState.getFirst();
    long currentFilePosition =
            currentState == null ? 0L : currentState.getSecond();

    try {
      if (ldifProcessor != null) {
        // We still have an unexhausted LDIFProcessor, keep processing
        morePending = processChanges(results, maxChanges);

        if (!morePending) {
          // We've exhausted the current LDIFProcessor. We need to move
          // the current file and position into pendingFiles so that we
          // don't try to process the same file again.
          pendingFiles.put(ldifProcessor.getCurrentState().getFirst(),
                  ldifProcessor.getCurrentState().getSecond());
        }
      }

      // If we haven't reached the limit for the number of results to
      // return we will continue processing.
      if (results.size() < maxChanges) {
        File[] files = logFilesDir.listFiles(logFilter);
        if (files == null) {
          throw new EndpointException(
                  PostStepResult.ABORT_OPERATION,
                  "Unable to open log file directory "
                          + logFilesDir.getAbsolutePath());
        }

        // Nothing to process
        if (files.length < 1) {
          return results;
        }

        // Sort the files from oldest to newest
        Arrays.sort(files, oldestToNewestComparator);
        for (File file : files) {
          // Keep track of which file we're currently working on
          if (pendingFiles.containsKey(file) || !file.exists()) {
            // File has already been processed.
            continue;
          }

          // Initialize the LDIFProcessor with the current file
          // and start at the beginning.
          initializeLDIFProcessor(file, 0);

          if (ldifProcessor == null) {
            return results;
          }

          // Start processing
          morePending = processChanges(results, maxChanges);

          // Keep track of our current file and position for later processing.
          currentFile = ldifProcessor.getCurrentState().getFirst();
          currentFilePosition = ldifProcessor.getCurrentState().getSecond();

          if (results.size() >= maxChanges) {
            // Don't look at any more files if we've reached the maximum number
            // of changes for this batch.
            break;
          }
        }
      }
    } catch (IOException | LDIFException e) {
      throw new EndpointException(
              PostStepResult.ABORT_OPERATION,
              "Problem with LDIFProcessor: "
                      + StaticUtils.getExceptionMessage(e));
    } catch (LDAPException e) {
      throw new EndpointException(e);
    }

    if (!morePending) {
      // We've processed all the currently held files.
      currentState = null;
      if (currentFile != null) {
        pendingFiles.put(currentFile, currentFilePosition);
      }

      if (ldifProcessor != null) {
        try {
          ldifProcessor.close();
        } catch (IOException e) {
          serverContext.logMessage(LogSeverity.MILD_ERROR,
                  e.getLocalizedMessage());
        } finally {
          ldifProcessor = null;
        }
      }
    } else {
      // Need to indicate that there are more changes to get.
      numStillPending.set(1);

      // Update our bookmark
      currentState = new ObjectPair<>(currentFile, currentFilePosition);
    }

    return results;
  }

  /**
   * Processes changes from audit log files and returns them as a list of
   * {@code ChangeRecord}s.
   *
   * @param results         The list of ChangeRecords.
   * @param maxChanges      The maximum number of changes to process.
   *
   * @return                True if more changes to process in current file.
   *
   * @throws IOException    If unable to read file.
   * @throws LDIFException  If error processing the file as LDIF records.
   * @throws LDAPException  If unable to parse the DN from a change record.
   */
  private boolean processChanges(final List<ChangeRecord> results,
                                 final int maxChanges)
          throws IOException, LDIFException, LDAPException {
    LDIFChangeRecord ldifChangeRecord;

    // Iterate through all of the change records in the current file
    while (ldifProcessor.hasNext()) {
      ldifChangeRecord = ldifProcessor.next();
      ObjectPair<File, Long> processorState =
              ldifProcessor.getCurrentState();

      // Skip ahead until we get to the stated position in currentState.
      if (currentState != null &&
              currentState.getSecond() > processorState.getSecond()) {
        continue;
      }

      if (baseDN.isAncestorOf(ldifChangeRecord.getParsedDN(), false)) {
        ChangeRecord.Builder bldr = new ChangeRecord.Builder(
                ldifChangeRecord.getChangeType(),
                ldifChangeRecord.getParsedDN());

        // If the change is a moddn then set the identifiableInfoAfterChange
        if (ldifChangeRecord.getChangeType() == ChangeType.MODIFY_DN) {
          DN dnAfterChange =
                  ((LDIFModifyDNChangeRecord) ldifChangeRecord).getNewDN();
          bldr.identifiableInfoAfterChange(dnAfterChange);
          bldr.changedAttributes(dnAfterChange.getRDN().getAttributeNames());
        }

        // If the change is a modification add all of the changed attribute
        // names to the ChangeRecord.
        if (ldifChangeRecord.getChangeType() == ChangeType.MODIFY) {
          Modification[] mods =
                  ((LDIFModifyChangeRecord) ldifChangeRecord)
                          .getModifications();
          List<String> changedAttrNames = new ArrayList<>();
          for (Modification mod : mods) {
            changedAttrNames.add(mod.getAttributeName());
          }
          bldr.changedAttributes(changedAttrNames.toArray(new String[0]));
        }

        // This example is using the system time as an approximation of
        // the actual change time. If a true change time is available it
        // is best to use that.
        bldr.changeTime(System.currentTimeMillis());
        bldr.changeNumber(changeNumber.getAndAdd(1));

        // Attach the current file and file position so that we can
        // update the internal state in inOrderAcknowledge.
        bldr.addProperty(FILE_ATTACHMENT_KEY, processorState.getFirst());
        bldr.addProperty(LOG_CHANGE_NUMBER_KEY, processorState.getSecond());

        ChangeRecord change = bldr.build();

        serverContext.debugInfo("Found change: " + change.toString());

        results.add(change);

        if (results.size() >= maxChanges) {
          // Don't look at any more rows in this file if we've reached the
          // maximum number of changes for this batch.
          return true;
        }
      } else {
        serverContext.logMessage(LogSeverity.DEBUG,
                "Ignoring out-of-scope change: "
                        + ldifChangeRecord.getDN());
      }
    }
    return false;
  }

  /**
   * Provides a way for the Data Sync Server to acknowledge back to the
   * extension which sync operations it has processed. This method should update
   * the official startpoint which was set by
   * {@link #setStartpoint(SetStartpointOptions)} and is
   * returned by {@link #getStartpoint()}.
   *
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * this method). Otherwise it will be possible for changes to be missed when
   * the Data Sync Server is restarted or a connection error occurs.
   *
   * @param completedOps a list of {@link SyncOperation}s that have finished
   *                     processing. The records are listed in the order they
   *                     were first detected.
   * @throws EndpointException if there is an error acknowledging the changes
   *                           back to the source
   */
  @Override
  public void acknowledgeCompletedOps(
          final LinkedList<SyncOperation> completedOps)
          throws EndpointException {
    ChangeRecord lastRecord = completedOps.getLast().getChangeRecord();
    File sourceFile = (File) lastRecord.getProperty(FILE_ATTACHMENT_KEY);
    Long logChangeNumber = (Long) lastRecord.getProperty(LOG_CHANGE_NUMBER_KEY);

    File[] files = logFilesDir.listFiles(logFilter);
    if (files == null) {
      throw new EndpointException(PostStepResult.ABORT_OPERATION,
              "Unable to open log file directory " +
                      logFilesDir.getAbsolutePath());
    }

    Arrays.sort(files, oldestToNewestComparator);

    for (File f : files) {
      if (f.lastModified() < sourceFile.lastModified() &&
              pendingFiles.containsKey(f)) {
        moveCompletedFile(f);
        pendingFiles.remove(f);
      }
    }

    if (pendingFiles.get(sourceFile) == logChangeNumber) {
      moveCompletedFile(sourceFile);
      pendingFiles.remove(sourceFile);
    }
  }

  /**
   * Moves a file that we have finished processing to the processed files
   * directory and deletes the original file.
   *
   * @param srcFile the file to move
   * @throws EndpointException if there is a problem moving the file
   */
  private void moveCompletedFile(final File srcFile) throws EndpointException {
    final File destFile = new File(processedFilesDir, srcFile.getName());
    try {
      debugInfo("Moving file " + srcFile.getAbsolutePath()
              + " to " + destFile.getAbsolutePath());
      Files.move(srcFile.toPath(), destFile.toPath(), REPLACE_EXISTING);
    } catch (Exception e) {
      throw new EndpointException(PostStepResult.ABORT_OPERATION,
              "Could not move " + srcFile.getAbsolutePath() +
                      " to " + destFile.getAbsolutePath(), e);
    }
  }

  /**
   * Initializes an {@code LDIFProcessor} using the current state.
   *
   * @param file             The file to initialize the LDIFProcessor.
   * @param startingPosition The zero indexed change from which to begin
   *                         processing.
   * @throws EndpointException if the LDIFProcessor cannot be successfully
   *                           initialized.
   */
  private void initializeLDIFProcessor(final File file,
                                       final long startingPosition)
          throws EndpointException {
    if (ldifProcessor != null) {
      try {
        ldifProcessor.close();
      } catch (IOException e) {
        serverContext.logMessage(LogSeverity.MILD_ERROR,
                "Unable to close LDIFProcessor. "
                        + e.getLocalizedMessage());
      }
    }

    try {
      debugInfo("Initializing LDIFProcessor with " + file.getAbsolutePath()
              + " starting at change " + startingPosition);
      ldifProcessor = new LDIFProcessor(file, startingPosition);
    } catch (IOException | LDIFException e) {
      throw new EndpointException(
              PostStepResult.ABORT_OPERATION,
              "Unable to initialize LDIFProcessor! " + e.getLocalizedMessage());
    }
  }

  /**
   * Initializes an {@code LDIFProcessor} using the current state.
   *
   * @param state An ObjectPair containing a File and position information.
   * @throws EndpointException if the LDIFProcessor cannot be successfully
   *                           initialized.
   */
  private void initializeLDIFProcessor(final ObjectPair<File, Long> state)
          throws EndpointException {
    initializeLDIFProcessor(state.getFirst(), state.getSecond());
  }

  /**
   * Wrapper for logging debug messages.
   *
   * @param message The message to log.
   */
  private void debugInfo(final String message) {
    if (serverContext != null) {
      serverContext.debugInfo(message);
    } else {
      System.out.println(message);
    }
  }

  /**
   * A Serializable Wrapper class to contain the state stored in
   * {@code currentState}. This is used to retrieve the Change Detector's
   * state when resuming from being shut down.
   */
  static final class ChangeDetectorState implements Serializable {

    private static final long serialVersionUID = -6088093761035832413L;
    private final File file;
    private final long position;

    /**
     * Private Constructor to prevent instantiation.
     *
     * @param file     File part of state.
     * @param position Change position within the file.
     */
    private ChangeDetectorState(final File file, final long position) {
      this.file = file;
      this.position = position;
    }

    /**
     * Creates a ChangeDetectorState from an {@code ObjectPair<File, Long>}.
     *
     * @param state The current state.
     * @return The resulting ChangeDetectorState.
     */
    static ChangeDetectorState createState(final ObjectPair<File, Long> state) {
      return new ChangeDetectorState(state.getFirst(), state.getSecond());
    }

    /**
     * Get the file portion of the state.
     *
     * @return a file.
     */
    File getFile() {
      return file;
    }

    /**
     * Get the position portion of the state.
     *
     * @return the position.
     */
    long getPosition() {
      return position;
    }
  }

  /**
   * Utility class to manage an LDIFReader and its position in the file
   * currently being processed.
   */
  static final class LDIFProcessor {
    private final LDIFReader ldifReader;
    private final File file;

    // Tracks how many changes have been
    // processed in the current file.
    private long currentPosition;

    private LDIFChangeRecord currentRecord;
    private LDIFChangeRecord nextRecord;

    /**
     * Constructs a new LDIFProcessor.
     *
     * @param file              The file to process.
     * @param startingPosition  The desired starting position.
     *
     * @throws IOException      If unable to open the file.
     * @throws LDIFException    If unable to read changes as LDIF records.
     */
    LDIFProcessor(final File file, final long startingPosition)
            throws IOException, LDIFException {
      this.file = file;
      this.ldifReader = new LDIFReader(file);
      this.currentPosition = startingPosition;

      // If starting later in the file discard until the desired change.
      if (startingPosition > 0) {
        for (int i = 0; i < currentPosition; i++) {
          ldifReader.readChangeRecord();
        }
      }

      currentRecord = ldifReader.readChangeRecord(true);
      nextRecord = ldifReader.readChangeRecord(true);
    }

    /**
     * Gets the current state of this LDIFProcessor.
     *
     * @return An ObjectPair containing the current file and position.
     */
    ObjectPair<File, Long> getCurrentState() {
      return new ObjectPair<>(file,
              currentRecord == null ? currentPosition-- : currentPosition);

    }

    /**
     * Close the LDIFProcessor.
     *
     * @throws IOException If error closing LDIFReader.
     */
    void close() throws IOException {
      ldifReader.close();
    }

    /**
     * Returns true if the LDIFProcessor has more changes.
     *
     * @return  True if more changes.
     */
     boolean hasNext() {
      return currentRecord != null;
    }

    /**
     * Get next LDIFChangeRecord.
     *
     * @return                The next change record or null.
     *
     * @throws IOException    If error reading from file.
     * @throws LDIFException  If error parsing file as LDIF.
     */
     LDIFChangeRecord next() throws IOException, LDIFException {

      final LDIFChangeRecord toReturn = currentRecord;
      currentRecord = nextRecord;

      if (currentRecord != null) {
        nextRecord = ldifReader.readChangeRecord(true);
      }

      currentPosition++;

      return toReturn;
    }
  }
}