001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * docs/licenses/cddl.txt
011     * or http://www.opensource.org/licenses/cddl1.php.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * docs/licenses/cddl.txt.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2011-2013 UnboundID Corp.
026     */
027    package com.unboundid.directory.sdk.sync.api;
028    
029    
030    import java.io.Serializable;
031    import java.util.Collections;
032    import java.util.Iterator;
033    import java.util.LinkedList;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.concurrent.BlockingQueue;
037    import java.util.concurrent.atomic.AtomicLong;
038    
039    import com.unboundid.directory.sdk.common.internal.Configurable;
040    import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider;
041    import com.unboundid.directory.sdk.common.internal.UnboundIDExtension;
042    import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
043    import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
044    import com.unboundid.directory.sdk.sync.types.EndpointException;
045    import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
046    import com.unboundid.directory.sdk.sync.types.SyncOperation;
047    import com.unboundid.directory.sdk.sync.types.SyncServerContext;
048    import com.unboundid.directory.sdk.sync.types.ChangeRecord;
049    import com.unboundid.ldap.sdk.Entry;
050    import com.unboundid.util.Extensible;
051    import com.unboundid.util.args.ArgumentException;
052    import com.unboundid.util.args.ArgumentParser;
053    
054    
055    
056    /**
057     * This class defines an API that must be implemented by extensions in order to
058     * synchronize data from a generic (non-LDAP and non-JDBC) endpoint. Since the
059     * UnboundID Synchronization Server is LDAP-centric, this API allows you to take
060     * generic content and convert it into LDAP entries which can then be processed
061     * by the Synchronization Server. The lifecycle of a sync operation is as
062     * follows:
063     * <ol>
064     * <li>Detect change at the synchronization source</li>
065     * <li>Fetch full source entry</li>
066     * <li>Perform any mappings and compute the equivalent destination entry</li>
067     * <li>Fetch full destination entry</li>
068     * <li>Diff the computed destination entry and actual (fetched) destination
069     * entry</li>
070     * <li>Apply the minimal set of changes at the destination to bring it in sync
071     * </li>
072     * </ol>
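 * This implies that the {@link #fetchEntry(SyncOperation)} method will be
 * called once for every change that is returned by
 * {@link #getNextBatchOfChanges(int, AtomicLong)}, unless the returned
 * {@link ChangeRecord} already carries the full source entry, in which case
 * the fetch step is skipped.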
077     * <p>
078     * This is a generic interface and there is no protocol-specific connection
079     * management provided. It is expected that implementers will provide their own
080     * libraries for talking to the source endpoint and handle the connection
081     * lifecycle in the {@link #initializeSyncSource(SyncServerContext,
082     * SyncSourceConfig, ArgumentParser)} and {@link #finalizeSyncSource()}
083     * methods of this extension.
084     * <p>
085     * During realtime synchronization (i.e. when a Sync Pipe is running), there is
086     * a sliding window of changes being processed, and this API provides a
087     * distinction between some different points along that window:
088     * <ul>
089     *   <li><b>Old changes</b>: These are changes that the Sync Server has
090     *       processed and acknowledged back to the Sync Source. The Sync Source is
091     *       under no obligation to re-detect these changes.</li>
092     *   <li><b>Startpoint</b>: This marks where the Sync Source will start
093     *       detecting changes if it is restarted.</li>
094     *   <li><b>Detected but unacknowledged</b>: These changes have been returned by
095     *       <code>getNextBatchOfChanges()</code> but not completely processed and
096     *       acknowledged back to the Sync Source.</li>
097     *   <li><b>Undetected changes</b>: The next call to
098     *       <code>getNextBatchOfChanges()</code> should return the first changes
099     *       that have not been detected.  This should be somewhere at or ahead of
100     *       the startpoint.</li>
101     * </ul>
102     * <p>
103     * Several of these methods throw {@link EndpointException}, which should be
104     * used in the case of any connection or endpoint error. For other types of
105     * errors, runtime exceptions may be used (IllegalStateException,
106     * NullPointerException, etc.). The Synchronization Server will automatically
 * retry operations that fail, up to a configurable number of attempts. The
108     * EndpointException class allows you to specify a retry policy as well.
109     * <BR>
110     * <H2>Configuring Sync Sources</H2>
111     * In order to configure a Sync Source based on this API and written in Java,
112     * use a command like:
113     * <PRE>
114     *      dsconfig create-sync-source \
115     *           --source-name "<I>{source-name}</I>" \
116     *           --type third-party \
117     *           --set "extension-class:<I>{class-name}</I>" \
118     *           --set "extension-argument:<I>{name=value}</I>"
119     * </PRE>
120     * where "<I>{source-name}</I>" is the name to use for the Sync Source
121     * instance, "<I>{class-name}</I>" is the fully-qualified
122     * name of the Java class written using this API, and "<I>{name=value}</I>"
123     * represents name-value pairs for any arguments to provide to the sync
124     * source.  If multiple arguments should be provided to the sync source,
125     * then the "<CODE>--set extension-argument:<I>{name=value}</I></CODE>" option
126     * should be provided multiple times.
127     */
128    @Extensible()
129    @SynchronizationServerExtension(appliesToLocalContent=false,
130                                    appliesToSynchronizedContent=true)
131    public abstract class SyncSource implements UnboundIDExtension,
132                                                Configurable,
133                                                ExampleUsageProvider
134    {
135      /**
136       * Creates a new instance of this Sync Source.  All sync
137       * source implementations must include a default constructor, but any
138       * initialization should generally be done in the
139       * {@link #initializeSyncSource} method.
140       */
141      public SyncSource()
142      {
143        // No implementation is required.
144      }
145    
146    
147    
148      /**
149       * {@inheritDoc}
150       */
151      public abstract String getExtensionName();
152    
153    
154    
155      /**
156       * {@inheritDoc}
157       */
158      public abstract String[] getExtensionDescription();
159    
160    
161    
162      /**
163       * {@inheritDoc}
164       */
165      public Map<List<String>,String> getExamplesArgumentSets()
166      {
167        return Collections.emptyMap();
168      }
169    
170    
171    
172      /**
173       * {@inheritDoc}
174       */
175      public void defineConfigArguments(final ArgumentParser parser)
176             throws ArgumentException
177      {
178        // No arguments will be allowed by default.
179      }
180    
181    
182    
183      /**
184       * This hook is called when a Sync Pipe first starts up, when the
185       * <i>resync</i> process first starts up, or when the set-startpoint
186       * subcommand is called from the <i>realtime-sync</i> command line tool.
187       * Any initialization of this sync source should be performed here. This
188       * method should generally store the {@link SyncServerContext} in a class
189       * member so that it can be used elsewhere in the implementation.
190       * <p>
191       * The default implementation is empty.
192       *
193       * @param  serverContext  A handle to the server context for the server in
194       *                        which this extension is running.
195       * @param  config         The general configuration for this sync source.
196       * @param  parser         The argument parser which has been initialized from
197       *                        the configuration for this sync source.
198       */
199      public void initializeSyncSource(final SyncServerContext serverContext,
200                                       final SyncSourceConfig config,
201                                       final ArgumentParser parser)
202      {
203        // No initialization will be performed by default.
204      }
205    
206    
207    
208      /**
209       * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
210       * process shuts down, or when the set-startpoint subcommand (from the
211       * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
212       * sync source should be performed here.
213       * <p>
214       * The default implementation is empty.
215       */
216      public void finalizeSyncSource()
217      {
218        //No implementation required by default.
219      }
220    
221    
222    
223      /**
224       * Return the URL or path identifying the source endpoint
225       * from which this extension is transmitting data. This is used for logging
226       * purposes only, so it could just be a server name or hostname and port, etc.
227       *
228       * @return the path to the source endpoint
229       */
230      public abstract String getCurrentEndpointURL();
231    
232    
233    
234      /**
235       * This method should effectively set the starting point for synchronization
236       * to the place specified by the <code>options</code> parameter. This should
237       * cause all changes previous to the specified start point to be disregarded
238       * and only changes after that point to be returned by
239       * {@link #getNextBatchOfChanges(int, AtomicLong)}.
240       * <p>
241       * There are several different startpoint types (see
242       * {@link SetStartpointOptions}), and this implementation is not required to
243       * support them all. If the specified startpoint type is unsupported, this
244       * method should throw an {@link UnsupportedOperationException}.
245       * <p>
246       * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
247       * must be supported by your implementation, because this is used when a Sync
248       * Pipe first starts up. The {@link Serializable} in this case is the same
249       * type that is returned by {@link #getStartpoint()}; the Sync Server persists
250       * it and passes it back in on on a restart.
251       * <p>
252       * This method can be called from two different contexts:
253       * <ul>
254       * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
255       * (the Sync Pipe is required to be stopped in this context)</li>
256       * <li>Immediately after a Sync Pipe starts up and a connection is first
257       *     established to the source server (e.g. before the first call to
258       * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
259       * </ul>
260       *
261       * @param options
262       *          an object which indicates where exactly to start synchronizing
263       *          (e.g. the end of the changelog, specific change number, a certain
264       *          time ago, etc)
265       * @throws EndpointException
266       *           if there is any error while setting the start point
267       */
268      public abstract void setStartpoint(final SetStartpointOptions options)
269                                            throws EndpointException;
270    
271    
272    
273      /**
274       * Gets the current value of the startpoint for change detection. This is the
275       * "bookmark" which indicates which changes have already been processed and
276       * which have not. In most cases, a change number is used to detect changes
277       * and is managed by the Synchronization Server, in which case this
278       * implementation needs only to return the latest acknowledged
279       * change number. In other cases, the return value may correspond to a
280       * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
281       * In any case, this method should return the value that is updated by
282       * {@link #acknowledgeCompletedOps(LinkedList)}.
283       * <p>
284       * This method is called periodically and the return value is saved in the
285       * persistent state for the Sync Pipe that uses this extension as its Sync
286       * Source.
287       * <p>
288       * <b>IMPORTANT</b>: The internal value for the startpoint should only be
289       * updated after a sync operation is acknowledged back to this script (via
290       * {@link #acknowledgeCompletedOps(LinkedList)}).
291       * Otherwise it will be possible for changes to be missed when the
292       * Synchronization Server is restarted or a connection error occurs.
293       * @return a value to store in the persistent state for the Sync Pipe. This is
294       *         usually a change number, but if a changelog table is not used to
295       *         detect changes, this value should represent some other token to
296       *         pass into {@link #setStartpoint(SetStartpointOptions)}
297       *         when the sync pipe starts up.
298       */
299      public abstract Serializable getStartpoint();
300    
301    
302    
303      /**
304       * Return the next batch of change records from the source. Change records
305       * are usually just hints that a change happened; they do not include
306       * the full contents of the target entry. In an effort to never synchronize
307       * stale data, the Synchronization Server will go back and fetch the full
308       * target entry for each change record.
309       * <p>
310       * On the first invocation, this should return changes starting from the
311       * startpoint that was set by
312       * {@link #setStartpoint(SetStartpointOptions)}. This method is also
313       * responsible for updating the internal state such that subsequent
314       * invocations do not return duplicate changes.
315       * <p>
316       * The resulting list should be limited by <code>maxChanges</code>. The
317       * <code>numStillPending</code> reference should be set to the estimated
318       * number of changes that haven't yet been retrieved from the source endpoint
319       * when this method returns, or zero if all the current changes have been
320       * retrieved.
321       * <p>
322       * <b>IMPORTANT</b>: While this method needs to keep track of which changes
323       * have already been returned so that it does not return them again, it should
324       * <b>NOT</b> modify the official startpoint. The internal value for the
325       * startpoint should only be updated after a sync operation is acknowledged
326       * back to this script (via
327       * {@link #acknowledgeCompletedOps(LinkedList)}).
328       * Otherwise it will be possible for changes to be missed when the
329       * Synchronization Server is restarted or a connection error occurs. The
330       * startpoint should not change as a result of this method.
331       * <p>
332       * This method <b>does not need to be thread-safe</b>. It will be invoked
333       * repeatedly by a single thread, based on the polling interval set in the
334       * Sync Pipe configuration.
335       *
336       * @param maxChanges
337       *          the maximum number of changes to retrieve
338       * @param numStillPending
339       *          this should be set to the number of unretrieved changes that
340       *          are still pending after this batch has been retrieved. This will
341       *          be passed in
342       *          as zero, and may be left that way if the actual value cannot be
343       *          determined.
344       * @return a list of {@link ChangeRecord} instances, each
345       *         corresponding to a single change at the source endpoint.
346       *         If there are no new changes to return, this method should return
347       *         an empty list.
348       * @throws EndpointException
349       *           if there is any error while retrieving the next batch of changes
350       */
351      public abstract List<ChangeRecord> getNextBatchOfChanges(
352                                                  final int maxChanges,
353                                                  final AtomicLong numStillPending)
354                                                          throws EndpointException;
355    
356    
357    
358      /**
359       * Return a full source entry (in LDAP form) from the source, corresponding
360       * to the {@link ChangeRecord} that is passed in through the
361       * {@link SyncOperation}. This method should perform any queries necessary to
362       * gather the latest values for all the attributes to be synchronized.
363       * <p>
364       * This method <b>must be thread safe</b>, as it will be called repeatedly and
365       * concurrently by each of the Sync Pipe worker threads as they process
366       * entries.
367       * <p>
368       * If the original ChangeRecord has the full entry already set on it (which
369       * can be done using <code>ChangeRecord.Builder#fullEntry(Entry)</code>,
370       * then this method will not get called, and the Sync Server will
371       * automatically use the full entry from the ChangeRecord. In this case, the
372       * implementation can always return {@code null}.
373       *
374       * @param operation
375       *          the SyncOperation which identifies the source "entry" to
376       *          fetch. The ChangeRecord can be obtained by calling
377       *          <code>operation.getChangeRecord()</code>.
378       *          These ChangeRecords are generated by
379       *        {@link #getNextBatchOfChanges(int, AtomicLong)}
380       *          or by
381       *        {@link #listAllEntries(BlockingQueue)}.
382       *
383       * @return a full LDAP Entry, or null if no such entry exists.
384       * @throws EndpointException
385       *           if there is an error fetching the entry
386       */
387      public abstract Entry fetchEntry(final SyncOperation operation)
388                                          throws EndpointException;
389    
390    
391    
392      /**
393       * Provides a way for the Synchronization Server to acknowledge back to the
394       * script which sync operations it has processed. This method should update
395       * the official startpoint which was set by
396       * {@link #setStartpoint(SetStartpointOptions)} and is
397       * returned by {@link #getStartpoint()}.
398       * <p>
399       * <b>IMPORTANT</b>: The internal value for the startpoint should only be
400       * updated after a sync operation is acknowledged back to this extension (via
401       * this method). Otherwise it will be possible for changes to be missed when
402       * the Synchronization Server is restarted or a connection error occurs.
403       *
404       * @param completedOps
405       *          a list of {@link SyncOperation}s that have finished processing.
406       *          The records are listed in the order they were first detected.
407       * @throws EndpointException
408       *           if there is an error acknowledging the changes back to the
409       *           database
410       */
411      public abstract void acknowledgeCompletedOps(
412                                       final LinkedList<SyncOperation> completedOps)
413                                          throws EndpointException;
414    
415    
416    
417      /**
418       * Gets a list of all the entries in the source endpoint. This is used by the
419       * 'resync' command line tool. The default implementation throws a
420       * {@link UnsupportedOperationException}; subclasses should override if the
421       * resync functionality is needed.
422       * <p>
423       * The <code>outputQueue</code> should contain {@link ChangeRecord} objects
424       * with the <code>ChangeType</code> set to <code>null</code> to indicate that
425       * these are resync operations.
426       * <p>
427       * This method should not return until all the entries at the source
428       * have been added to the output queue. Separate threads will concurrently
429       * drain entries from the queue and process them. The queue typically should
430       * not contain full entries, but rather ChangeRecord objects which identify
431       * the full source entries. These objects are then individually passed in to
432       * {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure
433       * that the ChangeRecord instances contain enough identifiable information
434       * (e.g. primary keys) for each entry so that the entry can be found again.
435       * <p>
436       * The lifecycle of resync is similar to that of real-time sync, with a few
437       * differences:
438       * <ol>
439       * <li>Stream out a list of identifiers for the entries in the source
440       *     endpoint, using a ChangeRecord as the identifier</li>
441       * <li>Fetch full source entry for a ChangeRecord</li>
442       * <li>Perform any mappings and compute the equivalent destination entry</li>
443       * <li>Fetch full destination entry</li>
444       * <li>Diff the computed destination entry and actual destination entry</li>
445       * <li>Apply the minimal set of changes at the destination to bring it in sync
446       * </li>
447       * </ol>
448       * <p>
449       * Alternatively, the full entry can be set on the ChangeRecord within this
450       * method, which will cause the "fetch full entry" step to be skipped. In this
451       * case the Sync Server will just use the entry specified on the ChangeRecord.
452       * <p>
453       * If the total set of entries is very large, it is fine to split up the work
454       * into multiple network queries within this method. The queue will not grow
455       * out of control because it blocks when it becomes full. The queue capacity
456       * is fixed at 1000.
457       * <p>
458       * @param outputQueue
459       *          a queue of ChangeRecord objects which will be individually
460       *          fetched via {@link #fetchEntry(SyncOperation)}
461       * @throws EndpointException
462       *           if there is an error retrieving the list of entries to resync
463       */
464      public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue)
465                                    throws EndpointException
466      {
467        throw new UnsupportedOperationException(
468                "The listAllEntries(BlockingQueue) " +
469                "method must be implemented in the '" +
470                getExtensionName() + "' extension (Java class " +
471                getClass().getName() + ").");
472      }
473    
474    
475    
476      /**
477       * Gets a list of all the entries in the source from a given file input.
478       * This is used by the 'resync' command line tool. The default implementation
479       * throws a {@link UnsupportedOperationException}; subclasses should override
480       * if the resync functionality is needed for specific records, which
481       * can be specified in the input file.
482       * <p>
483       * The format for the <code>inputLines</code> (e.g. the content of the file)
484       * is user-defined; it may be key/value pairs, primary keys, or full SQL
485       * statements, for example. The use of this method is triggered via the
486       * <i>--sourceInputFile</i> argument on the resync CLI. The
487       * <code>outputQueue</code> should contain {@link ChangeRecord}
488       * objects with the <code>ChangeType</code> set to <code>null</code> to
489       * indicate that these are resync operations.
490       * <p>
491       * This method should not return until all the entries specified by the input
492       * file have been added to the output queue. Separate threads will
493       * concurrently drain entries from the queue and process them. The queue
494       * typically should not contain full entries, but rather ChangeRecord
495       * objects which identify the full source entries. These objects are then
496       * individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore,
497       * it is important to make sure that the ChangeRecord instances
498       * contain enough identifiable information (e.g. primary keys) for each entry
499       * so that the entry can be found again.
500       * <p>
501       * The lifecycle of resync is similar to that of real-time sync, with a few
502       * differences:
503       * <ol>
504       * <li>Stream out a list of identifiers for entries in the source endpoint,
505       *     using the given input file as the basis for which entries to resync
506       * </li>
507       * <li>Fetch full source entry for an identifier</li>
508       * <li>Perform any mappings and compute the equivalent destination entry</li>
509       * <li>Fetch full destination entry</li>
510       * <li>Diff the computed destination entry and actual destination entry</li>
511       * <li>Apply the minimal set of changes at the destination to bring it in sync
512       * </li>
513       * </ol>
514       * <p>
515       * Alternatively, the full entry can be set on the ChangeRecord within this
516       * method, which will cause the "fetch full entry" step to be skipped. In this
517       * case the Sync Server will just use the entry specified on the ChangeRecord.
518       * <p>
519       * If the total set of entries is very large, it is fine to split up the work
520       * into multiple network queries within this method. The queue will not grow
521       * out of control because it blocks when it becomes full. The queue capacity
522       * is fixed at 1000.
523       * <p>
524       * @param inputLines
525       *          an Iterator containing the lines from the specified input file to
526       *          resync (this is specified on the CLI for the resync command).
527       *          These lines can be any format, for example a set of primary keys,
528       *          a set of WHERE clauses, a set of full SQL queries, etc.
529       * @param outputQueue
530       *          a queue of ChangeRecord objects which will be individually
531       *          fetched via {@link #fetchEntry(SyncOperation)}
532       * @throws EndpointException
533       *           if there is an error retrieving the list of entries to resync
534       */
535      public void listAllEntries(final Iterator<String> inputLines,
536                                 final BlockingQueue<ChangeRecord> outputQueue)
537                                        throws EndpointException
538      {
539        throw new UnsupportedOperationException(
540                "The listAllEntries(Iterator,BlockingQueue) " +
541                "method must be implemented in the '" +
542                getExtensionName() + "' extension (Java class " +
543                getClass().getName() + ").");
544      }
545    }