001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * docs/licenses/cddl.txt
011 * or http://www.opensource.org/licenses/cddl1.php.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * docs/licenses/cddl.txt. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2011-2013 UnboundID Corp.
026 */
027 package com.unboundid.directory.sdk.sync.scripting;
028
029
030 import java.io.Serializable;
031 import java.util.Iterator;
032 import java.util.LinkedList;
033 import java.util.List;
034 import java.util.concurrent.BlockingQueue;
035 import java.util.concurrent.atomic.AtomicLong;
036
037 import com.unboundid.directory.sdk.common.internal.Configurable;
038 import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
039 import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
040 import com.unboundid.directory.sdk.sync.types.EndpointException;
041 import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
042 import com.unboundid.directory.sdk.sync.types.SyncOperation;
043 import com.unboundid.directory.sdk.sync.types.SyncServerContext;
044 import com.unboundid.directory.sdk.sync.types.ChangeRecord;
045 import com.unboundid.ldap.sdk.Entry;
046 import com.unboundid.util.Extensible;
047 import com.unboundid.util.args.ArgumentException;
048 import com.unboundid.util.args.ArgumentParser;
049
050
051
052 /**
053 * This class defines an API that must be implemented by extensions in order to
054 * synchronize data from a generic (non-LDAP and non-JDBC) endpoint. Since the
055 * UnboundID Synchronization Server is LDAP-centric, this API allows you to take
056 * generic content and convert it into LDAP entries which can then be processed
057 * by the Synchronization Server. The lifecycle of a sync operation is as
058 * follows:
059 * <ol>
060 * <li>Detect change at the synchronization source</li>
061 * <li>Fetch full source entry</li>
062 * <li>Perform any mappings and compute the equivalent destination entry</li>
063 * <li>Fetch full destination entry</li>
064 * <li>Diff the computed destination entry and actual (fetched) destination
065 * entry</li>
066 * <li>Apply the minimal set of changes at the destination to bring it in sync
067 * </li>
068 * </ol>
069 * This implies that the
070 * {@link #fetchEntry(SyncOperation)} method will be called once for every
071 * change that is returned by
072 * {@link #getNextBatchOfChanges(int, AtomicLong)}.
073 * <p>
074 * This is a generic interface and there is no protocol-specific connection
075 * management provided. It is expected that implementers will provide their own
076 * libraries for talking to the source endpoint and handle the connection
077 * lifecycle in the {@link #initializeSyncSource(SyncServerContext,
078 * SyncSourceConfig, ArgumentParser)} and {@link #finalizeSyncSource()}
079 * methods of this extension.
080 * <p>
081 * During realtime synchronization (i.e. when a Sync Pipe is running), there is
082 * a sliding window of changes being processed, and this API provides a
083 * distinction between some different points along that window:
084 * <ul>
085 * <li><b>Old changes</b>: These are changes that the Sync Server has
086 * processed and acknowledged back to the Sync Source. The Sync Source is
087 * under no obligation to re-detect these changes.</li>
088 * <li><b>Startpoint</b>: This marks where the Sync Source will start
089 * detecting changes if it is restarted.</li>
090 * <li><b>Detected but unacknowledged</b>: These changes have been returned by
091 * <code>getNextBatchOfChanges()</code> but not completely processed and
092 * acknowledged back to the Sync Source.</li>
093 * <li><b>Undetected changes</b>: The next call to
094 * <code>getNextBatchOfChanges()</code> should return the first changes
095 * that have not been detected. This should be somewhere at or ahead of
096 * the startpoint.</li>
097 * </ul>
098 * <p>
099 * Several of these methods throw {@link EndpointException}, which should be
100 * used in the case of any connection or endpoint error. For other types of
101 * errors, runtime exceptions may be used (IllegalStateException,
102 * NullPointerException, etc.). The Synchronization Server will automatically
103 * retry operations that fail, up to a configurable amount of attempts. The
104 * EndpointException class allows you to specify a retry policy as well.
105 * <BR>
106 * <H2>Configuring Groovy Scripted Sync Sources</H2>
107 * In order to configure a Sync Source based on this API and written in Groovy,
108 * use a command like:
109 * <PRE>
110 * dsconfig create-sync-source \
111 * --source-name "<I>{source-name}</I>" \
112 * --type groovy-scripted \
113 * --set "script-class:<I>{class-name}</I>" \
114 * --set "script-argument:<I>{name=value}</I>"
115 * </PRE>
116 * where "<I>{source-name}</I>" is the name to use for the Sync Source
117 * instance, "<I>{class-name}</I>" is the fully-qualified
118 * name of the Groovy class written using this API, and "<I>{name=value}</I>"
119 * represents name-value pairs for any arguments to provide to the sync
120 * source. If multiple arguments should be provided to the sync source,
121 * then the "<CODE>--set script-argument:<I>{name=value}</I></CODE>" option
122 * should be provided multiple times.
123 */
124 @Extensible()
125 @SynchronizationServerExtension(appliesToLocalContent=false,
126 appliesToSynchronizedContent=true)
127 public abstract class ScriptedSyncSource implements Configurable
128 {
129 /**
130 * {@inheritDoc}
131 */
132 public void defineConfigArguments(final ArgumentParser parser)
133 throws ArgumentException
134 {
135 // No arguments will be allowed by default.
136 }
137
138
139
140 /**
141 * This hook is called when a Sync Pipe first starts up, when the
142 * <i>resync</i> process first starts up, or when the set-startpoint
143 * subcommand is called from the <i>realtime-sync</i> command line tool.
144 * Any initialization of this sync source should be performed here. This
145 * method should generally store the {@link SyncServerContext} in a class
146 * member so that it can be used elsewhere in the implementation.
147 * <p>
148 * The default implementation is empty.
149 *
150 * @param serverContext A handle to the server context for the server in
151 * which this extension is running.
152 * @param config The general configuration for this sync source.
153 * @param parser The argument parser which has been initialized from
154 * the configuration for this JDBC sync source.
155 */
156 public void initializeSyncSource(final SyncServerContext serverContext,
157 final SyncSourceConfig config,
158 final ArgumentParser parser)
159 {
160 // No initialization will be performed by default.
161 }
162
163
164
165 /**
166 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
167 * process shuts down, or when the set-startpoint subcommand (from the
168 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
169 * sync source should be performed here.
170 * <p>
171 * The default implementation is empty.
172 */
173 public void finalizeSyncSource()
174 {
175 //No implementation required by default.
176 }
177
178
179
180 /**
181 * Return the URL or path identifying the source endpoint
182 * from which this extension is transmitting data. This is used for logging
183 * purposes only, so it could just be a server name or hostname and port, etc.
184 *
185 * @return the path to the source endpoint
186 */
187 public abstract String getCurrentEndpointURL();
188
189
190
191 /**
192 * This method should effectively set the starting point for synchronization
193 * to the place specified by the <code>options</code> parameter. This should
194 * cause all changes previous to the specified start point to be disregarded
195 * and only changes after that point to be returned by
196 * {@link #getNextBatchOfChanges(int, AtomicLong)}.
197 * <p>
198 * There are several different startpoint types (see
199 * {@link SetStartpointOptions}), and this implementation is not required to
200 * support them all. If the specified startpoint type is unsupported, this
201 * method should throw an {@link UnsupportedOperationException}.
202 * <p>
203 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
204 * must be supported by your implementation, because this is used when a Sync
205 * Pipe first starts up. The {@link Serializable} in this case is the same
206 * type that is returned by {@link #getStartpoint()}; the Sync Server persists
207 * it and passes it back in on on a restart.
208 * <p>
209 * This method can be called from two different contexts:
210 * <ul>
211 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
212 * (the Sync Pipe is required to be stopped in this context)</li>
213 * <li>Immediately after a Sync Pipe starts up and a connection is first
214 * established to the source server (e.g. before the first call to
215 * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
216 * </ul>
217 *
218 * @param options
219 * an object which indicates where exactly to start synchronizing
220 * (e.g. the end of the changelog, specific change number, a certain
221 * time ago, etc)
222 * @throws EndpointException
223 * if there is any error while setting the start point
224 */
225 public abstract void setStartpoint(final SetStartpointOptions options)
226 throws EndpointException;
227
228
229
230 /**
231 * Gets the current value of the startpoint for change detection. This is the
232 * "bookmark" which indicates which changes have already been processed and
233 * which have not. In most cases, a change number is used to detect changes
234 * and is managed by the Synchronization Server, in which case this
235 * implementation needs only to return the latest acknowledged
236 * change number. In other cases, the return value may correspond to a
237 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
238 * In any case, this method should return the value that is updated by
239 * {@link #acknowledgeCompletedOps(LinkedList)}.
240 * <p>
241 * This method is called periodically and the return value is saved in the
242 * persistent state for the Sync Pipe that uses this extension as its Sync
243 * Source.
244 * <p>
245 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
246 * updated after a sync operation is acknowledged back to this script (via
247 * {@link #acknowledgeCompletedOps(LinkedList)}).
248 * Otherwise it will be possible for changes to be missed when the
249 * Synchronization Server is restarted or a connection error occurs.
250 * @return a value to store in the persistent state for the Sync Pipe. This is
251 * usually a change number, but if a changelog table is not used to
252 * detect changes, this value should represent some other token to
253 * pass into {@link #setStartpoint(SetStartpointOptions)}
254 * when the sync pipe starts up.
255 */
256 public abstract Serializable getStartpoint();
257
258
259
260 /**
261 * Return the next batch of change records from the source. Change records
262 * are usually just hints that a change happened; they do not include
263 * the full contents of the target entry. In an effort to never synchronize
264 * stale data, the Synchronization Server will go back and fetch the full
265 * target entry for each change record.
266 * <p>
267 * On the first invocation, this should return changes starting from the
268 * startpoint that was set by
269 * {@link #setStartpoint(SetStartpointOptions)}. This method is also
270 * responsible for updating the internal state such that subsequent
271 * invocations do not return duplicate changes.
272 * <p>
273 * The resulting list should be limited by <code>maxChanges</code>. The
274 * <code>numStillPending</code> reference should be set to the estimated
275 * number of changes that haven't yet been retrieved from the source endpoint
276 * when this method returns, or zero if all the current changes have been
277 * retrieved.
278 * <p>
279 * <b>IMPORTANT</b>: While this method needs to keep track of which changes
280 * have already been returned so that it does not return them again, it should
281 * <b>NOT</b> modify the official startpoint. The internal value for the
282 * startpoint should only be updated after a sync operation is acknowledged
283 * back to this script (via
284 * {@link #acknowledgeCompletedOps(LinkedList)}).
285 * Otherwise it will be possible for changes to be missed when the
286 * Synchronization Server is restarted or a connection error occurs. The
287 * startpoint should not change as a result of this method.
288 * <p>
289 * This method <b>does not need to be thread-safe</b>. It will be invoked
290 * repeatedly by a single thread, based on the polling interval set in the
291 * Sync Pipe configuration.
292 *
293 * @param maxChanges
294 * the maximum number of changes to retrieve
295 * @param numStillPending
296 * this should be set to the number of unretrieved changes that
297 * are still pending after this batch has been retrieved. This will
298 * be passed in
299 * as zero, and may be left that way if the actual value cannot be
300 * determined.
301 * @return a list of {@link ChangeRecord} instances, each
302 * corresponding to a single change at the source endpoint.
303 * If there are no new changes to return, this method should return
304 * an empty list.
305 * @throws EndpointException
306 * if there is any error while retrieving the next batch of changes
307 */
308 public abstract List<ChangeRecord> getNextBatchOfChanges(
309 final int maxChanges,
310 final AtomicLong numStillPending)
311 throws EndpointException;
312
313
314
315 /**
316 * Return a full source entry (in LDAP form) from the source, corresponding
317 * to the {@link ChangeRecord} that is passed in through the
318 * {@link SyncOperation}. This method should perform any queries necessary to
319 * gather the latest values for all the attributes to be synchronized.
320 * <p>
321 * This method <b>must be thread safe</b>, as it will be called repeatedly and
322 * concurrently by each of the Sync Pipe worker threads as they process
323 * entries.
324 * <p>
325 * If the original ChangeRecord has the full entry already set on it (which
326 * can be done using <code>ChangeRecord.Builder#fullEntry(Entry)</code>,
327 * then this method will not get called, and the Sync Server will
328 * automatically use the full entry from the ChangeRecord. In this case, the
329 * implementation can always return {@code null}.
330 *
331 * @param operation
332 * the SyncOperation which identifies the source "entry" to
333 * fetch. The ChangeRecord can be obtained by calling
334 * <code>operation.getChangeRecord()</code>.
335 * These ChangeRecords are generated by
336 * {@link #getNextBatchOfChanges(int, AtomicLong)}
337 * or by
338 * {@link #listAllEntries(BlockingQueue)}.
339 *
340 * @return a full LDAP Entry, or null if no such entry exists.
341 * @throws EndpointException
342 * if there is an error fetching the entry
343 */
344 public abstract Entry fetchEntry(final SyncOperation operation)
345 throws EndpointException;
346
347
348
349 /**
350 * Provides a way for the Synchronization Server to acknowledge back to the
351 * script which sync operations it has processed. This method should update
352 * the official startpoint which was set by
353 * {@link #setStartpoint(SetStartpointOptions)} and is
354 * returned by {@link #getStartpoint()}.
355 * <p>
356 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
357 * updated after a sync operation is acknowledged back to this extension (via
358 * this method). Otherwise it will be possible for changes to be missed when
359 * the Synchronization Server is restarted or a connection error occurs.
360 *
361 * @param completedOps
362 * a list of {@link SyncOperation}s that have finished processing.
363 * The records are listed in the order they were first detected.
364 * @throws EndpointException
365 * if there is an error acknowledging the changes back to the
366 * database
367 */
368 public abstract void acknowledgeCompletedOps(
369 final LinkedList<SyncOperation> completedOps)
370 throws EndpointException;
371
372
373
374 /**
375 * Gets a list of all the entries in the source endpoint. This is used by the
376 * 'resync' command line tool. The default implementation throws a
377 * {@link UnsupportedOperationException}; subclasses should override if the
378 * resync functionality is needed.
379 * <p>
380 * The <code>outputQueue</code> should contain {@link ChangeRecord} objects
381 * with the <code>ChangeType</code> set to <code>null</code> to indicate that
382 * these are resync operations.
383 * <p>
384 * This method should not return until all the entries at the source
385 * have been added to the output queue. Separate threads will concurrently
386 * drain entries from the queue and process them. The queue typically should
387 * not contain full entries, but rather ChangeRecord objects which identify
388 * the full source entries. These objects are then individually passed in to
389 * {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure
390 * that the ChangeRecord instances contain enough identifiable information
391 * (e.g. primary keys) for each entry so that the entry can be found again.
392 * <p>
393 * The lifecycle of resync is similar to that of real-time sync, with a few
394 * differences:
395 * <ol>
396 * <li>Stream out a list of identifiers for the entries in the source
397 * endpoint, using a ChangeRecord as the identifier</li>
398 * <li>Fetch full source entry for a ChangeRecord</li>
399 * <li>Perform any mappings and compute the equivalent destination entry</li>
400 * <li>Fetch full destination entry</li>
401 * <li>Diff the computed destination entry and actual destination entry</li>
402 * <li>Apply the minimal set of changes at the destination to bring it in sync
403 * </li>
404 * </ol>
405 * <p>
406 * Alternatively, the full entry can be set on the ChangeRecord within this
407 * method, which will cause the "fetch full entry" step to be skipped. In this
408 * case the Sync Server will just use the entry specified on the ChangeRecord.
409 * <p>
410 * If the total set of entries is very large, it is fine to split up the work
411 * into multiple network queries within this method. The queue will not grow
412 * out of control because it blocks when it becomes full. The queue capacity
413 * is fixed at 1000.
414 * <p>
415 * @param outputQueue
416 * a queue of ChangeRecord objects which will be individually
417 * fetched via {@link #fetchEntry(SyncOperation)}
418 * @throws EndpointException
419 * if there is an error retrieving the list of entries to resync
420 */
421 public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue)
422 throws EndpointException
423 {
424 throw new UnsupportedOperationException(
425 "The listAllEntries(BlockingQueue) " +
426 "method must be implemented in the " +
427 getClass().getName() + " extension.");
428 }
429
430
431
432 /**
433 * Gets a list of all the entries in the source from a given file input.
434 * This is used by the 'resync' command line tool. The default implementation
435 * throws a {@link UnsupportedOperationException}; subclasses should override
436 * if the resync functionality is needed for specific records, which
437 * can be specified in the input file.
438 * <p>
439 * The format for the <code>inputLines</code> (e.g. the content of the file)
440 * is user-defined; it may be key/value pairs, primary keys, or full SQL
441 * statements, for example. The use of this method is triggered via the
442 * <i>--sourceInputFile</i> argument on the resync CLI. The
443 * <code>outputQueue</code> should contain {@link ChangeRecord}
444 * objects with the <code>ChangeType</code> set to <code>null</code> to
445 * indicate that these are resync operations.
446 * <p>
447 * This method should not return until all the entries specified by the input
448 * file have been added to the output queue. Separate threads will
449 * concurrently drain entries from the queue and process them. The queue
450 * typically should not contain full entries, but rather ChangeRecord
451 * objects which identify the full source entries. These objects are then
452 * individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore,
453 * it is important to make sure that the ChangeRecord instances
454 * contain enough identifiable information (e.g. primary keys) for each entry
455 * so that the entry can be found again.
456 * <p>
457 * The lifecycle of resync is similar to that of real-time sync, with a few
458 * differences:
459 * <ol>
460 * <li>Stream out a list of identifiers for entries in the source endpoint,
461 * using the given input file as the basis for which entries to resync
462 * </li>
463 * <li>Fetch full source entry for an identifier</li>
464 * <li>Perform any mappings and compute the equivalent destination entry</li>
465 * <li>Fetch full destination entry</li>
466 * <li>Diff the computed destination entry and actual destination entry</li>
467 * <li>Apply the minimal set of changes at the destination to bring it in sync
468 * </li>
469 * </ol>
470 * <p>
471 * Alternatively, the full entry can be set on the ChangeRecord within this
472 * method, which will cause the "fetch full entry" step to be skipped. In this
473 * case the Sync Server will just use the entry specified on the ChangeRecord.
474 * <p>
475 * If the total set of entries is very large, it is fine to split up the work
476 * into multiple network queries within this method. The queue will not grow
477 * out of control because it blocks when it becomes full. The queue capacity
478 * is fixed at 1000.
479 * <p>
480 * @param inputLines
481 * an Iterator containing the lines from the specified input file to
482 * resync (this is specified on the CLI for the resync command).
483 * These lines can be any format, for example a set of primary keys,
484 * a set of WHERE clauses, a set of full SQL queries, etc.
485 * @param outputQueue
486 * a queue of ChangeRecord objects which will be individually
487 * fetched via {@link #fetchEntry(SyncOperation)}
488 * @throws EndpointException
489 * if there is an error retrieving the list of entries to resync
490 */
491 public void listAllEntries(final Iterator<String> inputLines,
492 final BlockingQueue<ChangeRecord> outputQueue)
493 throws EndpointException
494 {
495 throw new UnsupportedOperationException(
496 "The listAllEntries(Iterator,BlockingQueue) " +
497 "method must be implemented in the " +
498 getClass().getName() + " extension.");
499 }
500 }