001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * docs/licenses/cddl.txt
011 * or http://www.opensource.org/licenses/cddl1.php.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * docs/licenses/cddl.txt. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2010-2013 UnboundID Corp.
026 */
027 package com.unboundid.directory.sdk.sync.scripting;
028
029 import java.io.Serializable;
030 import java.sql.SQLException;
031 import java.util.LinkedList;
032 import java.util.List;
033 import java.util.Iterator;
034 import java.util.concurrent.BlockingQueue;
035 import java.util.concurrent.atomic.AtomicLong;
036
037 import com.unboundid.ldap.sdk.Entry;
038 import com.unboundid.util.Extensible;
039 import com.unboundid.util.ThreadSafety;
040 import com.unboundid.util.ThreadSafetyLevel;
041 import com.unboundid.util.args.ArgumentException;
042 import com.unboundid.util.args.ArgumentParser;
043 import com.unboundid.directory.sdk.common.internal.Configurable;
044 import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig;
045 import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
046 import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord;
047 import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
048 import com.unboundid.directory.sdk.sync.types.SyncOperation;
049 import com.unboundid.directory.sdk.sync.types.SyncServerContext;
050 import com.unboundid.directory.sdk.sync.types.TransactionContext;
051
052 /**
053 * This class defines an API that must be implemented by scripted extensions
054 * in order to synchronize data out of a relational database. Since the
055 * UnboundID Synchronization Server is LDAP-centric,
056 * this API allows you to take database content and convert it into LDAP
057 * entries which can then be processed by the Synchronization Server. The
058 * lifecycle of
059 * a sync operation is as follows:
060 * <ol>
061 * <li>Detect change at the synchronization source</li>
062 * <li>Fetch full source entry</li>
063 * <li>Perform any mappings and compute the equivalent destination entry</li>
064 * <li>Fetch full destination entry</li>
065 * <li>Diff the computed destination entry and actual (fetched) destination
066 * entry</li>
067 * <li>Apply the minimal set of changes at the destination to bring it in sync
068 * </li>
069 * </ol>
070 * This implies that the
071 * {@link #fetchEntry(TransactionContext, SyncOperation)} method will be
072 * called once for every change that is returned by
073 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
074 * <p>
075 * During realtime synchronization (i.e. when a Sync Pipe is running), there is
076 * a sliding window of changes being processed, and this API provides a
077 * distinction between some different points along that window:
078 * <ul>
079 * <li><b>Old changes</b>: These are changes that the Sync Server has
080 * processed and acknowledged back to the Sync Source. The Sync Source is
081 * under no obligation to re-detect these changes.</li>
082 * <li><b>Startpoint</b>: This marks where the Sync Source will start
083 * detecting changes if it is restarted.</li>
084 * <li><b>Detected but unacknowledged</b>: These changes have been returned by
085 * <code>getNextBatchOfChanges()</code> but not completely processed and
086 * acknowledged back to the Sync Source.</li>
087 * <li><b>Undetected changes</b>: The next call to
088 * <code>getNextBatchOfChanges()</code> should return the first changes
089 * that have not been detected. This should be somewhere at or ahead of
090 * the startpoint.</li>
091 * </ul>
092 * <p>
093 * In several places a {@link TransactionContext} is provided, which allows
094 * controlled access to the target database. By default, methods in this class
095 * are always provided with a fresh connection (i.e. a new transaction), and the
096 * Synchronization Server will always commit or rollback the transaction
097 * automatically, depending on whether the method returned normally or threw an
098 * exception. Implementers may optionally perform their own transaction
099 * management within these methods if necessary.
100 * <p>
101 * Several of these methods throw {@link SQLException}, which should be used in
102 * the case of any database access error. For other types of errors, runtime
103 * exceptions may be used (IllegalStateException, NullPointerException, etc.).
104 * The Synchronization Server will automatically retry operations that fail, up
105 * to a configurable amount of attempts. The exception to this rule is if a
106 * SQLException is thrown with a SQL state string beginning with "08"; this
107 * indicates a connection error, and in this case the operation is retried
108 * indefinitely.
109 * <BR>
110 * <H2>Configuring Groovy Scripted JDBC Sync Sources</H2>
111 * In order to configure a scripted JDBC sync source based on this API and
112 * written in the Groovy scripting language, use a command like:
113 * <PRE>
114 * dsconfig create-sync-source \
115 * --source-name "<I>{source-name}</I>" \
116 * --type groovy-scripted-jdbc \
117 * --set "server:{server-name}" \
118 * --set "script-class:<I>{class-name}</I>" \
119 * --set "script-argument:<I>{name=value}</I>"
120 * </PRE>
121 * where "<I>{source-name}</I>" is the name to use for the JDBC sync source
122 * instance, "<I>{server-name}</I>" is the name of the JDBC external server that
123 * will be used as the sync source, "<I>{class-name}</I>" is the fully-qualified
124 * name of the Groovy class written using this API, and "<I>{name=value}</I>"
125 * represents name-value pairs for any arguments to provide to the JDBC sync
126 * source. If multiple arguments should be provided to the JDBC sync source,
127 * then the "<CODE>--set script-argument:<I>{name=value}</I></CODE>" option
128 * should be provided multiple times.
129 */
130 @Extensible()
131 @SynchronizationServerExtension(appliesToLocalContent=false,
132 appliesToSynchronizedContent=true)
133 public abstract class ScriptedJDBCSyncSource implements Configurable
134 {
135 /**
136 * {@inheritDoc}
137 */
138 public void defineConfigArguments(final ArgumentParser parser)
139 throws ArgumentException
140 {
141 // No arguments will be allowed by default.
142 }
143
144 /**
145 * This hook is called when a Sync Pipe first starts up, when the
146 * <i>resync</i> process first starts up, or when the set-startpoint
147 * subcommand is called from the <i>realtime-sync</i> command line tool.
148 * Any initialization of this sync source should be performed here. This
149 * method should generally store the {@link SyncServerContext} in a class
150 * member so that it can be used elsewhere in the implementation.
151 * <p>
152 * A {@link TransactionContext} is provided, which allows
153 * controlled access to the target database. The context will contain a fresh
154 * fresh connection (i.e. a new transaction), and the Synchronization Server
155 * will always commit or rollback the transaction automatically, depending on
156 * whether this method returns normally or throws an exception. Implementers
157 * may optionally perform their own transaction management within this method
158 * if necessary.
159 * <p>
160 * The default implementation is empty.
161 *
162 * @param ctx
163 * a TransactionContext which provides a valid JDBC connection to the
164 * database.
165 * @param serverContext A handle to the server context for the server in
166 * which this extension is running.
167 * @param config The general configuration for this sync source.
168 * @param parser The argument parser which has been initialized from
169 * the configuration for this JDBC sync source.
170 */
171 public void initializeJDBCSyncSource(final TransactionContext ctx,
172 final SyncServerContext serverContext,
173 final JDBCSyncSourceConfig config,
174 final ArgumentParser parser)
175 {
176 // No initialization will be performed by default.
177 }
178
179 /**
180 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
181 * process shuts down, or when the set-startpoint subcommand (from the
182 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
183 * sync source should be performed here.
184 * <p>
185 * A {@link TransactionContext} is provided, which allows
186 * controlled access to the target database. The context will contain a fresh
187 * fresh connection (i.e. a new transaction), and the Synchronization Server
188 * will always commit or rollback the transaction automatically, depending on
189 * whether this method returns normally or throws an exception. Implementers
190 * may optionally perform their own transaction management within this method
191 * if necessary.
192 * <p>
193 * The default implementation is empty.
194 *
195 * @param ctx
196 * a TransactionContext which provides a valid JDBC connection to the
197 * database.
198 */
199 public void finalizeJDBCSyncSource(final TransactionContext ctx)
200 {
201 //no implementation required
202 }
203
204 /**
205 * This method should effectively set the starting point for synchronization
206 * to the place specified by the <code>options</code> parameter. This should
207 * cause all changes previous to the specified start point to be disregarded
208 * and only changes after that point to be returned by
209 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
210 * <p>
211 * There are several different startpoint types (see
212 * {@link SetStartpointOptions}), and this implementation is not required to
213 * support them all. If the specified startpoint type is unsupported, this
214 * method should throw an {@link UnsupportedOperationException}.
215 * <p>
216 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
217 * must be supported by your implementation, because this is used when a Sync
218 * Pipe first starts up.
219 * <p>
220 * This method can be called from two different contexts:
221 * <ul>
222 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
223 * (the Sync Pipe is required to be stopped in this context)</li>
224 * <li>Immediately after a Sync Pipe starts up and a connection is first
225 * established to the source server (e.g. before the first call to
226 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li>
227 * </ul>
228 * <p>
229 * A {@link TransactionContext} is provided, which allows
230 * controlled access to the target database. The context will contain a fresh
231 * fresh connection (i.e. a new transaction), and the Synchronization Server
232 * will always commit or rollback the transaction automatically, depending on
233 * whether this method returns normally or throws an exception. Implementers
234 * may optionally perform their own transaction management within this method
235 * if necessary.
236 *
237 * @param ctx
238 * a TransactionContext which provides a valid JDBC connection to the
239 * database.
240 * @param options
241 * an object which indicates where exactly to start synchronizing
242 * (e.g.
243 * the end of the changelog, specific change number, a certain time
244 * ago, etc)
245 * @throws SQLException
246 * if there is any error while setting the start point
247 */
248 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
249 public abstract void setStartpoint(final TransactionContext ctx,
250 final SetStartpointOptions options)
251 throws SQLException;
252
253 /**
254 * Gets the current value of the startpoint for change detection. This is the
255 * "bookmark" which indicates which changes have already been processed and
256 * which have not. In most cases, a change number is used to detect changes
257 * and is managed by the Synchronization Server, in which case this
258 * implementation needs only to return the latest acknowledged
259 * change number. In other cases, the return value may correspond to a
260 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
261 * In any case, this method should return the value that is updated by
262 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}.
263 * <p>
264 * This method is called periodically and the return value is saved in the
265 * persistent state for the Sync Pipe that uses this script as its Sync
266 * Source.
267 * <p>
268 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
269 * updated after a sync operation is acknowledged back to this script (via
270 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
271 * Otherwise it will be possible for changes to be missed when the
272 * Synchronization Server is restarted or a connection error occurs.
273 * @return a value to store in the persistent state for the Sync Pipe. This is
274 * usually
275 * a change number, but if a changelog table is not used to detect
276 * changes,
277 * this value should represent some other token to pass into
278 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}
279 * when the sync pipe starts up.
280 */
281 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
282 public abstract Serializable getStartpoint();
283
284 /**
285 * Return the next batch of change records from the database. Change records
286 * are just hints that a change happened; they do not include the actual data
287 * of the change. In an effort to never synchronize stale data, the
288 * Synchronization Server will go back and fetch the full source entry for
289 * each change record.
290 * <p>
291 * On the first invocation, this should return changes starting from the
292 * startpoint that was set by
293 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This
294 * method is responsible for updating the internal state such that subsequent
295 * invocations do not return duplicate changes.
296 * <p>
297 * The resulting list should be limited by <code>maxChanges</code>. The
298 * <code>numStillPending</code> reference should be set to the estimated
299 * number of changes that haven't yet been retrieved from the changelog table
300 * when this method returns, or zero if all the current changes have been
301 * retrieved.
302 * <p>
303 * <b>IMPORTANT</b>: While this method needs to keep track of which changes
304 * have already been returned so that it does not return them again, it should
305 * <b>NOT</b> modify the official startpoint. The internal value for the
306 * startpoint should only be updated after a sync operation is acknowledged
307 * back to this script (via
308 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
309 * Otherwise it will be possible for changes to be missed when the
310 * Synchronization Server is restarted or a connection error occurs. The
311 * startpoint should not change as a result of this method.
312 * <p>
313 * A {@link TransactionContext} is provided, which allows
314 * controlled access to the target database. The context will contain a fresh
315 * fresh connection (i.e. a new transaction), and the Synchronization Server
316 * will always commit or rollback the transaction automatically, depending on
317 * whether this method returns normally or throws an exception. Implementers
318 * may optionally perform their own transaction management within this method
319 * if necessary.
320 * <p>
321 * This method <b>does not need to be thread-safe</b>. It will be invoked
322 * repeatedly by a single thread, based on the polling interval set in the
323 * Sync Pipe configuration.
324 * @param ctx
325 * a TransactionContext which provides a valid JDBC connection to the
326 * database.
327 * @param maxChanges
328 * the maximum number of changes to retrieve
329 * @param numStillPending
330 * this should be set to the number of unretrieved changes that
331 * are still pending after this batch has been retrieved. This will
332 * be passed in
333 * as zero, and may be left that way if the actual value cannot be
334 * determined.
335 * @return a list of {@link DatabaseChangeRecord} instances, each
336 * corresponding
337 * to a row in the changelog table (or the equivalent if some other
338 * change
339 * tracking mechanism is being used). If there are no new changes to
340 * return, this
341 * method should return an empty list.
342 * @throws SQLException
343 * if there is any error while retrieving the next batch of changes
344 */
345 public abstract List<DatabaseChangeRecord> getNextBatchOfChanges(
346 final TransactionContext ctx,
347 final int maxChanges,
348 final AtomicLong numStillPending)
349 throws SQLException;
350
351 /**
352 * Return a full source entry (in LDAP form) from the database, corresponding
353 * to the {@link DatabaseChangeRecord} that is passed in through the
354 * {@link SyncOperation}. This method should perform any queries necessary to
355 * gather the latest values for all the attributes to be synchronized.
356 * <p>
357 * A {@link TransactionContext} is provided, which allows
358 * controlled access to the target database. The context will contain a fresh
359 * fresh connection (i.e. a new transaction), and the Synchronization Server
360 * will always commit or rollback the transaction automatically, depending on
361 * whether this method returns normally or throws an exception. Implementers
362 * may optionally perform their own transaction management within this method
363 * if necessary.
364 * <p>
365 * This method <b>must be thread safe</b>, as it will be called repeatedly and
366 * concurrently by each of the Sync Pipe worker threads as they process
367 * entries.
368 * @param ctx
369 * a TransactionContext which provides a valid JDBC connection to the
370 * database.
371 * @param operation
372 * the SyncOperation which identifies the database "entry" to
373 * fetch. The DatabaseChangeRecord can be obtained by calling
374 * <code>operation.getDatabaseChangeRecord()</code>.
375 * This is returned by
376 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}
377 * or by
378 * {@link #listAllEntries(TransactionContext, String, BlockingQueue)}
379 * .
380 * @return a full LDAP Entry, or null if no such entry exists.
381 * @throws SQLException
382 * if there is an error fetching the entry
383 */
384 public abstract Entry fetchEntry(final TransactionContext ctx,
385 final SyncOperation operation)
386 throws SQLException;
387
388 /**
389 * Provides a way for the Synchronization Server to acknowledge back to the
390 * script which sync operations it has processed. This method should update
391 * the official startpoint which was set by
392 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is
393 * returned by {@link #getStartpoint()}.
394 * <p>
395 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
396 * updated after a sync operation is acknowledged back to this script (via
397 * this method). Otherwise it will be possible for changes to be missed when
398 * the Synchronization Server is restarted or a connection error occurs.
399 * <p>
400 * A {@link TransactionContext} is provided in case the acknowledgment needs
401 * to make it all the way back to the database itself (for example if you were
402 * using Oracle's Change Data Capture). The context will contain a fresh
403 * fresh connection (i.e. a new transaction), and the Synchronization Server
404 * will always commit or rollback the transaction automatically, depending on
405 * whether this method returns normally or throws an exception. Implementers
406 * may optionally perform their own transaction management within this method
407 * if necessary.
408 *
409 * @param ctx
410 * a TransactionContext which provides a valid JDBC connection to the
411 * database.
412 * @param completedOps
413 * a list of {@link SyncOperation}s that have finished processing.
414 * The records are listed in the order they were
415 * first detected.
416 * @throws SQLException
417 * if there is an error acknowledging the changes back to the
418 * database
419 */
420 public abstract void acknowledgeCompletedOps(
421 final TransactionContext ctx,
422 final LinkedList<SyncOperation> completedOps)
423 throws SQLException;
424
425 /**
426 * Performs a cleanup of the changelog table (if desired). There is a
427 * background thread that periodically invokes this method. It should remove
428 * any rows in the changelog table that are more than
429 * <code>maxAgeMillis</code> milliseconds old.
430 * <p>
431 * <b>NOTE:</b> If the system clock on the database server is not in sync with
432 * the system clock on the Synchronization Server, this method should query
433 * the database for its current time in order to determine the cut-off point
434 * for deleting changelog records.
435 * <p>
436 * A {@link TransactionContext} is provided, which allows
437 * controlled access to the target database. The context will contain a fresh
438 * fresh connection (i.e. a new transaction), and the Synchronization Server
439 * will always commit or rollback the transaction automatically, depending on
440 * whether this method returns normally or throws an exception. Implementers
441 * may optionally perform their own transaction management within this method
442 * if necessary.
443 * <p>
444 * If a separate mechanism will be used to manage the changelog table, this
445 * method may be implemented as a no-op and always return zero. This is how
446 * the default implementation behaves.
447 * @param ctx
448 * a TransactionContext which provides a valid JDBC connection to the
449 * database.
450 * @param maxAgeMillis
451 * the period of time (in milliseconds) after which a changelog table
452 * record should be deleted
453 * @return the number of rows that were deleted from the changelog table
454 * @throws SQLException
455 * if there is an error purging records from the changelog table
456 */
457 public int cleanupChangelog(final TransactionContext ctx,
458 final long maxAgeMillis)
459 throws SQLException
460 {
461 //no implementation provided by default; this is an opt-in feature.
462 return 0;
463 }
464
465 /**
466 * Gets a list of all the entries in the database for a given entry type. This
467 * is used by the 'resync' command line tool. The default implementation
468 * throws a {@link UnsupportedOperationException}; subclasses should override
469 * if the resync functionality is needed.
470 * <p>
471 * The <code>entryType</code> is user-defined; it will be
472 * passed in on the command line for resync. The <code>outputQueue</code>
473 * should contain {@link DatabaseChangeRecord} objects with the
474 * <code>ChangeType</code> set to <i>resync</i>.
475 * <p>
476 * This method should not return until all the entries of the given entryType
477 * have been added to the output queue. Separate threads will concurrently
478 * drain entries from the queue and process them. The queue should not
479 * actually contain full entries, but rather DatabaseChangeRecord objects
480 * which identify the full database entries. These objects are then
481 * individually passed in to
482 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
483 * it is important to make sure that the DatabaseChangeRecord instances
484 * contain enough identifiable information (e.g. primary keys) for each entry
485 * so that the entry can be found again.
486 * <p>
487 * The lifecycle of resync is similar to that of real-time sync, with a few
488 * differences:
489 * <ol>
490 * <li>Stream out a list of all IDs in the database (for a given entryType)
491 * </li>
492 * <li>Fetch full source entry for an ID</li>
493 * <li>Perform any mappings and compute the equivalent destination entry</li>
494 * <li>Fetch full destination entry</li>
495 * <li>Diff the computed destination entry and actual destination entry</li>
496 * <li>Apply the minimal set of changes at the destination to bring it in sync
497 * </li>
498 * </ol>
499 * If the total set of entries is very large, it is fine to split up the work
500 * into multiple database queries within this method. The queue will not grow
501 * out of control because it blocks when it becomes full. The queue capacity
502 * is fixed at 1000.
503 * <p>
504 * A {@link TransactionContext} is provided, which allows
505 * controlled access to the target database. The context will contain a fresh
506 * fresh connection (i.e. a new transaction), and the Synchronization Server
507 * will always commit or rollback the transaction automatically, depending on
508 * whether this method returns normally or throws an exception. Implementers
509 * may optionally perform their own transaction management within this method
510 * if necessary.
511 *
512 * @param ctx
513 * a TransactionContext which provides a valid JDBC connection to the
514 * database.
515 * @param entryType
516 * the type of database entry to be fetched (this is specified
517 * on the CLI for the resync command)
518 * @param outputQueue
519 * a queue of DatabaseChangeRecord objects which will be individually
520 * fetched via
521 * {@link #fetchEntry(TransactionContext, SyncOperation)}
522 * @throws SQLException
523 * if there is an error retrieving the list of entries to resync
524 */
525 public void listAllEntries(final TransactionContext ctx,
526 final String entryType,
527 final BlockingQueue<DatabaseChangeRecord>
528 outputQueue) throws SQLException
529 {
530 throw new UnsupportedOperationException(
531 "The listAllEntries(TransactionContext,String,BlockingQueue) " +
532 "method must be implemented in the " +
533 getClass().getName() + " extension.");
534 }
535
536 /**
537 * Note: This method is deprecated and may be removed in a future release.
538 * All new and existing code should be changed to use the version of this
539 * method which includes the <code>entryType</code> parameter.
540 * <p>
541 * Gets a list of all the entries in the database from a given file input.
542 * This is used by the 'resync' command line tool. The default implementation
543 * throws a {@link UnsupportedOperationException}; subclasses should override
544 * if the resync functionality is needed for specific database records, which
545 * can be specified in the input file.
546 * <p>
547 * The format for the <code>inputLines</code> (e.g. the content of the file)
548 * is user-defined; it may be key/value pairs, primary keys, or full SQL
549 * statements, for example. The use of this method is triggered via the
550 * <i>--sourceInputFile</i> argument on the resync CLI. The
551 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
552 * objects with the <code>ChangeType</code> set to <i>resync</i>.
553 * <p>
554 * This method should not return until all the entries specified by the input
555 * file have been added to the output queue. Separate threads will
556 * concurrently drain entries from the queue and process them. The queue
557 * should not actually contain full entries, but rather DatabaseChangeRecord
558 * objects which identify the full database entries. These objects are then
559 * individually passed in to
560 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
561 * it is important to make sure that the DatabaseChangeRecord instances
562 * contain enough identifiable information (e.g. primary keys) for each entry
563 * so that the entry can be found again.
564 * <p>
565 * The lifecycle of resync is similar to that of real-time sync, with a few
566 * differences:
567 * <ol>
568 * <li>Stream out a list of all IDs in the database (using the given input
569 * file)</li>
570 * <li>Fetch full source entry for an ID</li>
571 * <li>Perform any mappings and compute the equivalent destination entry</li>
572 * <li>Fetch full destination entry</li>
573 * <li>Diff the computed destination entry and actual destination entry</li>
574 * <li>Apply the minimal set of changes at the destination to bring it in sync
575 * </li>
576 * </ol>
577 * If the total set of entries is very large, it is fine to split up the work
578 * into multiple database queries within this method. The queue will not grow
579 * out of control because it blocks when it becomes full. The queue capacity
580 * is fixed at 1000.
581 * <p>
582 * A {@link TransactionContext} is provided, which allows
583 * controlled access to the target database. The context will contain a fresh
584 * fresh connection (i.e. a new transaction), and the Synchronization Server
585 * will always commit or rollback the transaction automatically, depending on
586 * whether this method returns normally or throws an exception. Implementers
587 * may optionally perform their own transaction management within this method
588 * if necessary.
589 *
590 * @param ctx
591 * a TransactionContext which provides a valid JDBC connection to the
592 * database.
593 * @param inputLines
594 * an Iterator containing the lines from the specified input file to
595 * resync (this is specified on the CLI for the resync command).
596 * These lines can be any format, for example a set of primary keys,
597 * a set of WHERE clauses, a set of full SQL queries, etc.
598 * @param outputQueue
599 * a queue of DatabaseChangeRecord objects which will be individually
600 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
601 * @throws SQLException
602 * if there is an error retrieving the list of entries to resync
603 */
604 @Deprecated
605 public void listAllEntries(final TransactionContext ctx,
606 final Iterator<String> inputLines,
607 final BlockingQueue<DatabaseChangeRecord>
608 outputQueue) throws SQLException
609 {
610 throw new UnsupportedOperationException(
611 "The listAllEntries(TransactionContext,Iterator,BlockingQueue) " +
612 "method must be implemented in the " +
613 getClass().getName() + " extension.");
614 }
615
616 /**
617 * Gets a list of all the entries in the database from a given file input.
618 * This is used by the 'resync' command line tool. The default implementation
619 * throws a {@link UnsupportedOperationException}; subclasses should override
620 * if the resync functionality is needed for specific database records, which
621 * can be specified in the input file.
622 * <p>
623 * The format for the <code>inputLines</code> (e.g. the content of the file)
624 * is user-defined; it may be key/value pairs, primary keys, or full SQL
625 * statements, for example. The use of this method is triggered via the
626 * <i>--sourceInputFile</i> argument on the resync CLI. The
627 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
628 * objects with the <code>ChangeType</code> set to <i>resync</i>.
629 * <p>
630 * This method should not return until all the entries specified by the input
631 * file have been added to the output queue. Separate threads will
632 * concurrently drain entries from the queue and process them. The queue
633 * should not actually contain full entries, but rather DatabaseChangeRecord
634 * objects which identify the full database entries. These objects are then
635 * individually passed in to
636 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
637 * it is important to make sure that the DatabaseChangeRecord instances
638 * contain enough identifiable information (e.g. primary keys) for each entry
639 * so that the entry can be found again.
640 * <p>
641 * The lifecycle of resync is similar to that of real-time sync, with a few
642 * differences:
643 * <ol>
644 * <li>Stream out a list of all IDs in the database (using the given input
645 * file)</li>
646 * <li>Fetch full source entry for an ID</li>
647 * <li>Perform any mappings and compute the equivalent destination entry</li>
648 * <li>Fetch full destination entry</li>
649 * <li>Diff the computed destination entry and actual destination entry</li>
650 * <li>Apply the minimal set of changes at the destination to bring it in sync
651 * </li>
652 * </ol>
653 * If the total set of entries is very large, it is fine to split up the work
654 * into multiple database queries within this method. The queue will not grow
655 * out of control because it blocks when it becomes full. The queue capacity
656 * is fixed at 1000.
657 * <p>
658 * A {@link TransactionContext} is provided, which allows
659 * controlled access to the target database. The context will contain a fresh
660 * fresh connection (i.e. a new transaction), and the Synchronization Server
661 * will always commit or rollback the transaction automatically, depending on
662 * whether this method returns normally or throws an exception. Implementers
663 * may optionally perform their own transaction management within this method
664 * if necessary.
665 *
666 * @param ctx
667 * a TransactionContext which provides a valid JDBC connection to the
668 * database.
669 * @param entryType
670 * the type of database entry to be fetched (this is specified
671 * on the CLI for the resync command)
672 * @param inputLines
673 * an Iterator containing the lines from the specified input file to
674 * resync (this is specified on the CLI for the resync command).
675 * These lines can be any format, for example a set of primary keys,
676 * a set of WHERE clauses, a set of full SQL queries, etc.
677 * @param outputQueue
678 * a queue of DatabaseChangeRecord objects which will be individually
679 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
680 * @throws SQLException
681 * if there is an error retrieving the list of entries to resync
682 */
683 public void listAllEntries(final TransactionContext ctx,
684 final String entryType,
685 final Iterator<String> inputLines,
686 final BlockingQueue<DatabaseChangeRecord>
687 outputQueue) throws SQLException
688 {
689 throw new UnsupportedOperationException(
690 "The listAllEntries(TransactionContext,String,Iterator,BlockingQueue) "+
691 "method must be implemented in the " +
692 getClass().getName() + " extension.");
693 }
694 }