ExampleSyncSource.java

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2011-2024 Ping Identity Corporation
 */
package com.unboundid.directory.sdk.examples;

import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.io.File;
import java.io.FileFilter;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

import au.com.bytecode.opencsv.CSVParser;
import au.com.bytecode.opencsv.CSVReader;

import com.unboundid.ldap.sdk.ChangeType;
import com.unboundid.ldap.sdk.Entry;
import com.unboundid.ldap.sdk.DN;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.util.ObjectPair;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.DNArgument;
import com.unboundid.util.args.FileArgument;
import com.unboundid.util.StaticUtils;

import com.unboundid.directory.sdk.sync.api.SyncSource;
import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
import com.unboundid.directory.sdk.sync.types.EndpointException;
import com.unboundid.directory.sdk.sync.types.PostStepResult;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.ChangeRecord;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.common.types.LogSeverity;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;


/**
 * This class provides a simple example of a generic Sync Source which will
 * detect CSV files in a given directory and synchronize them to an
 * LDAP destination. More specifically, the extension will look for any files
 * with the ".csv" extension, and interpret each row to be a single LDAP entry
 * to be synchronized. After a file has been completely processed, it will be
 * moved to a separate directory.
 * <p>
 * The CSV header should contain valid LDAP attribute names. The 'uid' attribute
 * is used as the RDN and is required to be present, and the source base DN
 * is configurable using an extension argument. The DNs for the entries will be
 * constructed as follows: uid={uid},{baseDN}. Each row in a CSV file is
 * expected to have values for all of the LDAP attributes, not just those that
 * have changed.
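 * <p>
 * For example, a CSV file synchronized with the default base DN of
 * dc=example,dc=com might contain the following (values here are
 * illustrative):
 * <pre>
 *   uid,givenName,sn,cn,mail
 *   jdoe,John,Doe,John Doe,jdoe@example.com
 * </pre>
 * The data row above would be synchronized as the entry
 * uid=jdoe,dc=example,dc=com.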
 * <p>
 * This extension creates ChangeRecords using {@link ChangeType#MODIFY}, so when
 * using this example, it's important to set <i>modifies-as-creates=true</i>
 * on the Sync Class in the configuration. This will allow the Sync Pipe to
 * create entries that don't already exist. For the sake of simplicity, this
 * example does not handle DELETE operations.
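 * <p>
 * For example, with a Sync Pipe named "CSV to LDAP" and a Sync Class named
 * "csv-sync-class" (both names are illustrative), the property could be set
 * with a dsconfig command along these lines:
 * <pre>
 *   dsconfig set-sync-class-prop --pipe-name "CSV to LDAP" \
 *        --class-name "csv-sync-class" --set modifies-as-creates:true
 * </pre>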
 * <p>
 * This extension uses the OpenCSV library, which is available at
 * <a href="http://opencsv.sourceforge.net">http://opencsv.sourceforge.net</a>
 * and provides simple CSV parsing.
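 * If building with Maven, the opencsv 2.x releases that provide the
 * au.com.bytecode.opencsv package used here are published under the
 * net.sf.opencsv group ID (e.g. net.sf.opencsv:opencsv:2.3).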
 * <p>
 * The following arguments are defined:
 * <UL>
 *   <LI>csv-files-dir -- The filesystem directory to monitor for CSV files</LI>
 *   <LI>processed-files-dir -- The filesystem directory where finished CSV
 *       files should be moved</LI>
 *   <LI>base-dn -- The base DN to use when creating LDAP entries from the CSV
 *       content</LI>
 * </UL>
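 * <p>
 * When this extension is configured in the server, values for these arguments
 * are typically supplied as extension-argument properties in name=value form,
 * for example (paths shown are illustrative):
 * <pre>
 *   extension-argument:csv-files-dir=/var/sync/csv
 *   extension-argument:processed-files-dir=/var/sync/csv-processed
 *   extension-argument:base-dn=dc=example,dc=com
 * </pre>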
 */
public final class ExampleSyncSource extends SyncSource
{
  //The server context which can be used for obtaining the server state,
  //logging, etc.
  private SyncServerContext serverContext;

  //A map of files that have been completely detected by
  //getNextBatchOfChanges() (i.e. all rows have been read and returned) to
  //the number of records that were in each file.
  private final Map<File,Long> pendingFiles =
                  new ConcurrentHashMap<File,Long>();

  //A state holder to keep track of which line of which file we are
  //currently processing.
  private ObjectPair<File,Long> currentLine;

  //The directory which we will monitor for new CSV files.
  private File csvFilesDir;

  //The directory where we will move files after they have been processed.
  private File processedFilesDir;

  //The base DN to use for entries in the CSV files.
  private DN baseDN;

  //The set of attribute names for the current CSV file. (This represents the
  //CSV header).
  private String[] attributeNames;

  //The file filter used to match CSV files in the csvFilesDir directory.
  private FileFilter csvFilter;

  //A Comparator to sort the files from oldest to newest.
  private Comparator<File> oldestToNewestComparator;

  //The attachment key used to place the CSV filename on the ChangeRecord
  //so that acknowledgeCompletedOps() can reference it.
  private static final String FILE_ATTACHMENT_KEY = "File.Attachment";

  //The attachment key used to place the row number of a record in the CSV on
  //the ChangeRecord when it is built.
  private static final String CSV_ROW_NUMBER_KEY = "CSV.RowNumber";


  /**
   * Retrieves a human-readable name for this extension.
   *
   * @return  A human-readable name for this extension.
   */
  @Override
  public String getExtensionName()
  {
    return "Example Sync Source";
  }


  /**
   * Retrieves a human-readable description for this extension.  Each element
   * of the array that is returned will be considered a separate paragraph in
   * generated documentation.
   *
   * @return  A human-readable description for this extension, or {@code null}
   *          or an empty array if no description should be available.
   */
  @Override
  public String[] getExtensionDescription()
  {
    return new String[] {
             "This extension implements a Sync Source which can be used " +
             "to detect and synchronize changes from CSV files in a given " +
             "directory. The extension will look for any files with the .csv " +
             "extension and interpret each row to be a single LDAP entry to " +
             "be synchronized. After a file has been completely processed, " +
             "it will be moved to a separate directory.",

             "For the sake of simplicity, the columns in the files " +
             "should be LDAP attributes (cn, sn, givenname, etc). The " +
             "first line (CSV header) should contain the attribute names " +
             "and the first column is always required to be the 'uid' " +
             "attribute. The DNs for the entries will be constructed as " +
             "follows: uid={uid},{baseDN}. Each row in a CSV file is " +
             "expected to have values for all of the LDAP attributes, not " +
             "just those that have changed."
    };
  }


  /**
   * Updates the provided argument parser to define any configuration arguments
   * which may be used by this extension.  The argument parser may also be
   * updated to define relationships between arguments (e.g. to specify
   * required, exclusive, or dependent argument sets).
   *
   * @param  parser  The argument parser to be updated with the configuration
   *                 arguments which may be used by this extension.
   *
   * @throws  ArgumentException  If a problem is encountered while updating the
   *                             provided argument parser.
   */
  @Override
  public void defineConfigArguments(final ArgumentParser parser)
         throws ArgumentException
  {
    FileArgument newFilesArg = new FileArgument(
                     'd', "csv-files-dir", true, 1, "{path}",
                     "The file system directory to monitor for CSV files",
                     true, true, false, true);
    parser.addArgument(newFilesArg);

    FileArgument processedFilesArg = new FileArgument(
                     'p', "processed-files-dir", true, 1, "{path}",
                     "The file system directory where the extension should " +
                     "move CSV files after it finishes processing them",
                     true, true, false, true);
    parser.addArgument(processedFilesArg);

    DN defaultDN = null;
    try
    {
      defaultDN = new DN("dc=example,dc=com");
    }
    catch(LDAPException e)
    {
      throw new ArgumentException("Could not construct a default DN", e);
    }

    DNArgument baseDNArg = new DNArgument('b', "base-dn", true, 1, "{base-dn}",
                            "The base DN to use for entries in the CSV files",
                            defaultDN);
    parser.addArgument(baseDNArg);
  }


  /**
   * This hook is called when a Sync Pipe first starts up, when the
   * <i>resync</i> process first starts up, or when the set-startpoint
   * subcommand is called from the <i>realtime-sync</i> command line tool.
   * Any initialization of this sync source should be performed here. This
   * method should generally store the {@link SyncServerContext} in a class
   * member so that it can be used elsewhere in the implementation.
   *
   * @param  serverContext  A handle to the server context for the server in
   *                        which this extension is running.
   * @param  config         The general configuration for this sync source.
   * @param  parser         The argument parser which has been initialized from
   *                        the configuration for this sync source.
   */
  @Override
  public void initializeSyncSource(final SyncServerContext serverContext,
                                   final SyncSourceConfig config,
                                   final ArgumentParser parser)
  {
    this.serverContext = serverContext;

    csvFilter = new FileFilter()
    {
      public boolean accept(final File f)
      {
        return f.isFile() && f.canRead() && f.getName().endsWith(".csv");
      }
    };

    oldestToNewestComparator = new Comparator<File>()
    {
      public int compare(final File f1, final File f2)
      {
        return Long.compare(f1.lastModified(), f2.lastModified());
      }
    };

    FileArgument newFilesArg =
            (FileArgument) parser.getNamedArgument("csv-files-dir");
    csvFilesDir = newFilesArg.getValue();

    FileArgument processedFilesArg =
      (FileArgument) parser.getNamedArgument("processed-files-dir");
    processedFilesDir = processedFilesArg.getValue();

    DNArgument baseDNArg = (DNArgument) parser.getNamedArgument("base-dn");
    baseDN = baseDNArg.getValue();
  }


  /**
   * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
   * process shuts down, or when the set-startpoint subcommand (from the
   * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
   * sync source should be performed here.
   */
  @Override
  public void finalizeSyncSource()
  {
    //No implementation necessary.
  }


  /**
   * This method should effectively set the starting point for synchronization
   * to the place specified by the <code>options</code> parameter. This should
   * cause all changes previous to the specified start point to be disregarded
   * and only changes after that point to be returned by
   * {@link #getNextBatchOfChanges(int, AtomicLong)}.
   * <p>
   * There are several different startpoint types (see
   * {@link SetStartpointOptions}), and this implementation is not required to
   * support them all. If the specified startpoint type is unsupported, this
   * method should throw an {@link UnsupportedOperationException}.
   * <p>
   * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
   * must be supported by your implementation, because this is used when a Sync
   * Pipe first starts up. The {@link Serializable} in this case is the same
   * type that is returned by {@link #getStartpoint()}; the
   * Data Sync Server persists it and passes it back in on a restart.
   * <p>
   * This method can be called from two different contexts:
   * <ul>
   * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
   * (the Sync Pipe is required to be stopped in this context)</li>
   * <li>Immediately after a Sync Pipe starts up and a connection is first
   *     established to the source server (e.g. before the first call to
   * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
   * </ul>
   *
   * @param options
   *          an object which indicates where exactly to start synchronizing
   *          (e.g. the end of the changelog, specific change number, a certain
   *          time ago, etc)
   * @throws EndpointException
   *           if there is any error while setting the start point
   */
  @Override
  @SuppressWarnings("unchecked")
  public void setStartpoint(final SetStartpointOptions options)
                                throws EndpointException
  {
    File[] files;

    switch(options.getStartpointType())
    {
      case BEGINNING_OF_CHANGELOG:
        //Nothing to do here; getNextBatchOfChanges() will automatically start
        //at the beginning (i.e. it will pick up at the oldest file in the
        //CSV files directory).
        break;
      case END_OF_CHANGELOG:
        //Move all files to the processed files directory
        files = csvFilesDir.listFiles(csvFilter);
        if (files == null)
        {
          throw new EndpointException(PostStepResult.ABORT_OPERATION,
              "Unable to open CSV file directory " +
                  csvFilesDir.getAbsolutePath());
        }
        for(File f : files)
        {
          moveCompletedFile(f);
        }
        break;
      case RESUME_AT_CHANGE_TIME:
        long time = options.getChangeTime().getTime();
        files = csvFilesDir.listFiles(csvFilter);
        if (files == null)
        {
          throw new EndpointException(PostStepResult.ABORT_OPERATION,
              "Unable to open CSV file directory " +
                  csvFilesDir.getAbsolutePath());
        }
        Arrays.sort(files, oldestToNewestComparator);
        for(File f : files)
        {
          if(f.lastModified() < time)
          {
            moveCompletedFile(f);
          }
          else
          {
            break;
          }
        }
        break;
      case RESUME_AT_SERIALIZABLE:
        //When sync first starts up, this method is called with
        //RESUME_AT_SERIALIZABLE to initialize the internal state.
        //Since this extension does not save any state, we do not have to
        //set anything here.
        break;
      default:
        throw new IllegalArgumentException(
                              "This startpoint type is not supported: " +
                                options.getStartpointType().toString());
    }
  }


  /**
   * Gets the current value of the startpoint for change detection. This is the
   * "bookmark" which indicates which changes have already been processed and
   * which have not. In most cases, a change number is used to detect changes
   * and is managed by the Data Sync Server, in which case this
   * implementation needs only to return the latest acknowledged
   * change number. In other cases, the return value may correspond to a
   * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
   * In any case, this method should return the value that is updated by
   * {@link #acknowledgeCompletedOps(LinkedList)}.
   * <p>
   * This method is called periodically and the return value is saved in the
   * persistent state for the Sync Pipe that uses this extension as its Sync
   * Source.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * {@link #acknowledgeCompletedOps(LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs.
   * @return a value to store in the persistent state for the Sync Pipe. This is
   *         usually a change number, but if a changelog table is not used to
   *         detect changes, this value should represent some other token to
   *         pass into {@link #setStartpoint(SetStartpointOptions)}
   *         when the sync pipe starts up.
   */
  @Override
  public Serializable getStartpoint()
  {
    //We return null here because we are synchronizing from a directory of files
    //where the files are removed once they have been synchronized. In this
    //case, we do not need to keep track of where to start; we always start at
    //the oldest file in the directory and work forward.
    return null;
  }


  /**
   * Return the URL or path identifying the source endpoint
   * from which this extension is transmitting data. This is used for logging
   * purposes only, so it could just be a server name or hostname and port, etc.
   *
   * @return the path to the source endpoint
   */
  @Override
  public String getCurrentEndpointURL()
  {
    return csvFilesDir.getAbsolutePath();
  }


  /**
   * Return a full source entry (in LDAP form) from the source, corresponding
   * to the {@link ChangeRecord} that is passed in through the
   * {@link SyncOperation}. This method should perform any queries necessary to
   * gather the latest values for all the attributes to be synchronized.
   * <p>
   * This method <b>must be thread safe</b>, as it will be called repeatedly and
   * concurrently by each of the Sync Pipe worker threads as they process
   * entries.
   * <p>
   * If the original ChangeRecord has the full entry already set on it
   * (which can be done using the
   * {@link
 com.unboundid.directory.sdk.sync.types.ChangeRecord.Builder#fullEntry(Entry)}),
   * then this method will not get called, and the Data Sync Server
   * will automatically use the full entry from the ChangeRecord. In this case,
   * the implementation can always return {@code null}.
   *
   * @param operation
   *          the SyncOperation which identifies the source "entry" to
   *          fetch. The ChangeRecord can be obtained by calling
   *          <code>operation.getChangeRecord()</code>.
   *          These ChangeRecords are generated by
   *        {@link #getNextBatchOfChanges(int, AtomicLong)}
   *          or by
   *        {@link #listAllEntries(BlockingQueue)}.
   *
   * @return a full LDAP Entry, or null if no such entry exists.
   * @throws EndpointException
   *           if there is an error fetching the entry
   */
  @Override
  public Entry fetchEntry(final SyncOperation operation)
                                 throws EndpointException
  {
    ChangeRecord record = operation.getChangeRecord();
    throw new IllegalStateException(
            "fetchEntry() should not be called because the full entry is set " +
            "on the ChangeRecord: " + record.toString());
  }


  /**
   * Provides a way for the Data Sync Server to acknowledge back to the
   * extension which sync operations it has processed. This method should update
   * the official startpoint which was set by
   * {@link #setStartpoint(SetStartpointOptions)} and is
   * returned by {@link #getStartpoint()}.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * this method). Otherwise it will be possible for changes to be missed when
   * the Data Sync Server is restarted or a connection error occurs.
   *
   * @param completedOps
   *          a list of {@link SyncOperation}s that have finished processing.
   *          The records are listed in the order they were first detected.
   * @throws EndpointException
   *           if there is an error acknowledging the changes back to the
   *           database
   */
  @Override
  public void acknowledgeCompletedOps(
                                   final LinkedList<SyncOperation> completedOps)
                                        throws EndpointException
  {
    ChangeRecord lastRecord = completedOps.getLast().getChangeRecord();
    File lastFile = (File) lastRecord.getProperty(FILE_ATTACHMENT_KEY);
    Long rowNumber = (Long) lastRecord.getProperty(CSV_ROW_NUMBER_KEY);

    File[] files = csvFilesDir.listFiles(csvFilter);
    if (files == null)
    {
      throw new EndpointException(PostStepResult.ABORT_OPERATION,
          "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath());
    }

    //Sort the files from oldest to newest
    Arrays.sort(files, oldestToNewestComparator);

    //Move completed files to the completed files directory
    for(File f : files)
    {
      //Because changes are acknowledged in order and we detect them from oldest
      //to newest, we know if the file is older than the lastFile from above,
      //then we have finished processing it.
      if(f.lastModified() < lastFile.lastModified() &&
              pendingFiles.containsKey(f))
      {
        moveCompletedFile(f);
        pendingFiles.remove(f);
      }
    }

    //If the last acknowledged file is completely finished, move it to the
    //processed files directory as well.
    //(Note that these are boxed Long values, so they must be compared with
    //equals() rather than ==.)
    Long totalRows = pendingFiles.get(lastFile);
    if(totalRows != null && totalRows.equals(rowNumber))
    {
      moveCompletedFile(lastFile);
      pendingFiles.remove(lastFile);
    }
  }


  /**
   * Return the next batch of change records from the source. Change records
   * are usually just hints that a change happened; they do not include
   * the full contents of the target entry. In an effort to never synchronize
   * stale data, the Data Sync Server will go back and fetch the full
   * target entry for each change record.
   * <p>
   * On the first invocation, this should return changes starting from the
   * startpoint that was set by
   * {@link #setStartpoint(SetStartpointOptions)}. This method is also
   * responsible for updating the internal state such that subsequent
   * invocations do not return duplicate changes.
   * <p>
   * The resulting list should be limited by <code>maxChanges</code>. The
   * <code>numStillPending</code> reference should be set to the estimated
   * number of changes that haven't yet been retrieved from the source endpoint
   * when this method returns, or zero if all the current changes have been
   * retrieved.
   * <p>
   * <b>IMPORTANT</b>: While this method needs to keep track of which changes
   * have already been returned so that it does not return them again, it should
   * <b>NOT</b> modify the official startpoint. The internal value for the
   * startpoint should only be updated after a sync operation is acknowledged
   * back to this extension (via
   * {@link #acknowledgeCompletedOps(LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs. The
   * startpoint should not change as a result of this method.
   * <p>
   * This method <b>does not need to be thread-safe</b>. It will be invoked
   * repeatedly by a single thread, based on the polling interval set in the
   * Sync Pipe configuration.
   *
   * @param maxChanges
   *          the maximum number of changes to retrieve
   * @param numStillPending
   *          this should be set to the number of unretrieved changes that
   *          are still pending after this batch has been retrieved. This will
   *          be passed in
   *          as zero, and may be left that way if the actual value cannot be
   *          determined.
   * @return a list of {@link ChangeRecord} instances, each
   *         corresponding to a single change at the source endpoint.
   *         If there are no new changes to return, this method should return
   *         an empty list.
   * @throws EndpointException
   *           if there is any error while retrieving the next batch of changes
   */
  @Override
  public List<ChangeRecord> getNextBatchOfChanges(
                                               final int maxChanges,
                                               final AtomicLong numStillPending)
                                                       throws EndpointException
  {
    List<ChangeRecord> results = new ArrayList<ChangeRecord>();
    File[] files = csvFilesDir.listFiles(csvFilter);
    if (files == null)
    {
      throw new EndpointException(
          PostStepResult.ABORT_OPERATION,
          "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath());
    }

    //Sort the files from oldest to newest
    Arrays.sort(files, oldestToNewestComparator);

    for(File f : files)
    {
      if(pendingFiles.containsKey(f) || !f.exists())
      {
        //This file has already been completely detected, or it no longer
        //exists.
        continue;
      }

      CSVReader csvReader = null;
      RandomAccessFile fileHandle = null;

      try
      {
        fileHandle = new RandomAccessFile(f, "rw");
        FileChannel channel = fileHandle.getChannel();

        //Try to acquire an exclusive lock on the file to make sure no other
        //processes are currently using the file.
        FileLock lock = channel.tryLock();
        if(lock == null)
        {
          //Another process is using the file; this could happen when the file
          //is first written to the CSV directory, for example.
          serverContext.logMessage(LogSeverity.MILD_WARNING,
                  "Another process is using file: " + f.getPath());
          continue;
        }

        //We use the openCSV library here to handle CSV parsing
        csvReader = new CSVReader(new FileReader(fileHandle.getFD()));

        //The header line contains the attribute names
        attributeNames = csvReader.readNext();
        if(attributeNames == null)
        {
          throw new EndpointException(PostStepResult.ABORT_OPERATION,
                  "Cannot read CSV header from " + f.getAbsolutePath());
        }

        //Check if the first attribute is 'uid'
        if(!"uid".equalsIgnoreCase(attributeNames[0]))
        {
          throw new EndpointException(PostStepResult.ABORT_OPERATION,
                 "First attribute is not 'uid' in file " + f.getAbsolutePath());
        }

        long currentRowNum = 0;
        String[] attributeValues;

        while ((attributeValues = csvReader.readNext()) != null) {

          currentRowNum++;

          if (currentLine != null &&
              currentLine.getFirst().equals(f) &&
              currentLine.getSecond() >= currentRowNum)
          {
            continue;
          }

          //Create the entry
          Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN);

          //Add the attributes
          for(int i = 0; i < attributeValues.length; i++)
          {
            e.setAttribute(attributeNames[i], attributeValues[i]);
          }

          //Construct a ChangeRecord
          ChangeRecord.Builder bldr = new ChangeRecord.Builder(
                  ChangeType.MODIFY, e.getParsedDN());

          bldr.changedAttributes(attributeNames);
          bldr.changeTime(System.currentTimeMillis());

          //Add the full entry to the ChangeRecord. This will cause the
          //Sync engine to skip the call to fetchEntry() and instead use this
          //entry.
          bldr.fullEntry(e);

          //Add the file as an attachment so that we can reference it when this
          //operation makes it back to acknowledgeCompletedOps()
          bldr.addProperty(FILE_ATTACHMENT_KEY, f);

          //Add the row number as an attachment so that we can update the
          //starting point when this operation makes it back to
          //acknowledgeCompletedOps()
          bldr.addProperty(CSV_ROW_NUMBER_KEY, currentRowNum);

          //Build the ChangeRecord and add it to the result list
          results.add(bldr.build());

          if(results.size() >= maxChanges)
          {
            //Don't look at any more rows in this file if we've reached the
            //maximum number of changes for this batch.
            break;
          }
        }

        if(attributeValues == null)
        {
          //We've reached the end of this file. Bookmark the file and how many
          //rows were in it so that we can properly move it to the processed
          //files directory once it is acknowledged.
          pendingFiles.put(f, currentRowNum);
          currentLine = null;
        }
        else
        {
          //We're stopping mid-file, so we need to record which line we're
          //currently on so we can pick back up at this spot on the next
          //invocation of getNextBatchOfChanges().
          currentLine = new ObjectPair<File,Long>(f, currentRowNum);
        }

        if(results.size() >= maxChanges)
        {
          //Don't look at any more files if we've reached the maximum number of
          //changes for this batch.
          break;
        }
      }
      catch(LDAPException e)
      {
        throw new EndpointException(e);
      }
      catch(IOException e)
      {
        throw new EndpointException(PostStepResult.ABORT_OPERATION,
                "Problem with file: " + f.getName() + " --> " +
                StaticUtils.getExceptionMessage(e));
      }
      finally
      {
        if(csvReader != null)
        {
          //Close the CSVReader.
          try
          {
            csvReader.close();
          }
          catch(IOException e)
          {
            serverContext.logMessage(LogSeverity.SEVERE_ERROR,
                    StaticUtils.getExceptionMessage(e));
          }
        }

        if(fileHandle != null)
        {
          //Release the exclusive lock we had on the file (this implicitly
          //closes the underlying FileChannel).
          try
          {
            fileHandle.close();
          }
          catch(IOException e)
          {
            serverContext.logMessage(LogSeverity.SEVERE_ERROR,
                    StaticUtils.getExceptionMessage(e));
          }
        }
      }
    }

    return results;
  }


  /**
   * Gets a list of all the entries in the source endpoint. This is used by the
   * 'resync' command line tool. The default implementation throws a
   * {@link UnsupportedOperationException}; subclasses should override if the
   * resync functionality is needed.
   * <p>
   * The <code>outputQueue</code> should contain {@link ChangeRecord} objects
   * with the <code>ChangeType</code> set to <code>null</code> to indicate that
   * these are resync operations.
   * <p>
   * This method should not return until all the entries at the source
   * have been added to the output queue. Separate threads will concurrently
   * drain entries from the queue and process them. The queue typically should
   * not contain full entries, but rather ChangeRecord objects which identify
   * the full source entries. These objects are then individually passed in to
   * {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure
   * that the ChangeRecord instances contain enough identifiable information
   * (e.g. primary keys) for each entry so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of identifiers for the entries in the source
   *     endpoint, using a ChangeRecord as the identifier</li>
   * <li>Fetch full source entry for a ChangeRecord</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * <p>
   * Alternatively, the full entry can be set on the ChangeRecord within this
   * method, which will cause the "fetch full entry" step to be skipped. In this
   * case the Data Sync Server will just use the entry specified on the
   * ChangeRecord.
   * <p>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple network queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * @param outputQueue
   *          a queue of ChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(SyncOperation)}
   * @throws EndpointException
   *           if there is an error retrieving the list of entries to resync
   */
  @Override
  public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue)
                               throws EndpointException
  {
    serverContext.debugInfo("Beginning to dump all entries...");
    File[] files = csvFilesDir.listFiles(csvFilter);
    if (files == null)
    {
      throw new EndpointException(PostStepResult.ABORT_OPERATION,
          "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath());
    }

    for(File f : files)
    {
      CSVReader reader = null;
      try
      {
        //We use the openCSV library here to handle CSV parsing
        reader = new CSVReader(new FileReader(f));

        //The header line contains the attribute names
        String[] attrNames = reader.readNext();
        if(attrNames == null)
        {
          throw new EndpointException(PostStepResult.ABORT_OPERATION,
                  "Cannot read header line from " + f.getAbsolutePath());
        }

        //Check if the first attribute is 'uid'
        if(!"uid".equalsIgnoreCase(attrNames[0]))
        {
          throw new EndpointException(PostStepResult.ABORT_OPERATION,
                 "First attribute is not 'uid' in file " + f.getAbsolutePath());
        }

        String[] attributeValues;
        while ((attributeValues = reader.readNext()) != null) {
          //Create the entry
          Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN);

          //Add the attributes
          for(int i = 0; i < attributeValues.length; i++)
          {
            e.setAttribute(attrNames[i], attributeValues[i]);
          }

          //Construct a ChangeRecord with a null ChangeType to indicate resync
          ChangeRecord.Builder bldr =
                new ChangeRecord.Builder(null, e.getParsedDN());
          bldr.changedAttributes(attrNames);
          bldr.changeTime(f.lastModified());

          //Add the full entry to the ChangeRecord. This will cause the
          //Sync engine to skip the call to fetchEntry() and instead use this
          //entry.
          bldr.fullEntry(e);

          //Note: there is no "acknowledge" phase for resync, so we do not need
          //to worry about adding the file attachment here like we do in
          //getNextBatchOfChanges().

          outputQueue.put(bldr.build());
        }
      }
      catch(Exception e)
      {
        serverContext.logMessage(LogSeverity.SEVERE_ERROR,
                                    StaticUtils.getExceptionMessage(e));
      }
      finally
      {
        if(reader != null)
        {
          //Close the CSVReader to avoid leaking the file handle.
          try
          {
            reader.close();
          }
          catch(IOException e)
          {
            serverContext.logMessage(LogSeverity.SEVERE_ERROR,
                    StaticUtils.getExceptionMessage(e));
          }
        }
      }
    }
  }


  /**
   * Gets a list of all the entries in the source from a given file input.
   * This is used by the 'resync' command line tool. The default implementation
   * throws a {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed for specific records, which
   * can be specified in the input file.
   * <p>
   * The format for the <code>inputLines</code> (e.g. the content of the file)
   * is user-defined; it may be key/value pairs, primary keys, or full SQL
   * statements, for example. The use of this method is triggered via the
   * <i>--sourceInputFile</i> argument on the resync CLI. The
   * <code>outputQueue</code> should contain {@link ChangeRecord}
   * objects with the <code>ChangeType</code> set to <code>null</code> to
   * indicate that these are resync operations.
   * <p>
   * This method should not return until all the entries specified by the input
   * file have been added to the output queue. Separate threads will
   * concurrently drain entries from the queue and process them. The queue
   * typically should not contain full entries, but rather ChangeRecord
   * objects which identify the full source entries. These objects are then
   * individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore,
   * it is important to make sure that the ChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of identifiers for entries in the source endpoint,
   *     using the given input file as the basis for which entries to resync
   * </li>
   * <li>Fetch full source entry for an identifier</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * <p>
   * Alternatively, the full entry can be set on the ChangeRecord within this
   * method, which will cause the "fetch full entry" step to be skipped. In this
   * case the Data Sync Server will just use the entry specified on the
   * ChangeRecord.
   * <p>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple network queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
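   * <p>
   * For this extension, the source input file is expected to contain CSV
   * content in the same form as the monitored files: a header line of LDAP
   * attribute names beginning with 'uid', followed by one row per entry to
   * resync. An illustrative input file:
   * <pre>
   *   uid,givenName,sn,cn
   *   jdoe,John,Doe,John Doe
   * </pre>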
   * <p>
   * @param inputLines
   *          an Iterator containing the lines from the specified input file to
   *          resync (this is specified on the CLI for the resync command).
   *          These lines can be any format, for example a set of primary keys,
   *          a set of WHERE clauses, a set of full SQL queries, etc.
   * @param outputQueue
   *          a queue of ChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(SyncOperation)}
   * @throws EndpointException
   *           if there is an error retrieving the list of entries to resync
   */
  @Override
  public void listAllEntries(final Iterator<String> inputLines,
                             final BlockingQueue<ChangeRecord> outputQueue)
                                throws EndpointException
  {
    serverContext.debugInfo("Beginning to dump entries from file...");

    try
    {
      //We use the openCSV library here to handle CSV parsing
      CSVParser parser = new CSVParser();

      if(!inputLines.hasNext())
      {
        throw new EndpointException(PostStepResult.ABORT_OPERATION,
                "Cannot read header line from input file");
      }
      String headerLine = inputLines.next();
      String[] attrNames = parser.parseLine(headerLine);

      //The header line contains the attribute names
      if(attrNames == null)
      {
        throw new EndpointException(PostStepResult.ABORT_OPERATION,
                "Cannot read header line from input file");
      }

      //Check if the first attribute is 'uid'
      if(!"uid".equalsIgnoreCase(attrNames[0]))
      {
        throw new EndpointException(PostStepResult.ABORT_OPERATION,
               "First attribute is not 'uid' in the input file");
      }

      while(inputLines.hasNext())
      {
        //Parse one line per iteration, checking hasNext() each time so that
        //we never call next() on an exhausted iterator.
        String[] attributeValues = parser.parseLine(inputLines.next());
        if(attributeValues == null)
        {
          continue;
        }

        //Create the entry
        Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN);

        //Add the attributes
        for(int i = 0; i < attributeValues.length; i++)
        {
          e.setAttribute(attrNames[i], attributeValues[i]);
        }

        //Construct a ChangeRecord with a null ChangeType to indicate resync
        ChangeRecord.Builder bldr =
              new ChangeRecord.Builder(null, e.getParsedDN());
        bldr.changedAttributes(attrNames);

        //Add the full entry to the ChangeRecord. This will cause the
        //Sync engine to skip the call to fetchEntry() and instead use this
        //entry.
        bldr.fullEntry(e);

        //Note: there is no "acknowledge" phase for resync, so we do not need
        //to worry about adding the file attachment here like we do in
        //getNextBatchOfChanges().

        //Add the ChangeRecord to the output queue
        outputQueue.put(bldr.build());
      }
    }
    catch(Exception e)
    {
      serverContext.logMessage(LogSeverity.SEVERE_ERROR,
                                  StaticUtils.getExceptionMessage(e));
    }
  }


  /**
   * Moves a file that we have finished processing to the processed files
   * directory and deletes the original file.
   *
   * @param srcFile the file to move
   * @throws EndpointException if there is a problem moving the file
   */
  private void moveCompletedFile(final File srcFile) throws EndpointException
  {
    final File destFile = new File(processedFilesDir, srcFile.getName());
    try
    {
      Files.move(srcFile.toPath(), destFile.toPath(), REPLACE_EXISTING);
    }
    catch(Exception e)
    {
      throw new EndpointException(PostStepResult.ABORT_OPERATION,
              "Could not move " + srcFile.getAbsolutePath() +
              " to " + destFile.getAbsolutePath(), e);
    }
  }
}