/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * docs/licenses/cddl.txt * or http://www.opensource.org/licenses/cddl1.php. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * docs/licenses/cddl.txt. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2011-2024 Ping Identity Corporation */ package com.unboundid.directory.sdk.examples; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.io.File; import java.io.FileFilter; import java.io.FileReader; import java.io.IOException; import java.io.RandomAccessFile; import java.io.Serializable; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import au.com.bytecode.opencsv.CSVParser; import au.com.bytecode.opencsv.CSVReader; import com.unboundid.ldap.sdk.ChangeType; import com.unboundid.ldap.sdk.Entry; import com.unboundid.ldap.sdk.DN; import com.unboundid.ldap.sdk.LDAPException; import com.unboundid.util.ObjectPair; import com.unboundid.util.args.ArgumentParser; import com.unboundid.util.args.ArgumentException; import com.unboundid.util.args.DNArgument; import com.unboundid.util.args.FileArgument; import com.unboundid.util.StaticUtils; import com.unboundid.directory.sdk.sync.api.SyncSource; import com.unboundid.directory.sdk.sync.config.SyncSourceConfig; import com.unboundid.directory.sdk.sync.types.EndpointException; import com.unboundid.directory.sdk.sync.types.PostStepResult; import com.unboundid.directory.sdk.sync.types.SyncServerContext; import com.unboundid.directory.sdk.sync.types.SetStartpointOptions; import com.unboundid.directory.sdk.sync.types.ChangeRecord; import com.unboundid.directory.sdk.sync.types.SyncOperation; import com.unboundid.directory.sdk.common.types.LogSeverity; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; /** * This class provides a simple example of a generic Sync Source which will * detect CSV files in a given directory and synchronize them to an * LDAP destination. More specifically, the extension will look for any files * with the ".csv" extension, and interpret each row to be a single LDAP entry * to be synchronized. After a file has been completely processed, it will be * moved to a separate directory. * <p> * The CSV header should contain valid LDAP attribute names. The 'uid' attribute * is used as the RDN and is required to be present, and the source base DN * is configurable using an extension argument. The DNs for the entries will be * constructed as follows: uid={uid},{baseDN}. Each row in a CSV file is * expected to have values for all of the LDAP attributes, not just those that * have changed. * <p> * This extension creates ChangeRecords using {@link ChangeType#MODIFY}, so when * using this example, it's important to set <i>modifies-as-creates=true</i> * on the Sync Class in the configuration. This will allow the Sync Pipe to * create entries that don't already exist. For the sake of simplicity, this * example does not handle DELETE operations. * <p> * This extension uses the OpenCSV library, which is available at * <a href="http://opencsv.sourceforge.net">http://opencsv.sourceforge.net</a> * and provides simple CSV parsing. * <p> * The following arguments are defined: * <UL> * <LI>csv-files-dir -- The filesystem directory to monitor for CSV files</LI> * <LI>processed-files-dir -- The filesystem directory where finished CSV * files should be moved</LI> * <LI>base-dn -- The base DN to use when creating LDAP entries from the CSV * content</LI> * </UL> */ public final class ExampleSyncSource extends SyncSource { //The server context which can be used for obtaining the server state, //logging, etc. private SyncServerContext serverContext; //A set of files that have been completely detected by getNextBatchOfChanges() //(e.g. all rows have been read and returned). This is a map so that we know //how many records were in each file. private final Map<File,Long> pendingFiles = new ConcurrentHashMap<File,Long>(); //A state holder to keep track of which line of which file we are currently //at. private ObjectPair<File,Long> currentLine; //The directory which we will monitor for new CSV files. private File csvFilesDir; //The directory where we will move files after they have been processed. private File processedFilesDir; //The base DN to use for entries in the CSV files. private DN baseDN; //The set of attribute names for the current CSV file. (This represents the //CSV header). private String[] attributeNames; //The file filter used to match CSV files in the csvDirectory. private FileFilter csvFilter; //A Comparator to sort the files from oldest to newest private Comparator<File> oldestToNewestComparator; //The attachment key used to place the CSV filename on the ChangeRecord //so that acknowledgeCompletedOps() can reference it. private static final String FILE_ATTACHMENT_KEY = "File.Attachment"; //The attachment key used to place the row number of a record in the CSV on //the ChangeRecord when it is built. private static final String CSV_ROW_NUMBER_KEY = "CSV.RowNumber"; /** * Retrieves a human-readable name for this extension. * * @return A human-readable name for this extension. */ @Override public String getExtensionName() { return "Example Sync Source"; } /** * Retrieves a human-readable description for this extension. Each element * of the array that is returned will be considered a separate paragraph in * generated documentation. * * @return A human-readable description for this extension, or {@code null} * or an empty array if no description should be available. */ @Override public String[] getExtensionDescription() { return new String[] { "This extension implements a Sync Source which can be used " + "to detect and synchronize changes from CSV files in a given " + "directory. The extension will look for any files with the .csv " + "extension and interpret each row to be a single LDAP entry to " + "be synchronized. After a file has been completely processed, " + "it will be moved to a separate directory.", "For the sake of simplicity, the columns in the files " + "should be LDAP attributes (cn, sn, givenname, etc). The " + "first line (CSV header) should contain the attribute names " + "and the first column is always required to be the 'uid' " + "attribute. The DNs for the entries will be constructed as " + "follows: uid={uid},{baseDN}. Each row in a CSV file is " + "expected to have values for all of the LDAP attributes, not " + "just those that have changed." }; } /** * Updates the provided argument parser to define any configuration arguments * which may be used by this extension. The argument parser may also be * updated to define relationships between arguments (e.g. to specify * required, exclusive, or dependent argument sets). * * @param parser The argument parser to be updated with the configuration * arguments which may be used by this extension. * * @throws ArgumentException If a problem is encountered while updating the * provided argument parser. */ @Override public void defineConfigArguments(final ArgumentParser parser) throws ArgumentException { FileArgument newFilesArg = new FileArgument( 'd', "csv-files-dir", true, 1, "{path}", "The file system directory to monitor for CSV files", true, true, false, true); parser.addArgument(newFilesArg); FileArgument processedFilesArg = new FileArgument( 'p', "processed-files-dir", true, 1, "{path}", "The file system directory where the extension should " + "move CSV files after it finishes processing them", true, true, false, true); parser.addArgument(processedFilesArg); DN defaultDN = null; try { defaultDN = new DN("dc=example,dc=com"); } catch(LDAPException e) { throw new ArgumentException("Could not construct a default DN", e); } DNArgument baseDNArg = new DNArgument('b', "base-dn", true, 1, "{base-dn}", "The base DN to use for entries in the CSV files", defaultDN); parser.addArgument(baseDNArg); } /** * This hook is called when a Sync Pipe first starts up, when the * <i>resync</i> process first starts up, or when the set-startpoint * subcommand is called from the <i>realtime-sync</i> command line tool. * Any initialization of this sync source should be performed here. This * method should generally store the {@link SyncServerContext} in a class * member so that it can be used elsewhere in the implementation. * * @param serverContext A handle to the server context for the server in * which this extension is running. * @param config The general configuration for this sync source. * @param parser The argument parser which has been initialized from * the configuration for this sync source. */ @Override public void initializeSyncSource(final SyncServerContext serverContext, final SyncSourceConfig config, final ArgumentParser parser) { this.serverContext = serverContext; csvFilter = new FileFilter() { public boolean accept(final File f) { return f.isFile() && f.canRead() && f.getName().endsWith(".csv"); } }; oldestToNewestComparator = new Comparator<File>() { public int compare(final File f1, final File f2) { if(f1.lastModified() - f2.lastModified() > 0) { return 1; } else if(f1.lastModified() - f2.lastModified() < 0) { return -1; } else { return 0; } } }; FileArgument newFilesArg = (FileArgument) parser.getNamedArgument("csv-files-dir"); csvFilesDir = newFilesArg.getValue(); FileArgument processedFilesArg = (FileArgument) parser.getNamedArgument("processed-files-dir"); processedFilesDir = processedFilesArg.getValue(); DNArgument baseDNArg = (DNArgument) parser.getNamedArgument("base-dn"); baseDN = baseDNArg.getValue(); } /** * This hook is called when a Sync Pipe shuts down, when the <i>resync</i> * process shuts down, or when the set-startpoint subcommand (from the * <i>realtime-sync</i> command line tool) is finished. Any clean up of this * sync source should be performed here. */ @Override public void finalizeSyncSource() { //No implementation necessary. } /** * This method should effectively set the starting point for synchronization * to the place specified by the <code>options</code> parameter. This should * cause all changes previous to the specified start point to be disregarded * and only changes after that point to be returned by * {@link #getNextBatchOfChanges(int, AtomicLong)}. * <p> * There are several different startpoint types (see * {@link SetStartpointOptions}), and this implementation is not required to * support them all. If the specified startpoint type is unsupported, this * method should throw an {@link UnsupportedOperationException}. * <p> * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type * must be supported by your implementation, because this is used when a Sync * Pipe first starts up. The {@link Serializable} in this case is the same * type that is returned by {@link #getStartpoint()}; the * Data Sync Server persists it and passes it back in on a restart. * <p> * This method can be called from two different contexts: * <ul> * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used * (the Sync Pipe is required to be stopped in this context)</li> * <li>Immediately after a Sync Pipe starts up and a connection is first * established to the source server (e.g. before the first call to * {@link #getNextBatchOfChanges(int, AtomicLong)})</li> * </ul> * * @param options * an object which indicates where exactly to start synchronizing * (e.g. the end of the changelog, specific change number, a certain * time ago, etc) * @throws EndpointException * if there is any error while setting the start point */ @Override @SuppressWarnings("unchecked") public void setStartpoint(final SetStartpointOptions options) throws EndpointException { File[] files; switch(options.getStartpointType()) { case BEGINNING_OF_CHANGELOG: //Nothing to do here, getNextBatchOfChanges() will automatically start //at the beginning (e.g. it will pick up at the first file in the //csvDirectory). break; case END_OF_CHANGELOG: //Move all files to the processed files directory files = csvFilesDir.listFiles(csvFilter); if (files == null) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath()); } for(File f : files) { moveCompletedFile(f); } break; case RESUME_AT_CHANGE_TIME: long time = options.getChangeTime().getTime(); files = csvFilesDir.listFiles(csvFilter); if (files == null) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath()); } Arrays.sort(files, oldestToNewestComparator); for(File f : files) { if(f.lastModified() < time) { moveCompletedFile(f); } else { break; } } break; case RESUME_AT_SERIALIZABLE: //When sync first starts up, this method is called with //RESUME_AT_SERIALIZABLE to initialize the internal state. //Since this extension does not save any state, we do not have to //set anything here. break; default: throw new IllegalArgumentException( "This startpoint type is not supported: " + options.getStartpointType().toString()); } } /** * Gets the current value of the startpoint for change detection. This is the * "bookmark" which indicates which changes have already been processed and * which have not. In most cases, a change number is used to detect changes * and is managed by the Data Sync Server, in which case this * implementation needs only to return the latest acknowledged * change number. In other cases, the return value may correspond to a * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. * In any case, this method should return the value that is updated by * {@link #acknowledgeCompletedOps(LinkedList)}. * <p> * This method is called periodically and the return value is saved in the * persistent state for the Sync Pipe that uses this extension as its Sync * Source. * <p> * <b>IMPORTANT</b>: The internal value for the startpoint should only be * updated after a sync operation is acknowledged back to this extension (via * {@link #acknowledgeCompletedOps(LinkedList)}). * Otherwise it will be possible for changes to be missed when the * Data Sync Server is restarted or a connection error occurs. * @return a value to store in the persistent state for the Sync Pipe. This is * usually a change number, but if a changelog table is not used to * detect changes, this value should represent some other token to * pass into {@link #setStartpoint(SetStartpointOptions)} * when the sync pipe starts up. */ @Override public Serializable getStartpoint() { //We return null here because we are synchronizing from a directory of files //where the files are removed once they have been synchronized. In this //case, we do not need to keep track of where to start; we always start at //the oldest file in the directory and work forward. return null; } /** * Return the URL or path identifying the source endpoint * from which this extension is transmitting data. This is used for logging * purposes only, so it could just be a server name or hostname and port, etc. * * @return the path to the source endpoint */ @Override public String getCurrentEndpointURL() { return csvFilesDir.getAbsolutePath(); } /** * Return a full source entry (in LDAP form) from the source, corresponding * to the {@link ChangeRecord} that is passed in through the * {@link SyncOperation}. This method should perform any queries necessary to * gather the latest values for all the attributes to be synchronized. * <p> * This method <b>must be thread safe</b>, as it will be called repeatedly and * concurrently by each of the Sync Pipe worker threads as they process * entries. * <p> * If the original ChangeRecord has the full entry already set on it * (which can be done using the * {@link com.unboundid.directory.sdk.sync.types.ChangeRecord.Builder#fullEntry(Entry)}), * then this method will not get called, and the Data Sync Server * will automatically use the full entry from the ChangeRecord. In this case, * the implementation can always return {@code null}. * * @param operation * the SyncOperation which identifies the source "entry" to * fetch. The ChangeRecord can be obtained by calling * <code>operation.getChangeRecord()</code>. * These ChangeRecords are generated by * {@link #getNextBatchOfChanges(int, AtomicLong)} * or by * {@link #listAllEntries(BlockingQueue)}. * * @return a full LDAP Entry, or null if no such entry exists. * @throws EndpointException * if there is an error fetching the entry */ @Override public Entry fetchEntry(final SyncOperation operation) throws EndpointException { ChangeRecord record = operation.getChangeRecord(); throw new IllegalStateException( "fetchEntry() should not be called because the full entry is set " + "on the ChangeRecord: " + record.toString()); } /** * Provides a way for the Data Sync Server to acknowledge back to the * extension which sync operations it has processed. This method should update * the official startpoint which was set by * {@link #setStartpoint(SetStartpointOptions)} and is * returned by {@link #getStartpoint()}. * <p> * <b>IMPORTANT</b>: The internal value for the startpoint should only be * updated after a sync operation is acknowledged back to this extension (via * this method). Otherwise it will be possible for changes to be missed when * the Data Sync Server is restarted or a connection error occurs. * * @param completedOps * a list of {@link SyncOperation}s that have finished processing. * The records are listed in the order they were first detected. * @throws EndpointException * if there is an error acknowledging the changes back to the * database */ @Override public void acknowledgeCompletedOps( final LinkedList<SyncOperation> completedOps) throws EndpointException { ChangeRecord lastRecord = completedOps.getLast().getChangeRecord(); File lastFile = (File) lastRecord.getProperty(FILE_ATTACHMENT_KEY); Long rowNumber = (Long) lastRecord.getProperty(CSV_ROW_NUMBER_KEY); File[] files = csvFilesDir.listFiles(csvFilter); if (files == null) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath()); } //Sort the files from oldest to newest Arrays.sort(files, oldestToNewestComparator); //Move completed files to the completed files directory for(File f : files) { //Because changes are acknowledged in order and we detect them from oldest //to newest, we know if the file is older than the lastFile from above, //then we have finished processing it. if(f.lastModified() < lastFile.lastModified() && pendingFiles.containsKey(f)) { moveCompletedFile(f); pendingFiles.remove(f); } } //If the last acknowledged file is completely finished, move it to the //processed files directory as well. if(pendingFiles.get(lastFile) == rowNumber) { moveCompletedFile(lastFile); pendingFiles.remove(lastFile); } } /** * Return the next batch of change records from the source. Change records * are usually just hints that a change happened; they do not include * the full contents of the target entry. In an effort to never synchronize * stale data, the Data Sync Server will go back and fetch the full * target entry for each change record. * <p> * On the first invocation, this should return changes starting from the * startpoint that was set by * {@link #setStartpoint(SetStartpointOptions)}. This method is also * responsible for updating the internal state such that subsequent * invocations do not return duplicate changes. * <p> * The resulting list should be limited by <code>maxChanges</code>. The * <code>numStillPending</code> reference should be set to the estimated * number of changes that haven't yet been retrieved from the source endpoint * when this method returns, or zero if all the current changes have been * retrieved. * <p> * <b>IMPORTANT</b>: While this method needs to keep track of which changes * have already been returned so that it does not return them again, it should * <b>NOT</b> modify the official startpoint. The internal value for the * startpoint should only be updated after a sync operation is acknowledged * back to this extension (via * {@link #acknowledgeCompletedOps(LinkedList)}). * Otherwise it will be possible for changes to be missed when the * Data Sync Server is restarted or a connection error occurs. The * startpoint should not change as a result of this method. * <p> * This method <b>does not need to be thread-safe</b>. It will be invoked * repeatedly by a single thread, based on the polling interval set in the * Sync Pipe configuration. * * @param maxChanges * the maximum number of changes to retrieve * @param numStillPending * this should be set to the number of unretrieved changes that * are still pending after this batch has been retrieved. This will * be passed in * as zero, and may be left that way if the actual value cannot be * determined. * @return a list of {@link ChangeRecord} instances, each * corresponding to a single change at the source endpoint. * If there are no new changes to return, this method should return * an empty list. * @throws EndpointException * if there is any error while retrieving the next batch of changes */ @Override public List<ChangeRecord> getNextBatchOfChanges( final int maxChanges, final AtomicLong numStillPending) throws EndpointException { List<ChangeRecord> results = new ArrayList<ChangeRecord>(); File[] files = csvFilesDir.listFiles(csvFilter); if (files == null) { throw new EndpointException( PostStepResult.ABORT_OPERATION, "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath()); } //Sort the files from oldest to newest Arrays.sort(files, oldestToNewestComparator); for(File f : files) { if(pendingFiles.containsKey(f) || !f.exists()) { //This file has already been completely detected. continue; } CSVReader csvReader = null; RandomAccessFile fileHandle = null; try { fileHandle = new RandomAccessFile(f, "rw"); FileChannel channel = fileHandle.getChannel(); //Try to acquire an exclusive lock on the file to make sure no other //processes are currently using the file. FileLock lock = channel.tryLock(); if(lock == null) { //Another process is using the file; this could happen when the file //is first written to the CSV directory, for example. serverContext.logMessage(LogSeverity.MILD_WARNING, "Another process is using file: " + f.getPath()); continue; } //We use the openCSV library here to handle CSV parsing csvReader = new CSVReader(new FileReader(fileHandle.getFD())); //The header line contains the attribute names attributeNames = csvReader.readNext(); if(attributeNames == null) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Cannot read CSV header from " + f.getAbsolutePath()); } //Check if the first attribute is 'uid' if(!"uid".equalsIgnoreCase(attributeNames[0])) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "First attribute is not 'uid' in file " + f.getAbsolutePath()); } long currentRowNum = 0; String[] attributeValues; while ((attributeValues = csvReader.readNext()) != null) { currentRowNum++; if (currentLine != null && currentLine.getFirst().equals(f) && currentLine.getSecond() >= currentRowNum) { continue; } //Create the entry Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN); //Add the attributes for(int i = 0; i < attributeValues.length; i++) { e.setAttribute(attributeNames[i], attributeValues[i]); } //Construct a ChangeRecord ChangeRecord.Builder bldr = new ChangeRecord.Builder( ChangeType.MODIFY, e.getParsedDN()); bldr.changedAttributes(attributeNames); bldr.changeTime(System.currentTimeMillis()); //Add the full entry to the ChangeRecord. This will cause the //Sync engine to skip the call to fetchEntry() and instead use this //entry. bldr.fullEntry(e); //Add the file as an attachment so that we can reference it when this //operation makes it back to acknowledgeCompletedOps() bldr.addProperty(FILE_ATTACHMENT_KEY, f); //Add the row number as an attachment so that we can update the //starting point when this operation makes it back to //acknowledgeCompletedOps() bldr.addProperty(CSV_ROW_NUMBER_KEY, currentRowNum); //Build the ChangeRecord and add it to the result list results.add(bldr.build()); if(results.size() >= maxChanges) { //Don't look at any more rows in this file if we've reached the //maximum number of changes for this batch. break; } } if(attributeValues == null) { //We've reach the end of this file. Bookmark the file and how many //rows were in it so that we can properly move it to the processed //files directory once it is acknowledged. pendingFiles.put(f, currentRowNum); currentLine = null; } else { //We're stopping mid-file, so we need to record which line we're //currently on so we can pick back up at this spot on the next //invocation of getNextBatchOfChanges(). currentLine = new ObjectPair<File,Long>(f, currentRowNum); } if(results.size() >= maxChanges) { //Don't look at any more files if we've reached the maximum number of //changes for this batch. break; } } catch(LDAPException e) { throw new EndpointException(e); } catch(IOException e) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Problem with file: " + f.getName() + " --> " + StaticUtils.getExceptionMessage(e)); } finally { if(csvReader != null) { //Close the CSVReader. try { csvReader.close(); } catch(IOException e) { serverContext.logMessage(LogSeverity.SEVERE_ERROR, StaticUtils.getExceptionMessage(e)); } } if(fileHandle != null) { //Release the exclusive lock we had on the file (this implicitly //closes the underlying FileChannel). try { fileHandle.close(); } catch(IOException e) { serverContext.logMessage(LogSeverity.SEVERE_ERROR, StaticUtils.getExceptionMessage(e)); } } } } return results; } /** * Gets a list of all the entries in the source endpoint. This is used by the * 'resync' command line tool. The default implementation throws a * {@link UnsupportedOperationException}; subclasses should override if the * resync functionality is needed. * <p> * The <code>outputQueue</code> should contain {@link ChangeRecord} objects * with the <code>ChangeType</code> set to <code>null</code> to indicate that * these are resync operations. * <p> * This method should not return until all the entries at the source * have been added to the output queue. Separate threads will concurrently * drain entries from the queue and process them. The queue typically should * not contain full entries, but rather ChangeRecord objects which identify * the full source entries. These objects are then individually passed in to * {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure * that the ChangeRecord instances contain enough identifiable information * (e.g. primary keys) for each entry so that the entry can be found again. * <p> * The lifecycle of resync is similar to that of real-time sync, with a few * differences: * <ol> * <li>Stream out a list of identifiers for the entries in the source * endpoint, using a ChangeRecord as the identifier</li> * <li>Fetch full source entry for a ChangeRecord</li> * <li>Perform any mappings and compute the equivalent destination entry</li> * <li>Fetch full destination entry</li> * <li>Diff the computed destination entry and actual destination entry</li> * <li>Apply the minimal set of changes at the destination to bring it in sync * </li> * </ol> * <p> * Alternatively, the full entry can be set on the ChangeRecord within this * method, which will cause the "fetch full entry" step to be skipped. In this * case the Data Sync Server will just use the entry specified on the * ChangeRecord. * <p> * If the total set of entries is very large, it is fine to split up the work * into multiple network queries within this method. The queue will not grow * out of control because it blocks when it becomes full. The queue capacity * is fixed at 1000. * <p> * @param outputQueue * a queue of ChangeRecord objects which will be individually * fetched via {@link #fetchEntry(SyncOperation)} * @throws EndpointException * if there is an error retrieving the list of entries to resync */ @Override public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue) throws EndpointException { serverContext.debugInfo("Beginning to dump all entries..."); File[] files = csvFilesDir.listFiles(csvFilter); if (files == null) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Unable to open CSV file directory " + csvFilesDir.getAbsolutePath()); } for(File f : files) { try { //We use the openCSV library here to handle CSV parsing CSVReader reader = new CSVReader(new FileReader(f)); //The header line contains the attribute names String[] attrNames = reader.readNext(); if(attrNames == null) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Cannot read header line from " + f.getAbsolutePath()); } //Check if the first attribute is 'uid' if(!"uid".equalsIgnoreCase(attrNames[0])) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "First attribute is not 'uid' in file " + f.getAbsolutePath()); } String[] attributeValues; while ((attributeValues = reader.readNext()) != null) { //Create the entry Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN); //Add the attributes for(int i = 0; i < attributeValues.length; i++) { e.setAttribute(attrNames[i], attributeValues[i]); } //Construct a ChangeRecord with a null ChangeType to indicate resync ChangeRecord.Builder bldr = new ChangeRecord.Builder(null, e.getParsedDN()); bldr.changedAttributes(attrNames); bldr.changeTime(f.lastModified()); //Add the full entry to the ChangeRecord. This will cause the //Sync engine to skip the call to fetchEntry() and instead use this //entry. bldr.fullEntry(e); //Note: there is no "acknowledge" phase for resync, so we do not need //to worry about adding the file attachment here like we do in //getNextBatchOfChanges(). outputQueue.put(bldr.build()); } } catch(Exception e) { serverContext.logMessage(LogSeverity.SEVERE_ERROR, StaticUtils.getExceptionMessage(e)); } } } /** * Gets a list of all the entries in the source from a given file input. * This is used by the 'resync' command line tool. The default implementation * throws a {@link UnsupportedOperationException}; subclasses should override * if the resync functionality is needed for specific records, which * can be specified in the input file. * <p> * The format for the <code>inputLines</code> (e.g. the content of the file) * is user-defined; it may be key/value pairs, primary keys, or full SQL * statements, for example. The use of this method is triggered via the * <i>--sourceInputFile</i> argument on the resync CLI. The * <code>outputQueue</code> should contain {@link ChangeRecord} * objects with the <code>ChangeType</code> set to <code>null</code> to * indicate that these are resync operations. * <p> * This method should not return until all the entries specified by the input * file have been added to the output queue. Separate threads will * concurrently drain entries from the queue and process them. The queue * typically should not contain full entries, but rather ChangeRecord * objects which identify the full source entries. These objects are then * individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore, * it is important to make sure that the ChangeRecord instances * contain enough identifiable information (e.g. primary keys) for each entry * so that the entry can be found again. * <p> * The lifecycle of resync is similar to that of real-time sync, with a few * differences: * <ol> * <li>Stream out a list of identifiers for entries in the source endpoint, * using the given input file as the basis for which entries to resync * </li> * <li>Fetch full source entry for an identifier</li> * <li>Perform any mappings and compute the equivalent destination entry</li> * <li>Fetch full destination entry</li> * <li>Diff the computed destination entry and actual destination entry</li> * <li>Apply the minimal set of changes at the destination to bring it in sync * </li> * </ol> * <p> * Alternatively, the full entry can be set on the ChangeRecord within this * method, which will cause the "fetch full entry" step to be skipped. In this * case the Data Sync Server will just use the entry specified on the * ChangeRecord. * <p> * If the total set of entries is very large, it is fine to split up the work * into multiple network queries within this method. The queue will not grow * out of control because it blocks when it becomes full. The queue capacity * is fixed at 1000. * <p> * @param inputLines * an Iterator containing the lines from the specified input file to * resync (this is specified on the CLI for the resync command). * These lines can be any format, for example a set of primary keys, * a set of WHERE clauses, a set of full SQL queries, etc. * @param outputQueue * a queue of ChangeRecord objects which will be individually * fetched via {@link #fetchEntry(SyncOperation)} * @throws EndpointException * if there is an error retrieving the list of entries to resync */ @Override public void listAllEntries(final Iterator<String> inputLines, final BlockingQueue<ChangeRecord> outputQueue) throws EndpointException { serverContext.debugInfo("Beginning to dump entries from file..."); try { //We use the openCSV library here to handle CSV parsing CSVParser parser = new CSVParser(); String headerLine = inputLines.next(); String[] attrNames = parser.parseLine(headerLine); //The header line contains the attribute names if(attrNames == null) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Cannot read header line from input file"); } //Check if the first attribute is 'uid' if(!"uid".equalsIgnoreCase(attrNames[0])) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "First attribute is not 'uid' in the input file"); } while(inputLines.hasNext()) { String[] attributeValues; while ((attributeValues = parser.parseLine(inputLines.next())) != null) { //Create the entry Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN); //Add the attributes for(int i = 0; i < attributeValues.length; i++) { e.setAttribute(attrNames[i], attributeValues[i]); } //Construct a ChangeRecord with a null ChangeType to indicate resync ChangeRecord.Builder bldr = new ChangeRecord.Builder(null, e.getParsedDN()); bldr.changedAttributes(attrNames); //Add the full entry to the ChangeRecord. This will cause the //Sync engine to skip the call to fetchEntry() and instead use this //entry. bldr.fullEntry(e); //Note: there is no "acknowledge" phase for resync, so we do not need //to worry about adding the file attachment here like we do in //getNextBatchOfChanges(). //Add the ChangeRecord to the output queue outputQueue.put(bldr.build()); } } } catch(Exception e) { serverContext.logMessage(LogSeverity.SEVERE_ERROR, StaticUtils.getExceptionMessage(e)); } } /** * Moves a file that we have finished processing to the processed files * directory and deletes the original file. * * @param srcFile the file to move * @throws EndpointException if there is a problem moving the file */ private void moveCompletedFile(final File srcFile) throws EndpointException { final File destFile = new File(processedFilesDir, srcFile.getName()); try { Files.move(srcFile.toPath(), destFile.toPath(), REPLACE_EXISTING); } catch(Exception e) { throw new EndpointException(PostStepResult.ABORT_OPERATION, "Could not move " + srcFile.getAbsolutePath() + " to " + destFile.getAbsolutePath(), e); } } }