001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * docs/licenses/cddl.txt
011 * or http://www.opensource.org/licenses/cddl1.php.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * docs/licenses/cddl.txt. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2011-2014 UnboundID Corp.
026 */
027 package com.unboundid.directory.sdk.sync.api;
028
029
030 import java.io.Serializable;
031 import java.util.Collections;
032 import java.util.Iterator;
033 import java.util.LinkedList;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.concurrent.BlockingQueue;
037 import java.util.concurrent.atomic.AtomicLong;
038
039 import com.unboundid.directory.sdk.common.internal.Configurable;
040 import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider;
041 import com.unboundid.directory.sdk.common.internal.UnboundIDExtension;
042 import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
043 import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
044 import com.unboundid.directory.sdk.sync.types.EndpointException;
045 import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
046 import com.unboundid.directory.sdk.sync.types.SyncOperation;
047 import com.unboundid.directory.sdk.sync.types.SyncServerContext;
048 import com.unboundid.directory.sdk.sync.types.ChangeRecord;
049 import com.unboundid.ldap.sdk.Entry;
050 import com.unboundid.util.Extensible;
051 import com.unboundid.util.args.ArgumentException;
052 import com.unboundid.util.args.ArgumentParser;
053
054
055
056 /**
057 * This class defines an API that must be implemented by extensions in order to
058 * synchronize data from a generic (non-LDAP and non-JDBC) endpoint. Since the
059 * UnboundID Synchronization Server is LDAP-centric, this API allows you to take
060 * generic content and convert it into LDAP entries which can then be processed
061 * by the Synchronization Server. The lifecycle of a sync operation is as
062 * follows:
063 * <ol>
064 * <li>Detect change at the synchronization source</li>
065 * <li>Fetch full source entry</li>
066 * <li>Perform any mappings and compute the equivalent destination entry</li>
067 * <li>Fetch full destination entry</li>
068 * <li>Diff the computed destination entry and actual (fetched) destination
069 * entry</li>
070 * <li>Apply the minimal set of changes at the destination to bring it in sync
071 * </li>
072 * </ol>
073 * This implies that the
074 * {@link #fetchEntry(SyncOperation)} method will be called once for every
075 * change that is returned by
076 * {@link #getNextBatchOfChanges(int, AtomicLong)}.
077 * <p>
078 * This is a generic interface and there is no protocol-specific connection
079 * management provided. It is expected that implementers will provide their own
080 * libraries for talking to the source endpoint and handle the connection
081 * lifecycle in the {@link #initializeSyncSource(SyncServerContext,
082 * SyncSourceConfig, ArgumentParser)} and {@link #finalizeSyncSource()}
083 * methods of this extension.
084 * <p>
085 * During realtime synchronization (i.e. when a Sync Pipe is running), there is
086 * a sliding window of changes being processed, and this API provides a
087 * distinction between some different points along that window:
088 * <ul>
089 * <li><b>Old changes</b>: These are changes that the Sync Server has
090 * processed and acknowledged back to the Sync Source. The Sync Source is
091 * under no obligation to re-detect these changes.</li>
092 * <li><b>Startpoint</b>: This marks where the Sync Source will start
093 * detecting changes if it is restarted.</li>
094 * <li><b>Detected but unacknowledged</b>: These changes have been returned by
095 * <code>getNextBatchOfChanges()</code> but not completely processed and
096 * acknowledged back to the Sync Source.</li>
097 * <li><b>Undetected changes</b>: The next call to
098 * <code>getNextBatchOfChanges()</code> should return the first changes
099 * that have not been detected. This should be somewhere at or ahead of
100 * the startpoint.</li>
101 * </ul>
102 * <p>
103 * Several of these methods throw {@link EndpointException}, which should be
104 * used in the case of any connection or endpoint error. For other types of
105 * errors, runtime exceptions may be used (IllegalStateException,
106 * NullPointerException, etc.). The Synchronization Server will automatically
107 * retry operations that fail, up to a configurable number of attempts. The
108 * EndpointException class allows you to specify a retry policy as well.
109 * <BR>
110 * <H2>Configuring Sync Sources</H2>
111 * In order to configure a Sync Source based on this API and written in Java,
112 * use a command like:
113 * <PRE>
114 * dsconfig create-sync-source \
115 * --source-name "<I>{source-name}</I>" \
116 * --type third-party \
117 * --set "extension-class:<I>{class-name}</I>" \
118 * --set "extension-argument:<I>{name=value}</I>"
119 * </PRE>
120 * where "<I>{source-name}</I>" is the name to use for the Sync Source
121 * instance, "<I>{class-name}</I>" is the fully-qualified
122 * name of the Java class written using this API, and "<I>{name=value}</I>"
123 * represents name-value pairs for any arguments to provide to the sync
124 * source. If multiple arguments should be provided to the sync source,
125 * then the "<CODE>--set extension-argument:<I>{name=value}</I></CODE>" option
126 * should be provided multiple times.
127 */
128 @Extensible()
129 @SynchronizationServerExtension(appliesToLocalContent=false,
130 appliesToSynchronizedContent=true)
131 public abstract class SyncSource implements UnboundIDExtension,
132 Configurable,
133 ExampleUsageProvider
134 {
135 /**
136 * Creates a new instance of this Sync Source. All sync
137 * source implementations must include a default constructor, but any
138 * initialization should generally be done in the
139 * {@link #initializeSyncSource} method.
140 */
141 public SyncSource()
142 {
143 // No implementation is required.
144 }
145
146
147
148 /**
149 * {@inheritDoc}
150 */
151 public abstract String getExtensionName();
152
153
154
155 /**
156 * {@inheritDoc}
157 */
158 public abstract String[] getExtensionDescription();
159
160
161
162 /**
163 * {@inheritDoc}
164 */
165 public Map<List<String>,String> getExamplesArgumentSets()
166 {
167 return Collections.emptyMap();
168 }
169
170
171
172 /**
173 * {@inheritDoc}
174 */
175 public void defineConfigArguments(final ArgumentParser parser)
176 throws ArgumentException
177 {
178 // No arguments will be allowed by default.
179 }
180
181
182
183 /**
184 * This hook is called when a Sync Pipe first starts up, when the
185 * <i>resync</i> process first starts up, or when the set-startpoint
186 * subcommand is called from the <i>realtime-sync</i> command line tool.
187 * Any initialization of this sync source should be performed here. This
188 * method should generally store the {@link SyncServerContext} in a class
189 * member so that it can be used elsewhere in the implementation.
190 * <p>
191 * The default implementation is empty.
192 *
193 * @param serverContext A handle to the server context for the server in
194 * which this extension is running.
195 * @param config The general configuration for this sync source.
196 * @param parser The argument parser which has been initialized from
197 * the configuration for this sync source.
198 */
199 public void initializeSyncSource(final SyncServerContext serverContext,
200 final SyncSourceConfig config,
201 final ArgumentParser parser)
202 {
203 // No initialization will be performed by default.
204 }
205
206
207
208 /**
209 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
210 * process shuts down, or when the set-startpoint subcommand (from the
211 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
212 * sync source should be performed here.
213 * <p>
214 * The default implementation is empty.
215 */
216 public void finalizeSyncSource()
217 {
218 //No implementation required by default.
219 }
220
221
222
223 /**
224 * Return the URL or path identifying the source endpoint
225 * from which this extension is transmitting data. This is used for logging
226 * purposes only, so it could just be a server name or hostname and port, etc.
227 *
228 * @return the path to the source endpoint
229 */
230 public abstract String getCurrentEndpointURL();
231
232
233
234 /**
235 * This method should effectively set the starting point for synchronization
236 * to the place specified by the <code>options</code> parameter. This should
237 * cause all changes previous to the specified start point to be disregarded
238 * and only changes after that point to be returned by
239 * {@link #getNextBatchOfChanges(int, AtomicLong)}.
240 * <p>
241 * There are several different startpoint types (see
242 * {@link SetStartpointOptions}), and this implementation is not required to
243 * support them all. If the specified startpoint type is unsupported, this
244 * method should throw an {@link UnsupportedOperationException}.
245 * <p>
246 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
247 * must be supported by your implementation, because this is used when a Sync
248 * Pipe first starts up. The {@link Serializable} in this case is the same
249 * type that is returned by {@link #getStartpoint()}; the Sync Server persists
250 * it and passes it back in on on a restart.
251 * <p>
252 * This method can be called from two different contexts:
253 * <ul>
254 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
255 * (the Sync Pipe is required to be stopped in this context)</li>
256 * <li>Immediately after a Sync Pipe starts up and a connection is first
257 * established to the source server (e.g. before the first call to
258 * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
259 * </ul>
260 *
261 * @param options
262 * an object which indicates where exactly to start synchronizing
263 * (e.g. the end of the changelog, specific change number, a certain
264 * time ago, etc)
265 * @throws EndpointException
266 * if there is any error while setting the start point
267 */
268 public abstract void setStartpoint(final SetStartpointOptions options)
269 throws EndpointException;
270
271
272
273 /**
274 * Gets the current value of the startpoint for change detection. This is the
275 * "bookmark" which indicates which changes have already been processed and
276 * which have not. In most cases, a change number is used to detect changes
277 * and is managed by the Synchronization Server, in which case this
278 * implementation needs only to return the latest acknowledged
279 * change number. In other cases, the return value may correspond to a
280 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
281 * In any case, this method should return the value that is updated by
282 * {@link #acknowledgeCompletedOps(LinkedList)}.
283 * <p>
284 * This method is called periodically and the return value is saved in the
285 * persistent state for the Sync Pipe that uses this extension as its Sync
286 * Source.
287 * <p>
288 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
289 * updated after a sync operation is acknowledged back to this script (via
290 * {@link #acknowledgeCompletedOps(LinkedList)}).
291 * Otherwise it will be possible for changes to be missed when the
292 * Synchronization Server is restarted or a connection error occurs.
293 * @return a value to store in the persistent state for the Sync Pipe. This is
294 * usually a change number, but if a changelog table is not used to
295 * detect changes, this value should represent some other token to
296 * pass into {@link #setStartpoint(SetStartpointOptions)}
297 * when the sync pipe starts up.
298 */
299 public abstract Serializable getStartpoint();
300
301
302
303 /**
304 * Return the next batch of change records from the source. Change records
305 * are usually just hints that a change happened; they do not include
306 * the full contents of the target entry. In an effort to never synchronize
307 * stale data, the Synchronization Server will go back and fetch the full
308 * target entry for each change record.
309 * <p>
310 * On the first invocation, this should return changes starting from the
311 * startpoint that was set by
312 * {@link #setStartpoint(SetStartpointOptions)}. This method is also
313 * responsible for updating the internal state such that subsequent
314 * invocations do not return duplicate changes.
315 * <p>
316 * The resulting list should be limited by <code>maxChanges</code>. The
317 * <code>numStillPending</code> reference should be set to the estimated
318 * number of changes that haven't yet been retrieved from the source endpoint
319 * when this method returns, or zero if all the current changes have been
320 * retrieved.
321 * <p>
322 * <b>IMPORTANT</b>: While this method needs to keep track of which changes
323 * have already been returned so that it does not return them again, it should
324 * <b>NOT</b> modify the official startpoint. The internal value for the
325 * startpoint should only be updated after a sync operation is acknowledged
326 * back to this script (via
327 * {@link #acknowledgeCompletedOps(LinkedList)}).
328 * Otherwise it will be possible for changes to be missed when the
329 * Synchronization Server is restarted or a connection error occurs. The
330 * startpoint should not change as a result of this method.
331 * <p>
332 * This method <b>does not need to be thread-safe</b>. It will be invoked
333 * repeatedly by a single thread, based on the polling interval set in the
334 * Sync Pipe configuration.
335 *
336 * @param maxChanges
337 * the maximum number of changes to retrieve
338 * @param numStillPending
339 * this should be set to the number of unretrieved changes that
340 * are still pending after this batch has been retrieved. This will
341 * be passed in
342 * as zero, and may be left that way if the actual value cannot be
343 * determined.
344 * @return a list of {@link ChangeRecord} instances, each
345 * corresponding to a single change at the source endpoint.
346 * If there are no new changes to return, this method should return
347 * an empty list.
348 * @throws EndpointException
349 * if there is any error while retrieving the next batch of changes
350 */
351 public abstract List<ChangeRecord> getNextBatchOfChanges(
352 final int maxChanges,
353 final AtomicLong numStillPending)
354 throws EndpointException;
355
356
357
358 /**
359 * Return a full source entry (in LDAP form) from the source, corresponding
360 * to the {@link ChangeRecord} that is passed in through the
361 * {@link SyncOperation}. This method should perform any queries necessary to
362 * gather the latest values for all the attributes to be synchronized.
363 * <p>
364 * This method <b>must be thread safe</b>, as it will be called repeatedly and
365 * concurrently by each of the Sync Pipe worker threads as they process
366 * entries.
367 * <p>
368 * If the original ChangeRecord has the full entry already set on it (which
369 * can be done using <code>ChangeRecord.Builder#fullEntry(Entry)</code>,
370 * then this method will not get called, and the Sync Server will
371 * automatically use the full entry from the ChangeRecord. In this case, the
372 * implementation can always return {@code null}.
373 *
374 * @param operation
375 * the SyncOperation which identifies the source "entry" to
376 * fetch. The ChangeRecord can be obtained by calling
377 * <code>operation.getChangeRecord()</code>.
378 * These ChangeRecords are generated by
379 * {@link #getNextBatchOfChanges(int, AtomicLong)}
380 * or by
381 * {@link #listAllEntries(BlockingQueue)}.
382 *
383 * @return a full LDAP Entry, or null if no such entry exists.
384 * @throws EndpointException
385 * if there is an error fetching the entry
386 */
387 public abstract Entry fetchEntry(final SyncOperation operation)
388 throws EndpointException;
389
390
391
392 /**
393 * Provides a way for the Synchronization Server to acknowledge back to the
394 * script which sync operations it has processed. This method should update
395 * the official startpoint which was set by
396 * {@link #setStartpoint(SetStartpointOptions)} and is
397 * returned by {@link #getStartpoint()}.
398 * <p>
399 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
400 * updated after a sync operation is acknowledged back to this extension (via
401 * this method). Otherwise it will be possible for changes to be missed when
402 * the Synchronization Server is restarted or a connection error occurs.
403 *
404 * @param completedOps
405 * a list of {@link SyncOperation}s that have finished processing.
406 * The records are listed in the order they were first detected.
407 * @throws EndpointException
408 * if there is an error acknowledging the changes back to the
409 * database
410 */
411 public abstract void acknowledgeCompletedOps(
412 final LinkedList<SyncOperation> completedOps)
413 throws EndpointException;
414
415
416
417 /**
418 * Gets a list of all the entries in the source endpoint. This is used by the
419 * 'resync' command line tool. The default implementation throws a
420 * {@link UnsupportedOperationException}; subclasses should override if the
421 * resync functionality is needed.
422 * <p>
423 * The <code>outputQueue</code> should contain {@link ChangeRecord} objects
424 * with the <code>ChangeType</code> set to <code>null</code> to indicate that
425 * these are resync operations. {@code ChangeRecords} should be added to
426 * the queue using the blocking {@code put()} method.
427 * <p>
428 * This method should not return until all the entries at the source
429 * have been added to the output queue. Separate threads will concurrently
430 * drain entries from the queue and process them. The queue typically should
431 * not contain full entries, but rather ChangeRecord objects which identify
432 * the full source entries. These objects are then individually passed in to
433 * {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure
434 * that the ChangeRecord instances contain enough identifiable information
435 * (e.g. primary keys) for each entry so that the entry can be found again.
436 * <p>
437 * The lifecycle of resync is similar to that of real-time sync, with a few
438 * differences:
439 * <ol>
440 * <li>Stream out a list of identifiers for the entries in the source
441 * endpoint, using a ChangeRecord as the identifier</li>
442 * <li>Fetch full source entry for a ChangeRecord</li>
443 * <li>Perform any mappings and compute the equivalent destination entry</li>
444 * <li>Fetch full destination entry</li>
445 * <li>Diff the computed destination entry and actual destination entry</li>
446 * <li>Apply the minimal set of changes at the destination to bring it in sync
447 * </li>
448 * </ol>
449 * <p>
450 * Alternatively, the full entry can be set on the ChangeRecord within this
451 * method, which will cause the "fetch full entry" step to be skipped. In this
452 * case the Sync Server will just use the entry specified on the ChangeRecord.
453 * <p>
454 * If the total set of entries is very large, it is fine to split up the work
455 * into multiple network queries within this method. The queue will not grow
456 * out of control because it blocks when it becomes full. The queue capacity
457 * is fixed at 1000.
458 * <p>
459 * @param outputQueue
460 * a queue of ChangeRecord objects which will be individually
461 * fetched via {@link #fetchEntry(SyncOperation)}.
462 * {@code ChangeRecords} should be added to the queue using the
463 * blocking {@code put()} method.
464 * @throws EndpointException
465 * if there is an error retrieving the list of entries to resync
466 */
467 public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue)
468 throws EndpointException
469 {
470 throw new UnsupportedOperationException(
471 "The listAllEntries(BlockingQueue) " +
472 "method must be implemented in the '" +
473 getExtensionName() + "' extension (Java class " +
474 getClass().getName() + ").");
475 }
476
477
478
479 /**
480 * Gets a list of all the entries in the source from a given file input.
481 * This is used by the 'resync' command line tool. The default implementation
482 * throws a {@link UnsupportedOperationException}; subclasses should override
483 * if the resync functionality is needed for specific records, which
484 * can be specified in the input file.
485 * <p>
486 * The format for the <code>inputLines</code> (e.g. the content of the file)
487 * is user-defined; it may be key/value pairs, primary keys, or full SQL
488 * statements, for example. The use of this method is triggered via the
489 * <i>--sourceInputFile</i> argument on the resync CLI. The
490 * <code>outputQueue</code> should contain {@link ChangeRecord}
491 * objects with the <code>ChangeType</code> set to <code>null</code> to
492 * indicate that these are resync operations.
493 * <p>
494 * This method should not return until all the entries specified by the input
495 * file have been added to the output queue. Separate threads will
496 * concurrently drain entries from the queue and process them. The queue
497 * typically should not contain full entries, but rather ChangeRecord
498 * objects which identify the full source entries. These objects are then
499 * individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore,
500 * it is important to make sure that the ChangeRecord instances
501 * contain enough identifiable information (e.g. primary keys) for each entry
502 * so that the entry can be found again.
503 * <p>
504 * The lifecycle of resync is similar to that of real-time sync, with a few
505 * differences:
506 * <ol>
507 * <li>Stream out a list of identifiers for entries in the source endpoint,
508 * using the given input file as the basis for which entries to resync
509 * </li>
510 * <li>Fetch full source entry for an identifier</li>
511 * <li>Perform any mappings and compute the equivalent destination entry</li>
512 * <li>Fetch full destination entry</li>
513 * <li>Diff the computed destination entry and actual destination entry</li>
514 * <li>Apply the minimal set of changes at the destination to bring it in sync
515 * </li>
516 * </ol>
517 * <p>
518 * Alternatively, the full entry can be set on the ChangeRecord within this
519 * method, which will cause the "fetch full entry" step to be skipped. In this
520 * case the Sync Server will just use the entry specified on the ChangeRecord.
521 * <p>
522 * If the total set of entries is very large, it is fine to split up the work
523 * into multiple network queries within this method. The queue will not grow
524 * out of control because it blocks when it becomes full. The queue capacity
525 * is fixed at 1000.
526 * <p>
527 * @param inputLines
528 * an Iterator containing the lines from the specified input file to
529 * resync (this is specified on the CLI for the resync command).
530 * These lines can be any format, for example a set of primary keys,
531 * a set of WHERE clauses, a set of full SQL queries, etc.
532 * @param outputQueue
533 * a queue of ChangeRecord objects which will be individually
534 * fetched via {@link #fetchEntry(SyncOperation)}.
535 * {@code ChangeRecords} should be added to the queue using the
536 * blocking {@code put()} method.
537 * @throws EndpointException
538 * if there is an error retrieving the list of entries to resync
539 */
540 public void listAllEntries(final Iterator<String> inputLines,
541 final BlockingQueue<ChangeRecord> outputQueue)
542 throws EndpointException
543 {
544 throw new UnsupportedOperationException(
545 "The listAllEntries(Iterator,BlockingQueue) " +
546 "method must be implemented in the '" +
547 getExtensionName() + "' extension (Java class " +
548 getClass().getName() + ").");
549 }
550 }